GraphEngine and GraphRepository

GraphEngine is the single entry-point for all graph mutations. It owns an in-memory cache of loaded graphs and delegates all persistence I/O through the injected GraphRepository.

Architecture

graph TD
    Service["GraphService / OutlineService"] --> Engine["GraphEngine"]
    Engine --> Cache["_cache: GraphId to Graph dict"]
    Engine --> Repo["GraphRepository (interface)"]
    Repo --> SQLite["SqliteGraphRepository"]

The engine follows the Repository + Cache pattern:

  1. On get_graph(), check the cache first. On a cache miss, load from the repository and populate the cache.
  2. On any mutation, update both the in-memory graph and the repository so they stay in sync.
  3. On delete_graph(), evict from cache and delete from repository.
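
The lookup and eviction steps above can be sketched in isolation. This is a minimal, self-contained illustration rather than the platform's code: Graph, CountingRepo and MiniEngine are hypothetical stand-ins, and only get_graph() and delete_graph() are modelled.

```python
from dataclasses import dataclass


@dataclass
class Graph:
    """Hypothetical stand-in for the platform's Graph type."""
    id: str
    name: str = ""


class CountingRepo:
    """Toy repository that counts how often load_graph() is called."""

    def __init__(self, graphs):
        self._graphs = {g.id: g for g in graphs}
        self.loads = 0

    def load_graph(self, graph_id):
        self.loads += 1
        return self._graphs.get(graph_id)

    def delete_graph(self, graph_id):
        self._graphs.pop(graph_id, None)


class MiniEngine:
    """Cut-down engine showing only the cache-then-repository lookup."""

    def __init__(self, repo):
        self._repo = repo
        self._cache = {}

    def get_graph(self, graph_id):
        if graph_id in self._cache:              # cache hit: no repository call
            return self._cache[graph_id]
        graph = self._repo.load_graph(graph_id)  # cache miss: ask the repository
        if graph is None:
            raise KeyError(f"Graph {graph_id!r} not found")
        self._cache[graph_id] = graph            # populate the cache
        return graph

    def delete_graph(self, graph_id):
        self._cache.pop(graph_id, None)          # evict first
        self._repo.delete_graph(graph_id)        # then delete from the repository


repo = CountingRepo([Graph("g1", "My Notes")])
engine = MiniEngine(repo)
first = engine.get_graph("g1")   # loaded from the repository
second = engine.get_graph("g1")  # served from the cache
```

Because the second call is served from the cache, both calls return the same object and the repository is queried only once.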

GraphRepository Interface

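GraphRepository is the abstract interface that every persistence backend must implement. As the source in the API reference shows, it is a plain base class whose methods raise NotImplementedError rather than an abc.ABC subclass, so a missing override only fails when that method is first called. The engine depends only on this interface, never on SQLAlchemy or any other framework.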

GraphRepository
├── save_graph(graph)
├── load_graph(graph_id) -> Graph | None
├── list_graphs(workspace_id) -> list[Graph]
├── delete_graph(graph_id)
├── save_node(node)
├── delete_node(node_id)
├── save_edge(edge)
└── delete_edge(edge_id)

To provide a custom persistence backend (e.g. PostgreSQL, a remote API), subclass GraphRepository and implement all eight methods.
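
As an illustration of what such a backend might look like, here is a minimal dictionary-backed sketch. It is not part of the platform: Graph, Node, Edge and the empty GraphRepository base are hypothetical stand-ins so the example runs on its own, and the workspace_id field on Graph is an assumption.

```python
from dataclasses import dataclass
from typing import Dict, List, Optional


# Hypothetical stand-ins so the sketch runs on its own. In the real project
# these would be imported from the platform instead.
@dataclass
class Graph:
    id: str
    workspace_id: str  # assumed attribute name


@dataclass
class Node:
    id: str
    graph_id: str


@dataclass
class Edge:
    id: str
    graph_id: str


class GraphRepository:
    """Placeholder for knowledge_platform.core.engine.GraphRepository."""


class InMemoryGraphRepository(GraphRepository):
    """Dictionary-backed repository implementing all eight methods."""

    def __init__(self) -> None:
        self._graphs: Dict[str, Graph] = {}
        self._nodes: Dict[str, Node] = {}
        self._edges: Dict[str, Edge] = {}

    def save_graph(self, graph: Graph) -> None:
        self._graphs[graph.id] = graph  # insert or update

    def load_graph(self, graph_id: str) -> Optional[Graph]:
        return self._graphs.get(graph_id)

    def list_graphs(self, workspace_id: str) -> List[Graph]:
        return [g for g in self._graphs.values() if g.workspace_id == workspace_id]

    def delete_graph(self, graph_id: str) -> None:
        # The interface asks for the graph's nodes and edges to be removed too.
        self._graphs.pop(graph_id, None)
        self._nodes = {k: n for k, n in self._nodes.items() if n.graph_id != graph_id}
        self._edges = {k: e for k, e in self._edges.items() if e.graph_id != graph_id}

    def save_node(self, node: Node) -> None:
        self._nodes[node.id] = node

    def delete_node(self, node_id: str) -> None:
        self._nodes.pop(node_id, None)

    def save_edge(self, edge: Edge) -> None:
        self._edges[edge.id] = edge

    def delete_edge(self, edge_id: str) -> None:
        self._edges.pop(edge_id, None)


repo = InMemoryGraphRepository()
repo.save_graph(Graph("g1", "ws1"))
repo.save_node(Node("n1", "g1"))
repo.delete_graph("g1")  # also drops node n1
```

With the real types, an instance of such a class would be passed to the engine constructor, for example GraphEngine(InMemoryGraphRepository()).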

Single-Operation Helpers vs. Transactions

The engine provides two ways to mutate graphs:

Single-operation helpers (auto-commit each change immediately):

engine.add_node(node)
engine.update_node(updated_node)
engine.remove_node(graph_id, node_id)
engine.add_edge(edge)
engine.remove_edge(graph_id, edge_id)

Explicit transactions (batch multiple operations):

from knowledge_platform.core.transaction import transaction

with transaction(graph.id) as tx:
    tx.record_add_node(parent)
    tx.record_add_node(child)
    tx.record_add_edge(edge)
    engine.commit(tx)

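Use the single-operation helpers for simple, independent changes. Use a transaction to group several related changes into one commit. Note that commit() applies the recorded changes in order and does not roll back on failure: if an error occurs partway through, the in-memory graph is left partially applied, so discard and reload the graph if recovery is required.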
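
One practical difference is visible in the engine source in the API reference: each single-operation helper calls save_graph() after its change, whereas commit() calls it once after all records have been applied. The sketch below reproduces only those call patterns. RecordingRepo and both helper functions are hypothetical, and plain lists stand in for real graphs.

```python
class RecordingRepo:
    """Hypothetical repository stub that only records which methods are called."""

    def __init__(self):
        self.calls = []

    def save_node(self, node):
        self.calls.append("save_node")

    def save_graph(self, graph):
        self.calls.append("save_graph")


def add_nodes_one_by_one(repo, graph, nodes):
    """Call pattern of repeated engine.add_node(): save_graph after every node."""
    for node in nodes:
        graph.append(node)
        repo.save_node(node)
        repo.save_graph(graph)


def add_nodes_in_one_commit(repo, graph, nodes):
    """Call pattern of engine.commit(): save_graph once, after the loop."""
    for node in nodes:
        graph.append(node)
        repo.save_node(node)
    repo.save_graph(graph)


helper_repo, commit_repo = RecordingRepo(), RecordingRepo()
add_nodes_one_by_one(helper_repo, [], ["a", "b", "c"])
add_nodes_in_one_commit(commit_repo, [], ["a", "b", "c"])

helper_saves = helper_repo.calls.count("save_graph")  # one per node
commit_saves = commit_repo.calls.count("save_graph")  # one in total
```

For three nodes the helper pattern writes the graph record three times and the commit pattern once, which may matter for backends where save_graph() is expensive.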

Graph Lifecycle

# Create
graph = engine.create_graph(workspace_id, "outline", "My Notes")

# Read (from cache or repo)
graph = engine.get_graph(graph_id)

# List all in a workspace (always read from the repo; does not populate the cache)
graphs = engine.list_graphs(workspace_id)

# Delete (evicts cache + removes from repo)
engine.delete_graph(graph_id)
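
Note that get_graph() signals a missing graph by raising KeyError rather than returning None. Callers that prefer an optional result could wrap it as in the sketch below. StubEngine and find_graph are hypothetical names used only for this illustration, and strings stand in for Graph objects.

```python
class StubEngine:
    """Hypothetical stand-in exposing only get_graph() and its KeyError."""

    def __init__(self, graphs):
        self._graphs = dict(graphs)

    def get_graph(self, graph_id):
        if graph_id not in self._graphs:
            raise KeyError(f"Graph {graph_id!r} not found")
        return self._graphs[graph_id]


def find_graph(engine, graph_id):
    """Return the graph, or None when the engine reports it as missing."""
    try:
        return engine.get_graph(graph_id)
    except KeyError:
        return None


engine = StubEngine({"g1": "My Notes"})
found = find_graph(engine, "g1")
missing = find_graph(engine, "no-such-id")
```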

Error Handling

Operation      Raises        Condition
get_graph(id)  KeyError      Graph not found in cache or repository
commit(tx)     RuntimeError  Transaction already committed or rolled back
commit(tx)     KeyError      Target graph not found in cache or repository
commit(tx)     ValueError    Unknown operation in a ChangeRecord
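
The commit() docstring notes that an error leaves the in-memory graph partially applied. The toy loop below mirrors that behaviour in a simplified form. ChangeRecord, ToyTransaction and toy_commit are hypothetical stand-ins, a set of node names replaces the real graph, and "rename_node" is a deliberately unsupported operation.

```python
from dataclasses import dataclass, field


@dataclass
class ChangeRecord:
    """Hypothetical stand-in: an operation name plus its payload."""
    operation: str
    payload: str


@dataclass
class ToyTransaction:
    """Hypothetical stand-in holding recorded changes and a committed flag."""
    changes: list = field(default_factory=list)
    committed: bool = False


def toy_commit(nodes, tx):
    """Apply records in recording order; nothing is rolled back on error."""
    if tx.committed:
        raise RuntimeError("Transaction already committed.")
    for record in tx.changes:
        if record.operation == "add_node":
            nodes.add(record.payload)
        elif record.operation == "remove_node":
            nodes.discard(record.payload)
        else:
            raise ValueError(f"Unknown operation: {record.operation!r}")
    tx.committed = True


nodes = set()
tx = ToyTransaction([
    ChangeRecord("add_node", "parent"),
    ChangeRecord("rename_node", "parent"),  # unsupported: raises ValueError
    ChangeRecord("add_node", "child"),      # never reached
])
try:
    toy_commit(nodes, tx)
    error = None
except ValueError as exc:
    error = str(exc)
```

After the failure, "parent" has been added, "child" has not, and the transaction is still marked as uncommitted. This is the partially-applied state that callers need to recover from by reloading.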

API Reference

knowledge_platform.core.engine.GraphRepository

Abstract repository interface that persistence implementations must satisfy.

The engine delegates all I/O through this interface, keeping the engine itself free of SQLAlchemy or file-system concerns.

Source code in src/knowledge_platform/core/engine.py
class GraphRepository:
    """Abstract repository interface that persistence implementations must satisfy.

    The engine delegates all I/O through this interface, keeping the engine
    itself free of SQLAlchemy or file-system concerns.
    """

    def save_graph(self, graph: Graph) -> None:
        """Persist a graph record (insert or update).

        Args:
            graph: The graph to save.
        """
        raise NotImplementedError

    def load_graph(self, graph_id: GraphId) -> Graph | None:
        """Load a graph by ID, or return ``None``.

        Args:
            graph_id: Target identifier.

        Returns:
            Loaded :class:`Graph` or ``None``.
        """
        raise NotImplementedError

    def list_graphs(self, workspace_id: WorkspaceId) -> list[Graph]:
        """Return all graphs in a workspace.

        Args:
            workspace_id: Owning workspace.

        Returns:
            List of :class:`Graph` instances (may be empty).
        """
        raise NotImplementedError

    def delete_graph(self, graph_id: GraphId) -> None:
        """Delete a graph and all its nodes/edges.

        Args:
            graph_id: Target identifier.
        """
        raise NotImplementedError

    def save_node(self, node: Node) -> None:
        """Persist a node (insert or update).

        Args:
            node: The node to save.
        """
        raise NotImplementedError

    def delete_node(self, node_id: NodeId) -> None:
        """Delete a node.

        Args:
            node_id: Target identifier.
        """
        raise NotImplementedError

    def save_edge(self, edge: Edge) -> None:
        """Persist an edge (insert or update).

        Args:
            edge: The edge to save.
        """
        raise NotImplementedError

    def delete_edge(self, edge_id: EdgeId) -> None:
        """Delete an edge.

        Args:
            edge_id: Target identifier.
        """
        raise NotImplementedError

Functions

delete_edge
delete_edge(edge_id: EdgeId) -> None

Delete an edge.

Parameters:

Name     Type    Description         Default
edge_id  EdgeId  Target identifier.  required

Source code in src/knowledge_platform/core/engine.py
def delete_edge(self, edge_id: EdgeId) -> None:
    """Delete an edge.

    Args:
        edge_id: Target identifier.
    """
    raise NotImplementedError
delete_graph
delete_graph(graph_id: GraphId) -> None

Delete a graph and all its nodes/edges.

Parameters:

Name      Type     Description         Default
graph_id  GraphId  Target identifier.  required

Source code in src/knowledge_platform/core/engine.py
def delete_graph(self, graph_id: GraphId) -> None:
    """Delete a graph and all its nodes/edges.

    Args:
        graph_id: Target identifier.
    """
    raise NotImplementedError
delete_node
delete_node(node_id: NodeId) -> None

Delete a node.

Parameters:

Name     Type    Description         Default
node_id  NodeId  Target identifier.  required

Source code in src/knowledge_platform/core/engine.py
def delete_node(self, node_id: NodeId) -> None:
    """Delete a node.

    Args:
        node_id: Target identifier.
    """
    raise NotImplementedError
list_graphs
list_graphs(workspace_id: WorkspaceId) -> list[Graph]

Return all graphs in a workspace.

Parameters:

Name          Type         Description        Default
workspace_id  WorkspaceId  Owning workspace.  required

Returns:

Type         Description
list[Graph]  List of Graph instances (may be empty).

Source code in src/knowledge_platform/core/engine.py
def list_graphs(self, workspace_id: WorkspaceId) -> list[Graph]:
    """Return all graphs in a workspace.

    Args:
        workspace_id: Owning workspace.

    Returns:
        List of :class:`Graph` instances (may be empty).
    """
    raise NotImplementedError
load_graph
load_graph(graph_id: GraphId) -> Graph | None

Load a graph by ID, or return None.

Parameters:

Name      Type     Description         Default
graph_id  GraphId  Target identifier.  required

Returns:

Type          Description
Graph | None  Loaded Graph, or None.

Source code in src/knowledge_platform/core/engine.py
def load_graph(self, graph_id: GraphId) -> Graph | None:
    """Load a graph by ID, or return ``None``.

    Args:
        graph_id: Target identifier.

    Returns:
        Loaded :class:`Graph` or ``None``.
    """
    raise NotImplementedError
save_edge
save_edge(edge: Edge) -> None

Persist an edge (insert or update).

Parameters:

Name  Type  Description        Default
edge  Edge  The edge to save.  required

Source code in src/knowledge_platform/core/engine.py
def save_edge(self, edge: Edge) -> None:
    """Persist an edge (insert or update).

    Args:
        edge: The edge to save.
    """
    raise NotImplementedError
save_graph
save_graph(graph: Graph) -> None

Persist a graph record (insert or update).

Parameters:

Name   Type   Description         Default
graph  Graph  The graph to save.  required

Source code in src/knowledge_platform/core/engine.py
def save_graph(self, graph: Graph) -> None:
    """Persist a graph record (insert or update).

    Args:
        graph: The graph to save.
    """
    raise NotImplementedError
save_node
save_node(node: Node) -> None

Persist a node (insert or update).

Parameters:

Name  Type  Description        Default
node  Node  The node to save.  required

Source code in src/knowledge_platform/core/engine.py
def save_node(self, node: Node) -> None:
    """Persist a node (insert or update).

    Args:
        node: The node to save.
    """
    raise NotImplementedError

knowledge_platform.core.engine.GraphEngine

Orchestrates graph operations against an in-memory cache + repository.

GraphEngine is the single entry-point for all graph mutations. It keeps a live in-memory cache of loaded graphs and persists changes through the injected GraphRepository.

Parameters:

Name        Type             Description                Default
repository  GraphRepository  The persistence back-end.  required

Source code in src/knowledge_platform/core/engine.py
class GraphEngine:
    """Orchestrates graph operations against an in-memory cache + repository.

    :class:`GraphEngine` is the single entry-point for all graph mutations.
    It keeps a live in-memory cache of loaded graphs and persists changes
    through the injected :class:`GraphRepository`.

    Args:
        repository: The persistence back-end.
    """

    def __init__(self, repository: GraphRepository) -> None:
        self._repo = repository
        self._cache: dict[GraphId, Graph] = {}

    # ------------------------------------------------------------------
    # Graph lifecycle
    # ------------------------------------------------------------------

    def create_graph(
        self,
        workspace_id: WorkspaceId,
        type_name: str,
        name: str = "",
    ) -> Graph:
        """Create and persist a new empty graph.

        Args:
            workspace_id: Owning workspace.
            type_name: Semantic graph type name.
            name: Optional display name.

        Returns:
            Newly created :class:`Graph`.
        """
        graph = Graph.create(workspace_id, type_name, name)
        self._repo.save_graph(graph)
        self._cache[graph.id] = graph
        logger.info("graph.created", graph_id=graph.id, type_name=type_name)
        return graph

    def get_graph(self, graph_id: GraphId) -> Graph:
        """Return a graph from cache or repository.

        Args:
            graph_id: Target identifier.

        Returns:
            The :class:`Graph`.

        Raises:
            KeyError: If the graph does not exist.
        """
        if graph_id in self._cache:
            return self._cache[graph_id]
        graph = self._repo.load_graph(graph_id)
        if graph is None:
            raise KeyError(f"Graph {graph_id!r} not found")
        self._cache[graph_id] = graph
        return graph

    def list_graphs(self, workspace_id: WorkspaceId) -> list[Graph]:
        """List all graphs in a workspace.

        Args:
            workspace_id: Owning workspace.

        Returns:
            List of :class:`Graph` instances.
        """
        return self._repo.list_graphs(workspace_id)

    def delete_graph(self, graph_id: GraphId) -> None:
        """Delete a graph and remove it from cache.

        Args:
            graph_id: Target identifier.
        """
        self._cache.pop(graph_id, None)
        self._repo.delete_graph(graph_id)
        logger.info("graph.deleted", graph_id=graph_id)

    # ------------------------------------------------------------------
    # Transaction commit
    # ------------------------------------------------------------------

    def commit(self, tx: Transaction) -> None:
        """Apply all changes recorded in *tx* to the graph and repository.

        Changes are applied in recording order.  On any error the in-memory
        graph is left in a partially-applied state; callers should discard
        and reload the graph if recovery is required.

        Args:
            tx: A completed (not yet committed) :class:`Transaction`.

        Raises:
            RuntimeError: If the transaction is already committed or rolled back.
            KeyError: If the target graph is not found.
        """
        if tx.committed:
            raise RuntimeError("Transaction already committed.")
        if tx.rolled_back:
            raise RuntimeError("Cannot commit a rolled-back transaction.")

        graph = self.get_graph(tx.graph_id)
        log = logger.bind(graph_id=tx.graph_id, change_count=len(tx.changes))
        log.info("transaction.commit.start")

        for record in tx.changes:
            op = record.operation
            payload = record.payload
            if op == "add_node":
                graph.add_node(payload)
                self._repo.save_node(payload)
            elif op == "update_node":
                graph.update_node(payload)
                self._repo.save_node(payload)
            elif op == "remove_node":
                graph.remove_node(payload)
                self._repo.delete_node(payload)
            elif op == "add_edge":
                graph.add_edge(payload)
                self._repo.save_edge(payload)
            elif op == "remove_edge":
                graph.remove_edge(payload)
                self._repo.delete_edge(payload)
            else:
                raise ValueError(f"Unknown operation: {op!r}")

        self._repo.save_graph(graph)
        tx.committed = True
        log.info("transaction.commit.done")

    # ------------------------------------------------------------------
    # Convenience single-operation helpers (auto-commit)
    # ------------------------------------------------------------------

    def add_node(self, node: Node) -> None:
        """Add a node to its owning graph without an explicit transaction.

        Args:
            node: Node to add; its :attr:`~Node.graph_id` must match a loaded graph.
        """
        graph = self.get_graph(node.graph_id)
        graph.add_node(node)
        self._repo.save_node(node)
        self._repo.save_graph(graph)
        logger.debug("node.added", node_id=node.id, graph_id=node.graph_id)

    def update_node(self, node: Node) -> None:
        """Update a node in its owning graph without an explicit transaction.

        Args:
            node: Updated node.
        """
        graph = self.get_graph(node.graph_id)
        graph.update_node(node)
        self._repo.save_node(node)
        self._repo.save_graph(graph)
        logger.debug("node.updated", node_id=node.id, version=node.version)

    def remove_node(self, graph_id: GraphId, node_id: NodeId) -> None:
        """Remove a node from a graph without an explicit transaction.

        Args:
            graph_id: Owning graph.
            node_id: Node to remove.
        """
        graph = self.get_graph(graph_id)
        graph.remove_node(node_id)
        self._repo.delete_node(node_id)
        self._repo.save_graph(graph)
        logger.debug("node.removed", node_id=node_id, graph_id=graph_id)

    def add_edge(self, edge: Edge) -> None:
        """Add an edge to its owning graph without an explicit transaction.

        Args:
            edge: Edge to add.
        """
        graph = self.get_graph(edge.graph_id)
        graph.add_edge(edge)
        self._repo.save_edge(edge)
        self._repo.save_graph(graph)
        logger.debug("edge.added", edge_id=edge.id, graph_id=edge.graph_id)

    def remove_edge(self, graph_id: GraphId, edge_id: EdgeId) -> None:
        """Remove an edge from a graph without an explicit transaction.

        Args:
            graph_id: Owning graph.
            edge_id: Edge to remove.
        """
        graph = self.get_graph(graph_id)
        graph.remove_edge(edge_id)
        self._repo.delete_edge(edge_id)
        self._repo.save_graph(graph)
        logger.debug("edge.removed", edge_id=edge_id, graph_id=graph_id)

Functions

add_edge
add_edge(edge: Edge) -> None

Add an edge to its owning graph without an explicit transaction.

Parameters:

Name  Type  Description   Default
edge  Edge  Edge to add.  required

Source code in src/knowledge_platform/core/engine.py
def add_edge(self, edge: Edge) -> None:
    """Add an edge to its owning graph without an explicit transaction.

    Args:
        edge: Edge to add.
    """
    graph = self.get_graph(edge.graph_id)
    graph.add_edge(edge)
    self._repo.save_edge(edge)
    self._repo.save_graph(graph)
    logger.debug("edge.added", edge_id=edge.id, graph_id=edge.graph_id)
add_node
add_node(node: Node) -> None

Add a node to its owning graph without an explicit transaction.

Parameters:

Name  Type  Description                                               Default
node  Node  Node to add; its Node.graph_id must match a loaded graph.  required

Source code in src/knowledge_platform/core/engine.py
def add_node(self, node: Node) -> None:
    """Add a node to its owning graph without an explicit transaction.

    Args:
        node: Node to add; its :attr:`~Node.graph_id` must match a loaded graph.
    """
    graph = self.get_graph(node.graph_id)
    graph.add_node(node)
    self._repo.save_node(node)
    self._repo.save_graph(graph)
    logger.debug("node.added", node_id=node.id, graph_id=node.graph_id)
commit
commit(tx: Transaction) -> None

Apply all changes recorded in tx to the graph and repository.

Changes are applied in recording order. On any error the in-memory graph is left in a partially-applied state; callers should discard and reload the graph if recovery is required.

Parameters:

Name  Type         Description                                   Default
tx    Transaction  A completed (not yet committed) Transaction.  required

Raises:

Type          Description
RuntimeError  If the transaction is already committed or rolled back.
KeyError      If the target graph is not found.
ValueError    If a change record names an unknown operation.

Source code in src/knowledge_platform/core/engine.py
def commit(self, tx: Transaction) -> None:
    """Apply all changes recorded in *tx* to the graph and repository.

    Changes are applied in recording order.  On any error the in-memory
    graph is left in a partially-applied state; callers should discard
    and reload the graph if recovery is required.

    Args:
        tx: A completed (not yet committed) :class:`Transaction`.

    Raises:
        RuntimeError: If the transaction is already committed or rolled back.
        KeyError: If the target graph is not found.
    """
    if tx.committed:
        raise RuntimeError("Transaction already committed.")
    if tx.rolled_back:
        raise RuntimeError("Cannot commit a rolled-back transaction.")

    graph = self.get_graph(tx.graph_id)
    log = logger.bind(graph_id=tx.graph_id, change_count=len(tx.changes))
    log.info("transaction.commit.start")

    for record in tx.changes:
        op = record.operation
        payload = record.payload
        if op == "add_node":
            graph.add_node(payload)
            self._repo.save_node(payload)
        elif op == "update_node":
            graph.update_node(payload)
            self._repo.save_node(payload)
        elif op == "remove_node":
            graph.remove_node(payload)
            self._repo.delete_node(payload)
        elif op == "add_edge":
            graph.add_edge(payload)
            self._repo.save_edge(payload)
        elif op == "remove_edge":
            graph.remove_edge(payload)
            self._repo.delete_edge(payload)
        else:
            raise ValueError(f"Unknown operation: {op!r}")

    self._repo.save_graph(graph)
    tx.committed = True
    log.info("transaction.commit.done")
create_graph
create_graph(
    workspace_id: WorkspaceId,
    type_name: str,
    name: str = "",
) -> Graph

Create and persist a new empty graph.

Parameters:

Name          Type         Description                Default
workspace_id  WorkspaceId  Owning workspace.          required
type_name     str          Semantic graph type name.  required
name          str          Optional display name.     ''

Returns:

Type   Description
Graph  Newly created Graph.

Source code in src/knowledge_platform/core/engine.py
def create_graph(
    self,
    workspace_id: WorkspaceId,
    type_name: str,
    name: str = "",
) -> Graph:
    """Create and persist a new empty graph.

    Args:
        workspace_id: Owning workspace.
        type_name: Semantic graph type name.
        name: Optional display name.

    Returns:
        Newly created :class:`Graph`.
    """
    graph = Graph.create(workspace_id, type_name, name)
    self._repo.save_graph(graph)
    self._cache[graph.id] = graph
    logger.info("graph.created", graph_id=graph.id, type_name=type_name)
    return graph
delete_graph
delete_graph(graph_id: GraphId) -> None

Delete a graph and remove it from cache.

Parameters:

Name      Type     Description         Default
graph_id  GraphId  Target identifier.  required

Source code in src/knowledge_platform/core/engine.py
def delete_graph(self, graph_id: GraphId) -> None:
    """Delete a graph and remove it from cache.

    Args:
        graph_id: Target identifier.
    """
    self._cache.pop(graph_id, None)
    self._repo.delete_graph(graph_id)
    logger.info("graph.deleted", graph_id=graph_id)
get_graph
get_graph(graph_id: GraphId) -> Graph

Return a graph from cache or repository.

Parameters:

Name      Type     Description         Default
graph_id  GraphId  Target identifier.  required

Returns:

Type   Description
Graph  The Graph.

Raises:

Type      Description
KeyError  If the graph does not exist.

Source code in src/knowledge_platform/core/engine.py
def get_graph(self, graph_id: GraphId) -> Graph:
    """Return a graph from cache or repository.

    Args:
        graph_id: Target identifier.

    Returns:
        The :class:`Graph`.

    Raises:
        KeyError: If the graph does not exist.
    """
    if graph_id in self._cache:
        return self._cache[graph_id]
    graph = self._repo.load_graph(graph_id)
    if graph is None:
        raise KeyError(f"Graph {graph_id!r} not found")
    self._cache[graph_id] = graph
    return graph
list_graphs
list_graphs(workspace_id: WorkspaceId) -> list[Graph]

List all graphs in a workspace.

Parameters:

Name          Type         Description        Default
workspace_id  WorkspaceId  Owning workspace.  required

Returns:

Type         Description
list[Graph]  List of Graph instances.

Source code in src/knowledge_platform/core/engine.py
def list_graphs(self, workspace_id: WorkspaceId) -> list[Graph]:
    """List all graphs in a workspace.

    Args:
        workspace_id: Owning workspace.

    Returns:
        List of :class:`Graph` instances.
    """
    return self._repo.list_graphs(workspace_id)
remove_edge
remove_edge(graph_id: GraphId, edge_id: EdgeId) -> None

Remove an edge from a graph without an explicit transaction.

Parameters:

Name      Type     Description      Default
graph_id  GraphId  Owning graph.    required
edge_id   EdgeId   Edge to remove.  required

Source code in src/knowledge_platform/core/engine.py
def remove_edge(self, graph_id: GraphId, edge_id: EdgeId) -> None:
    """Remove an edge from a graph without an explicit transaction.

    Args:
        graph_id: Owning graph.
        edge_id: Edge to remove.
    """
    graph = self.get_graph(graph_id)
    graph.remove_edge(edge_id)
    self._repo.delete_edge(edge_id)
    self._repo.save_graph(graph)
    logger.debug("edge.removed", edge_id=edge_id, graph_id=graph_id)
remove_node
remove_node(graph_id: GraphId, node_id: NodeId) -> None

Remove a node from a graph without an explicit transaction.

Parameters:

Name      Type     Description      Default
graph_id  GraphId  Owning graph.    required
node_id   NodeId   Node to remove.  required

Source code in src/knowledge_platform/core/engine.py
def remove_node(self, graph_id: GraphId, node_id: NodeId) -> None:
    """Remove a node from a graph without an explicit transaction.

    Args:
        graph_id: Owning graph.
        node_id: Node to remove.
    """
    graph = self.get_graph(graph_id)
    graph.remove_node(node_id)
    self._repo.delete_node(node_id)
    self._repo.save_graph(graph)
    logger.debug("node.removed", node_id=node_id, graph_id=graph_id)
update_node
update_node(node: Node) -> None

Update a node in its owning graph without an explicit transaction.

Parameters:

Name  Type  Description    Default
node  Node  Updated node.  required

Source code in src/knowledge_platform/core/engine.py
def update_node(self, node: Node) -> None:
    """Update a node in its owning graph without an explicit transaction.

    Args:
        node: Updated node.
    """
    graph = self.get_graph(node.graph_id)
    graph.update_node(node)
    self._repo.save_node(node)
    self._repo.save_graph(graph)
    logger.debug("node.updated", node_id=node.id, version=node.version)