SqliteGraphRepository

SqliteGraphRepository is the concrete GraphRepository implementation that stores graph data in a local SQLite file via SQLAlchemy 2.0.

Initialisation

from pathlib import Path

from knowledge_platform.persistence.store import SqliteGraphRepository

# Production: a SQLite file on disk
repo = SqliteGraphRepository(Path("/data") / "my_workspace.db")

# Testing: an in-memory database (discarded once repo is garbage collected)
repo = SqliteGraphRepository(":memory:")

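On initialisation, Base.metadata.create_all() is called, so any of the four tables that do not already exist are created. This makes the repository safe to instantiate against either a fresh file or an existing one. Note that create_all() never alters tables that already exist, so schema changes to an existing database need a separate migration step.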

Workspace Helper

ensure_workspace() is a convenience method (not part of GraphRepository) that inserts the workspace row if it does not already exist. An existing row is left unchanged, even if a different name is passed. WorkspaceService calls this automatically:

repo.ensure_workspace(workspace_id=workspace_id, name="My Project")  # no-op if the row already exists

Upsert Semantics

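All save_* methods follow an upsert pattern: fetch the row by primary key, insert it if absent, and update it if present. They are therefore safe to call repeatedly with the same entity. On update, only the mutable columns are overwritten. created_at is always preserved. save_graph refreshes name, version and updated_at. save_node and save_edge refresh type_name, attributes, version and updated_at. A graph's type and workspace, a node's graph, and an edge's graph and endpoints are never changed.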

Connection Pool Cleanup

repo.close()  # dispose the SQLAlchemy engine's connection pool (releases the open SQLite connections)

This should always be called when the repository is no longer needed. WorkspaceService.close() does this automatically for all workspaces it manages.

API Reference

knowledge_platform.persistence.store.SqliteGraphRepository

Bases: GraphRepository

Persists graphs, nodes, and edges in a local SQLite database.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `db_path` | `str \| Path` | Path to the SQLite file. Pass `":memory:"` for an in-process database (useful in tests). | `':memory:'` |
Source code in src/knowledge_platform/persistence/store.py
class SqliteGraphRepository(GraphRepository):
    """Persists graphs, nodes, and edges in a local SQLite database.

    Timestamps are written as naive ``datetime`` values holding UTC wall-clock
    time (timezone-aware inputs are converted to UTC first) and are tagged
    with ``timezone.utc`` again when rows are turned back into domain objects.

    Args:
        db_path: Path to the SQLite file.  Pass ``":memory:"`` for an
            in-process database (useful in tests).
    """

    def __init__(self, db_path: str | Path = ":memory:") -> None:
        in_memory = str(db_path) == ":memory:"
        # In-memory databases are recorded as Path(":memory:"); this is the
        # value load_workspace() reports as the workspace's db_path.
        self._db_path = Path(db_path)
        url = "sqlite:///:memory:" if in_memory else f"sqlite:///{db_path}"
        # NOTE(review): for ":memory:" URLs SQLAlchemy pools one connection per
        # thread by default, so other threads may see a separate, empty
        # database -- confirm before sharing an in-memory repo across threads.
        self._engine = create_engine(url, echo=False, future=True)
        # Only missing tables are created; existing tables are never altered.
        Base.metadata.create_all(self._engine)
        logger.info("persistence.init", db_path=str(db_path))

    def close(self) -> None:
        """Dispose the underlying SQLAlchemy connection pool.

        Call this when the repository is no longer needed (e.g. at the end of
        a test or when the workspace is closed) to avoid ``ResourceWarning``
        about unclosed database connections.
        """
        self._engine.dispose()

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------

    def _session(self) -> Session:
        """Open a new ORM session bound to this repository's engine."""
        return Session(self._engine)

    @staticmethod
    def _to_naive_utc(value: datetime) -> datetime:
        """Return *value* in the naive-UTC form used for storage.

        Aware datetimes are converted to UTC before their ``tzinfo`` is
        dropped, so a non-UTC offset is not silently discarded.  Naive
        datetimes are assumed to already be UTC and are returned unchanged.
        """
        if value.tzinfo is None:
            return value
        return value.astimezone(timezone.utc).replace(tzinfo=None)

    @staticmethod
    def _as_utc(value: datetime) -> datetime:
        """Tag a naive stored timestamp as UTC (inverse of ``_to_naive_utc``)."""
        return value.replace(tzinfo=timezone.utc)

    @staticmethod
    def _node_from_row(row: NodeRow) -> Node:
        """Convert a ``NodeRow`` into a domain :class:`Node`."""
        return Node(
            id=NodeId(row.id),
            graph_id=GraphId(row.graph_id),
            type_name=row.type_name,
            attributes=json.loads(row.attributes_json),
            version=row.version,
            created_at=SqliteGraphRepository._as_utc(row.created_at),
            updated_at=SqliteGraphRepository._as_utc(row.updated_at),
        )

    @staticmethod
    def _edge_from_row(row: EdgeRow) -> Edge:
        """Convert an ``EdgeRow`` into a domain :class:`Edge`."""
        return Edge(
            id=EdgeId(row.id),
            graph_id=GraphId(row.graph_id),
            source_id=NodeId(row.source_id),
            target_id=NodeId(row.target_id),
            type_name=row.type_name,
            attributes=json.loads(row.attributes_json),
            version=row.version,
            created_at=SqliteGraphRepository._as_utc(row.created_at),
            updated_at=SqliteGraphRepository._as_utc(row.updated_at),
        )

    @staticmethod
    def _graph_from_row(row: GraphRow, nodes: list[NodeRow], edges: list[EdgeRow]) -> Graph:
        """Build a :class:`Graph` from its row plus the given node/edge rows.

        Nodes and edges are written straight into the graph's private
        ``_nodes`` / ``_edges`` dicts rather than through a public API.
        Passing empty lists yields a graph with no nodes or edges.
        """
        graph = Graph(
            id=GraphId(row.id),
            workspace_id=WorkspaceId(row.workspace_id),
            type_name=row.type_name,
            name=row.name,
            version=row.version,
            created_at=SqliteGraphRepository._as_utc(row.created_at),
            updated_at=SqliteGraphRepository._as_utc(row.updated_at),
        )
        for n in nodes:
            graph._nodes[NodeId(n.id)] = SqliteGraphRepository._node_from_row(n)
        for e in edges:
            graph._edges[EdgeId(e.id)] = SqliteGraphRepository._edge_from_row(e)
        return graph

    # ------------------------------------------------------------------
    # Workspace helpers (convenience, not part of GraphRepository interface)
    # ------------------------------------------------------------------

    def ensure_workspace(self, workspace_id: WorkspaceId, name: str = "") -> None:
        """Create the workspace row if it does not exist.

        An existing row is left untouched; *name* is only used on insert.

        Args:
            workspace_id: Workspace identifier.
            name: Display name.
        """
        with self._session() as session:
            if session.get(WorkspaceRow, str(workspace_id)) is not None:
                return
            # Stored in the same naive-UTC form as every other timestamp.
            now = self._to_naive_utc(datetime.now(timezone.utc))
            session.add(
                WorkspaceRow(
                    id=str(workspace_id),
                    name=name,
                    created_at=now,
                    updated_at=now,
                )
            )
            session.commit()

    def load_workspace(self) -> Workspace | None:
        """Load the workspace record stored in this database, if any.

        The query has no ORDER BY, so if several workspace rows exist an
        arbitrary one is returned.
        """
        with self._session() as session:
            row = session.execute(select(WorkspaceRow).limit(1)).scalar_one_or_none()
            if row is None:
                return None
            return Workspace(
                id=WorkspaceId(row.id),
                name=row.name,
                db_path=self._db_path,
                created_at=self._as_utc(row.created_at),
                updated_at=self._as_utc(row.updated_at),
            )

    # ------------------------------------------------------------------
    # GraphRepository implementation
    # ------------------------------------------------------------------

    def save_graph(self, graph: Graph) -> None:
        """Upsert a graph row (without touching nodes/edges).

        On update only ``name``, ``version`` and ``updated_at`` are refreshed;
        ``type_name``, ``workspace_id`` and ``created_at`` keep their stored
        values.

        Args:
            graph: Graph to persist.
        """
        with self._session() as session:
            row = session.get(GraphRow, str(graph.id))
            updated_at = self._to_naive_utc(graph.updated_at)
            if row is None:
                session.add(
                    GraphRow(
                        id=str(graph.id),
                        workspace_id=str(graph.workspace_id),
                        type_name=graph.type_name,
                        name=graph.name,
                        version=graph.version,
                        created_at=self._to_naive_utc(graph.created_at),
                        updated_at=updated_at,
                    )
                )
            else:
                row.name = graph.name
                row.version = graph.version
                row.updated_at = updated_at
            session.commit()

    def load_graph(self, graph_id: GraphId) -> Graph | None:
        """Load a graph and all its nodes/edges from the database.

        Args:
            graph_id: Target identifier.

        Returns:
            The reconstructed :class:`Graph`, or ``None`` if no graph row has
            that identifier.
        """
        with self._session() as session:
            row = session.get(GraphRow, str(graph_id))
            if row is None:
                return None
            nodes = list(session.execute(select(NodeRow).where(NodeRow.graph_id == str(graph_id))).scalars())
            edges = list(session.execute(select(EdgeRow).where(EdgeRow.graph_id == str(graph_id))).scalars())
            return self._graph_from_row(row, nodes, edges)

    def list_graphs(self, workspace_id: WorkspaceId) -> list[Graph]:
        """Return all graphs in the given workspace (without nodes/edges).

        Args:
            workspace_id: Owning workspace.

        Returns:
            A list of :class:`Graph` instances; order is unspecified.
        """
        with self._session() as session:
            rows = list(
                session.execute(
                    select(GraphRow).where(GraphRow.workspace_id == str(workspace_id))
                ).scalars()
            )
            return [self._graph_from_row(r, [], []) for r in rows]

    def delete_graph(self, graph_id: GraphId) -> None:
        """Delete a graph row; its nodes/edges are expected to cascade.

        NOTE(review): the cascade is not performed explicitly here.  It relies
        on an ORM relationship cascade or an ``ON DELETE CASCADE`` foreign key
        on the node/edge tables (not visible here); SQLite only enforces
        foreign-key actions when ``PRAGMA foreign_keys=ON`` -- confirm.

        Args:
            graph_id: Target identifier.
        """
        with self._session() as session:
            row = session.get(GraphRow, str(graph_id))
            if row:
                session.delete(row)
                session.commit()

    def save_node(self, node: Node) -> None:
        """Upsert a node row.

        On update ``type_name``, the JSON-encoded attributes, ``version`` and
        ``updated_at`` are refreshed; ``graph_id`` and ``created_at`` keep
        their stored values.

        Args:
            node: Node to persist.
        """
        with self._session() as session:
            row = session.get(NodeRow, str(node.id))
            attrs = json.dumps(node.attributes)
            updated_at = self._to_naive_utc(node.updated_at)
            if row is None:
                session.add(
                    NodeRow(
                        id=str(node.id),
                        graph_id=str(node.graph_id),
                        type_name=node.type_name,
                        attributes_json=attrs,
                        version=node.version,
                        created_at=self._to_naive_utc(node.created_at),
                        updated_at=updated_at,
                    )
                )
            else:
                row.type_name = node.type_name
                row.attributes_json = attrs
                row.version = node.version
                row.updated_at = updated_at
            session.commit()

    def delete_node(self, node_id: NodeId) -> None:
        """Delete a node row.

        This method deletes only the ``NodeRow`` itself and does not
        explicitly touch edges that reference the node.  NOTE(review): whether
        such edges are cascaded depends on model/FK configuration not visible
        here -- confirm to avoid dangling edges.

        Args:
            node_id: Target identifier.
        """
        with self._session() as session:
            row = session.get(NodeRow, str(node_id))
            if row:
                session.delete(row)
                session.commit()

    def save_edge(self, edge: Edge) -> None:
        """Upsert an edge row.

        On update ``type_name``, the JSON-encoded attributes, ``version`` and
        ``updated_at`` are refreshed; the graph, endpoints and ``created_at``
        keep their stored values.

        Args:
            edge: Edge to persist.
        """
        with self._session() as session:
            row = session.get(EdgeRow, str(edge.id))
            attrs = json.dumps(edge.attributes)
            updated_at = self._to_naive_utc(edge.updated_at)
            if row is None:
                session.add(
                    EdgeRow(
                        id=str(edge.id),
                        graph_id=str(edge.graph_id),
                        source_id=str(edge.source_id),
                        target_id=str(edge.target_id),
                        type_name=edge.type_name,
                        attributes_json=attrs,
                        version=edge.version,
                        created_at=self._to_naive_utc(edge.created_at),
                        updated_at=updated_at,
                    )
                )
            else:
                row.type_name = edge.type_name
                row.attributes_json = attrs
                row.version = edge.version
                row.updated_at = updated_at
            session.commit()

    def delete_edge(self, edge_id: EdgeId) -> None:
        """Delete an edge row (no-op if it does not exist).

        Args:
            edge_id: Target identifier.
        """
        with self._session() as session:
            row = session.get(EdgeRow, str(edge_id))
            if row:
                session.delete(row)
                session.commit()

Functions

close
close() -> None

Dispose the underlying SQLAlchemy connection pool.

Call this when the repository is no longer needed (e.g. at the end of a test or when the workspace is closed) to avoid ResourceWarning about unclosed database connections.

Source code in src/knowledge_platform/persistence/store.py
def close(self) -> None:
    """Release every pooled connection held by the SQLAlchemy engine.

    Invoke once the repository has served its purpose -- for example when a
    test finishes or its workspace is closed -- so Python does not emit
    ``ResourceWarning`` for database connections left open.
    """
    engine = self._engine
    engine.dispose()
delete_edge
delete_edge(edge_id: EdgeId) -> None

Delete an edge row.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `edge_id` | `EdgeId` | Target identifier. | *required* |
Source code in src/knowledge_platform/persistence/store.py
def delete_edge(self, edge_id: EdgeId) -> None:
    """Remove the edge row with the given identifier.

    Nothing happens when no such row exists.

    Args:
        edge_id: Identifier of the edge to delete.
    """
    with self._session() as session:
        doomed = session.get(EdgeRow, str(edge_id))
        if not doomed:
            return
        session.delete(doomed)
        session.commit()
delete_graph
delete_graph(graph_id: GraphId) -> None

Delete a graph and cascade to its nodes/edges.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `graph_id` | `GraphId` | Target identifier. | *required* |
Source code in src/knowledge_platform/persistence/store.py
def delete_graph(self, graph_id: GraphId) -> None:
    """Remove a graph row, expecting its nodes/edges to cascade.

    Nothing happens when no such row exists.  NOTE(review): the cascade is
    not done explicitly here; it depends on ORM/foreign-key configuration
    not shown in this method (SQLite enforces FK actions only with
    ``PRAGMA foreign_keys=ON``) -- confirm.

    Args:
        graph_id: Identifier of the graph to delete.
    """
    with self._session() as session:
        doomed = session.get(GraphRow, str(graph_id))
        if not doomed:
            return
        session.delete(doomed)
        session.commit()
delete_node
delete_node(node_id: NodeId) -> None

Delete a node row.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `node_id` | `NodeId` | Target identifier. | *required* |
Source code in src/knowledge_platform/persistence/store.py
def delete_node(self, node_id: NodeId) -> None:
    """Remove the node row with the given identifier.

    Nothing happens when no such row exists.  Only the node row itself is
    deleted here; edges referencing it are not explicitly touched.

    Args:
        node_id: Identifier of the node to delete.
    """
    with self._session() as session:
        doomed = session.get(NodeRow, str(node_id))
        if not doomed:
            return
        session.delete(doomed)
        session.commit()
ensure_workspace
ensure_workspace(
    workspace_id: WorkspaceId, name: str = ""
) -> None

Create the workspace row if it does not exist.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `workspace_id` | `WorkspaceId` | Workspace identifier. | *required* |
| `name` | `str` | Display name. | `''` |
Source code in src/knowledge_platform/persistence/store.py
def ensure_workspace(self, workspace_id: WorkspaceId, name: str = "") -> None:
    """Create the workspace row if it does not exist.

    An existing row is left untouched; *name* is only used on insert.

    Args:
        workspace_id: Workspace identifier.
        name: Display name.
    """
    with self._session() as session:
        existing = session.get(WorkspaceRow, str(workspace_id))
        if existing is None:
            # Store naive UTC, matching how every other timestamp is written.
            now = datetime.now(timezone.utc).replace(tzinfo=None)
            session.add(
                WorkspaceRow(
                    id=str(workspace_id),
                    name=name,
                    created_at=now,
                    updated_at=now,
                )
            )
            session.commit()
list_graphs
list_graphs(workspace_id: WorkspaceId) -> list[Graph]

Return all graphs in the given workspace (without nodes/edges).

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `workspace_id` | `WorkspaceId` | Owning workspace. | *required* |

Returns:

| Type | Description |
|------|-------------|
| `list[Graph]` | List of `Graph` instances. |

Source code in src/knowledge_platform/persistence/store.py
def list_graphs(self, workspace_id: WorkspaceId) -> list[Graph]:
    """Return every graph in *workspace_id*, without loading nodes or edges.

    Args:
        workspace_id: Owning workspace.

    Returns:
        One shallow :class:`Graph` per graph row; order is unspecified.
    """
    query = select(GraphRow).where(GraphRow.workspace_id == str(workspace_id))
    with self._session() as session:
        return [
            self._graph_from_row(graph_row, [], [])
            for graph_row in session.scalars(query)
        ]
load_graph
load_graph(graph_id: GraphId) -> Graph | None

Load a graph and all its nodes/edges from the database.

Parameters:

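| Name | Type | Description | Default |
|------|------|-------------|---------|
| `graph_id` | `GraphId` | Target identifier. | *required* |

Returns:

| Type | Description |
|------|-------------|
| `Graph \| None` | The reconstructed `Graph`, or `None` if no graph with that identifier exists. |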

Source code in src/knowledge_platform/persistence/store.py
def load_graph(self, graph_id: GraphId) -> Graph | None:
    """Read a graph together with all of its nodes and edges.

    Args:
        graph_id: Target identifier.

    Returns:
        The reconstructed :class:`Graph`, or ``None`` if no graph row has
        that identifier.
    """
    key = str(graph_id)
    with self._session() as session:
        graph_row = session.get(GraphRow, key)
        if graph_row is None:
            return None
        node_rows = list(session.scalars(select(NodeRow).where(NodeRow.graph_id == key)))
        edge_rows = list(session.scalars(select(EdgeRow).where(EdgeRow.graph_id == key)))
        return self._graph_from_row(graph_row, node_rows, edge_rows)
load_workspace
load_workspace() -> Workspace | None

Load the workspace record stored in this database, if any.

Source code in src/knowledge_platform/persistence/store.py
def load_workspace(self) -> Workspace | None:
    """Return the workspace stored in this database, or ``None`` if there is none.

    Stored timestamps are re-tagged as UTC.  With no ORDER BY in the query,
    an arbitrary row is picked if several exist.
    """
    with self._session() as session:
        first = session.scalars(select(WorkspaceRow).limit(1)).one_or_none()
        if first is None:
            return None
        utc = timezone.utc
        return Workspace(
            id=WorkspaceId(first.id),
            name=first.name,
            db_path=self._db_path,
            created_at=first.created_at.replace(tzinfo=utc),
            updated_at=first.updated_at.replace(tzinfo=utc),
        )
save_edge
save_edge(edge: Edge) -> None

Upsert an edge row.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `edge` | `Edge` | Edge to persist. | *required* |
Source code in src/knowledge_platform/persistence/store.py
def save_edge(self, edge: Edge) -> None:
    """Upsert an edge row.

    On update ``type_name``, the JSON-encoded attributes, ``version`` and
    ``updated_at`` are refreshed; the graph, endpoints and ``created_at``
    keep their stored values.  Timestamps are stored as naive UTC.

    Args:
        edge: Edge to persist.
    """

    def to_naive_utc(value: datetime) -> datetime:
        # Convert aware values to UTC before dropping tzinfo so a non-UTC
        # offset is not silently lost; naive values are assumed to be UTC.
        if value.tzinfo is None:
            return value
        return value.astimezone(timezone.utc).replace(tzinfo=None)

    with self._session() as session:
        row = session.get(EdgeRow, str(edge.id))
        attrs = json.dumps(edge.attributes)
        updated_at = to_naive_utc(edge.updated_at)
        if row is None:
            session.add(
                EdgeRow(
                    id=str(edge.id),
                    graph_id=str(edge.graph_id),
                    source_id=str(edge.source_id),
                    target_id=str(edge.target_id),
                    type_name=edge.type_name,
                    attributes_json=attrs,
                    version=edge.version,
                    created_at=to_naive_utc(edge.created_at),
                    updated_at=updated_at,
                )
            )
        else:
            row.type_name = edge.type_name
            row.attributes_json = attrs
            row.version = edge.version
            row.updated_at = updated_at
        session.commit()
save_graph
save_graph(graph: Graph) -> None

Upsert a graph row (without touching nodes/edges).

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `graph` | `Graph` | Graph to persist. | *required* |
Source code in src/knowledge_platform/persistence/store.py
def save_graph(self, graph: Graph) -> None:
    """Upsert a graph row (without touching nodes/edges).

    On update only ``name``, ``version`` and ``updated_at`` are refreshed;
    ``type_name``, ``workspace_id`` and ``created_at`` keep their stored
    values.  Timestamps are stored as naive UTC.

    Args:
        graph: Graph to persist.
    """

    def to_naive_utc(value: datetime) -> datetime:
        # Convert aware values to UTC before dropping tzinfo so a non-UTC
        # offset is not silently lost; naive values are assumed to be UTC.
        if value.tzinfo is None:
            return value
        return value.astimezone(timezone.utc).replace(tzinfo=None)

    with self._session() as session:
        row = session.get(GraphRow, str(graph.id))
        updated_at = to_naive_utc(graph.updated_at)
        if row is None:
            session.add(
                GraphRow(
                    id=str(graph.id),
                    workspace_id=str(graph.workspace_id),
                    type_name=graph.type_name,
                    name=graph.name,
                    version=graph.version,
                    created_at=to_naive_utc(graph.created_at),
                    updated_at=updated_at,
                )
            )
        else:
            row.name = graph.name
            row.version = graph.version
            row.updated_at = updated_at
        session.commit()
save_node
save_node(node: Node) -> None

Upsert a node row.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `node` | `Node` | Node to persist. | *required* |
Source code in src/knowledge_platform/persistence/store.py
def save_node(self, node: Node) -> None:
    """Upsert a node row.

    On update ``type_name``, the JSON-encoded attributes, ``version`` and
    ``updated_at`` are refreshed; ``graph_id`` and ``created_at`` keep their
    stored values.  Timestamps are stored as naive UTC.

    Args:
        node: Node to persist.
    """

    def to_naive_utc(value: datetime) -> datetime:
        # Convert aware values to UTC before dropping tzinfo so a non-UTC
        # offset is not silently lost; naive values are assumed to be UTC.
        if value.tzinfo is None:
            return value
        return value.astimezone(timezone.utc).replace(tzinfo=None)

    with self._session() as session:
        row = session.get(NodeRow, str(node.id))
        attrs = json.dumps(node.attributes)
        updated_at = to_naive_utc(node.updated_at)
        if row is None:
            session.add(
                NodeRow(
                    id=str(node.id),
                    graph_id=str(node.graph_id),
                    type_name=node.type_name,
                    attributes_json=attrs,
                    version=node.version,
                    created_at=to_naive_utc(node.created_at),
                    updated_at=updated_at,
                )
            )
        else:
            row.type_name = node.type_name
            row.attributes_json = attrs
            row.version = node.version
            row.updated_at = updated_at
        session.commit()