Transaction

The transaction module provides a unit-of-work pattern for batching graph mutations. Changes are recorded into a Transaction object and then applied atomically by GraphEngine.commit().

Why Transactions?

Without transactions, each individual node or edge mutation is immediately persisted. This means:

  • A partial failure leaves the data in an inconsistent state.
  • There is no natural grouping of related changes for logging or auditing.

With transactions, a set of related changes (e.g. "add a node and connect it to its parent") is applied either in full or not at all, as far as the in-memory graph is concerned.

Current durability guarantee

The transaction guarantees in-memory consistency only. Full ACID durability (rollback on database failure) is planned for a future release. Until then, if GraphEngine.commit() raises part-way through, reload the graph from the repository so that the in-memory state matches what was actually persisted.
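The recovery step described above could look roughly like the sketch below. Everything in it is a stand-in: StubRepo, StubEngine, load_nodes and commit_or_reload are hypothetical names, and the "update memory, then persist" order inside commit() is an assumption, not something this page confirms about GraphEngine.

```python
class StubRepo:
    """Hypothetical repository: remembers the node ids that were actually persisted."""

    def __init__(self) -> None:
        self.persisted = {"n1"}

    def save_node(self, node_id: str) -> None:
        if node_id == "bad":
            raise OSError("simulated database failure")
        self.persisted.add(node_id)

    def load_nodes(self) -> set[str]:  # hypothetical reload method
        return set(self.persisted)


class StubEngine:
    """Hypothetical engine: updates memory first, then persists, one change at a time."""

    def __init__(self, repo: StubRepo) -> None:
        self.repo = repo
        self.nodes = repo.load_nodes()

    def commit(self, node_ids: list[str]) -> None:
        for node_id in node_ids:
            self.nodes.add(node_id)       # in-memory mutation
            self.repo.save_node(node_id)  # may raise part-way through


def commit_or_reload(engine: StubEngine, node_ids: list[str]) -> None:
    """Commit; on failure, reload in-memory state from the repository and re-raise."""
    try:
        engine.commit(node_ids)
    except Exception:
        engine.nodes = engine.repo.load_nodes()
        raise


engine = StubEngine(StubRepo())
try:
    commit_or_reload(engine, ["n2", "bad", "n3"])
except OSError:
    pass

print(sorted(engine.nodes))  # ['n1', 'n2']
```

Note that the reload does not undo the change that was persisted before the failure ("n2" stays); it only brings the in-memory view back in line with the repository.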

The transaction() Context Manager

The most ergonomic way to work with transactions is the transaction() context manager:

from knowledge_platform.core.transaction import transaction

with transaction(graph.id) as tx:
    tx.record_add_node(parent_node)
    tx.record_add_node(child_node)
    tx.record_add_edge(edge)
    engine.commit(tx)
# tx.committed is True here

If an exception is raised inside the with block before engine.commit() is called, the context manager sets rolled_back to True and re-raises the exception. Because commit() was never reached, no changes are applied:

import pytest

with pytest.raises(ValueError):
    with transaction(graph.id) as tx:
        tx.record_add_node(node)
        raise ValueError("something went wrong")

assert tx.rolled_back     # True: nothing was applied
assert not tx.committed   # committed is still False
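For readers without the package or pytest at hand, the same behaviour can be reproduced with a small stand-in. The Transaction class here is a trimmed copy for illustration (graph_id is a plain string and only record_add_node is kept); the context manager body follows the source listed in the API reference. The second case shows something the context manager does not do for you: leaving the block normally without calling commit sets neither flag.

```python
import contextlib
from dataclasses import dataclass, field
from typing import Any, Generator


@dataclass
class Transaction:
    """Trimmed stand-in for the documented Transaction (node recording only)."""

    graph_id: str
    changes: list[tuple[str, Any]] = field(default_factory=list)
    committed: bool = False
    rolled_back: bool = False

    def record_add_node(self, node: Any) -> None:
        self.changes.append(("add_node", node))


@contextlib.contextmanager
def transaction(graph_id: str) -> Generator[Transaction, None, None]:
    """Yield an open transaction; flag it as rolled back if the block raises."""
    tx = Transaction(graph_id=graph_id)
    try:
        yield tx
    except Exception:
        tx.rolled_back = True
        raise


# Case 1: the block raises, so the transaction is flagged as rolled back.
try:
    with transaction("g1") as failed_tx:
        failed_tx.record_add_node({"id": "n1"})
        raise ValueError("something went wrong")
except ValueError:
    pass

# Case 2: the block exits normally but never calls commit.
with transaction("g1") as pending_tx:
    pending_tx.record_add_node({"id": "n2"})

print(failed_tx.rolled_back, failed_tx.committed)    # True False
print(pending_tx.rolled_back, pending_tx.committed)  # False False
```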

Manual Usage

You can also build a Transaction manually:

from knowledge_platform.core.transaction import Transaction

tx = Transaction(graph_id=graph.id)
tx.record_add_node(node_a)
tx.record_update_node(node_b_updated)
tx.record_remove_edge(old_edge_id)
engine.commit(tx)
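Since a manually built transaction is just an ordered list of ChangeRecord objects until it is committed, it can be inspected first, for example to write one audit line per batch. The sketch below uses trimmed stand-ins for ChangeRecord and Transaction (plain strings as payloads, two record_* methods only), and summarize is a hypothetical helper, not part of the module.

```python
from collections import Counter
from dataclasses import dataclass, field
from typing import Any


@dataclass
class ChangeRecord:
    operation: str
    payload: Any


@dataclass
class Transaction:
    """Stand-in with the documented fields; only two record_* methods are reproduced."""

    graph_id: str
    changes: list[ChangeRecord] = field(default_factory=list)

    def record_add_node(self, node: Any) -> None:
        self.changes.append(ChangeRecord("add_node", node))

    def record_remove_edge(self, edge_id: str) -> None:
        self.changes.append(ChangeRecord("remove_edge", edge_id))


def summarize(tx: Transaction) -> str:
    """Hypothetical helper: build one audit line for a whole transaction."""
    counts = Counter(change.operation for change in tx.changes)
    parts = ", ".join(f"{op} x{n}" for op, n in sorted(counts.items()))
    return f"graph={tx.graph_id} changes={len(tx.changes)} ({parts})"


tx = Transaction(graph_id="g1")
tx.record_add_node("node_a")
tx.record_add_node("node_b")
tx.record_remove_edge("old_edge")

summary = summarize(tx)
print(summary)  # graph=g1 changes=3 (add_node x2, remove_edge x1)
```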

Supported Operations

record_* method               Engine operation applied                Payload type
record_add_node(node)         graph.add_node + repo.save_node         Node
record_update_node(node)      graph.update_node + repo.save_node      Node
record_remove_node(node_id)   graph.remove_node + repo.delete_node    NodeId
record_add_edge(edge)         graph.add_edge + repo.save_edge         Edge
record_remove_edge(edge_id)   graph.remove_edge + repo.delete_edge    EdgeId
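One way an engine could turn the table above into code is a dispatch map from operation name to a pair of method names. This is only a sketch: apply_changes, DISPATCH and the Recorder stub are hypothetical, and calling the graph before the repository is an assumption read off the column order, not confirmed behaviour of GraphEngine.commit().

```python
from dataclasses import dataclass
from typing import Any


@dataclass
class ChangeRecord:
    operation: str
    payload: Any


class Recorder:
    """Hypothetical stub for a graph or repository: logs every method call."""

    def __init__(self, name: str, log: list[str]) -> None:
        self._name = name
        self._log = log

    def __getattr__(self, method: str):
        # Only reached for names not found normally, i.e. the mutation methods.
        def call(payload: Any) -> None:
            self._log.append(f"{self._name}.{method}")

        return call


# Operation name -> (graph method, repository method), taken from the table.
DISPATCH = {
    "add_node": ("add_node", "save_node"),
    "update_node": ("update_node", "save_node"),
    "remove_node": ("remove_node", "delete_node"),
    "add_edge": ("add_edge", "save_edge"),
    "remove_edge": ("remove_edge", "delete_edge"),
}


def apply_changes(changes: list[ChangeRecord], graph: Any, repo: Any) -> None:
    """Apply each change in recorded order; unknown operations are rejected."""
    for change in changes:
        if change.operation not in DISPATCH:
            raise ValueError(f"Unknown operation: {change.operation!r}")
        graph_method, repo_method = DISPATCH[change.operation]
        getattr(graph, graph_method)(change.payload)
        getattr(repo, repo_method)(change.payload)


log: list[str] = []
apply_changes(
    [ChangeRecord("add_node", "n1"), ChangeRecord("remove_edge", "e1")],
    graph=Recorder("graph", log),
    repo=Recorder("repo", log),
)
print(log)
# ['graph.add_node', 'repo.save_node', 'graph.remove_edge', 'repo.delete_edge']
```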

Guard Conditions

Calling any record_* method on a committed or rolled-back transaction raises RuntimeError. Likewise, GraphEngine.commit() refuses a transaction that is already committed or rolled-back.
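The guard on the record_* methods can be seen in isolation with a trimmed copy of the class. Only record_add_node is reproduced, graph_id is a plain string, and try_record is a hypothetical helper for the demonstration; the _assert_open logic and its messages follow the source listed in the API reference.

```python
from dataclasses import dataclass, field
from typing import Any


@dataclass
class Transaction:
    """Trimmed stand-in: one record_* method plus the documented guard."""

    graph_id: str
    changes: list = field(default_factory=list)
    committed: bool = False
    rolled_back: bool = False

    def record_add_node(self, node: Any) -> None:
        self._assert_open()
        self.changes.append(("add_node", node))

    def _assert_open(self) -> None:
        if self.committed:
            raise RuntimeError("Transaction already committed.")
        if self.rolled_back:
            raise RuntimeError("Transaction already rolled back.")


def try_record(tx: Transaction, node: Any) -> str:
    """Hypothetical helper: report whether recording succeeded or why it failed."""
    try:
        tx.record_add_node(node)
    except RuntimeError as exc:
        return str(exc)
    return "recorded"


open_tx = Transaction(graph_id="g1")
done_tx = Transaction(graph_id="g1", committed=True)
failed_tx = Transaction(graph_id="g1", rolled_back=True)

results = [try_record(t, "n1") for t in (open_tx, done_tx, failed_tx)]
print(results)
# ['recorded', 'Transaction already committed.', 'Transaction already rolled back.']
```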

API Reference

knowledge_platform.core.transaction.ChangeRecord dataclass

Represents a single recorded change within a transaction.

Attributes:

Name        Type   Description
operation   str    One of "add_node", "update_node", "remove_node", "add_edge", "remove_edge".
payload     Any    The entity involved (Node, Edge, or just an ID string).

Source code in src/knowledge_platform/core/transaction.py
@dataclass
class ChangeRecord:
    """Represents a single recorded change within a transaction.

    Attributes:
        operation: One of ``"add_node"``, ``"update_node"``, ``"remove_node"``,
            ``"add_edge"``, ``"remove_edge"``.
        payload: The entity involved (Node, Edge, or just an ID string).
    """

    operation: str
    payload: Any

knowledge_platform.core.transaction.Transaction dataclass

Collects a batch of graph mutations to be committed atomically.

Usage:

tx = Transaction(graph_id=gid)
tx.record_add_node(node)
tx.record_add_edge(edge)
# apply all changes via GraphEngine.commit(tx)

Attributes:

Name          Type                 Description
graph_id      GraphId              The graph this transaction targets.
changes       list[ChangeRecord]   Ordered list of recorded changes.
committed     bool                 Whether this transaction has been successfully committed.
rolled_back   bool                 Whether this transaction has been rolled back.

Source code in src/knowledge_platform/core/transaction.py
@dataclass
class Transaction:
    """Collects a batch of graph mutations to be committed atomically.

    Usage::

        tx = Transaction(graph_id=gid)
        tx.record_add_node(node)
        tx.record_add_edge(edge)
        # apply all changes via GraphEngine.commit(tx)

    Attributes:
        graph_id: The graph this transaction targets.
        changes: Ordered list of recorded changes.
        committed: Whether this transaction has been successfully committed.
        rolled_back: Whether this transaction has been rolled back.
    """

    graph_id: GraphId
    changes: list[ChangeRecord] = field(default_factory=list)
    committed: bool = False
    rolled_back: bool = False

    def record_add_node(self, node: Node) -> None:
        """Record a node addition.

        Args:
            node: The node to add.
        """
        self._assert_open()
        self.changes.append(ChangeRecord(operation="add_node", payload=node))

    def record_update_node(self, node: Node) -> None:
        """Record a node update.

        Args:
            node: The updated node.
        """
        self._assert_open()
        self.changes.append(ChangeRecord(operation="update_node", payload=node))

    def record_remove_node(self, node_id: NodeId) -> None:
        """Record a node removal.

        Args:
            node_id: ID of the node to remove.
        """
        self._assert_open()
        self.changes.append(ChangeRecord(operation="remove_node", payload=node_id))

    def record_add_edge(self, edge: Edge) -> None:
        """Record an edge addition.

        Args:
            edge: The edge to add.
        """
        self._assert_open()
        self.changes.append(ChangeRecord(operation="add_edge", payload=edge))

    def record_remove_edge(self, edge_id: EdgeId) -> None:
        """Record an edge removal.

        Args:
            edge_id: ID of the edge to remove.
        """
        self._assert_open()
        self.changes.append(ChangeRecord(operation="remove_edge", payload=edge_id))

    def _assert_open(self) -> None:
        if self.committed:
            raise RuntimeError("Transaction already committed.")
        if self.rolled_back:
            raise RuntimeError("Transaction already rolled back.")

Functions

record_add_edge
record_add_edge(edge: Edge) -> None

Record an edge addition.

Parameters:

Name   Type   Description        Default
edge   Edge   The edge to add.   required

Source code in src/knowledge_platform/core/transaction.py
def record_add_edge(self, edge: Edge) -> None:
    """Record an edge addition.

    Args:
        edge: The edge to add.
    """
    self._assert_open()
    self.changes.append(ChangeRecord(operation="add_edge", payload=edge))
record_add_node
record_add_node(node: Node) -> None

Record a node addition.

Parameters:

Name   Type   Description        Default
node   Node   The node to add.   required

Source code in src/knowledge_platform/core/transaction.py
def record_add_node(self, node: Node) -> None:
    """Record a node addition.

    Args:
        node: The node to add.
    """
    self._assert_open()
    self.changes.append(ChangeRecord(operation="add_node", payload=node))
record_remove_edge
record_remove_edge(edge_id: EdgeId) -> None

Record an edge removal.

Parameters:

Name      Type     Description                 Default
edge_id   EdgeId   ID of the edge to remove.   required

Source code in src/knowledge_platform/core/transaction.py
def record_remove_edge(self, edge_id: EdgeId) -> None:
    """Record an edge removal.

    Args:
        edge_id: ID of the edge to remove.
    """
    self._assert_open()
    self.changes.append(ChangeRecord(operation="remove_edge", payload=edge_id))
record_remove_node
record_remove_node(node_id: NodeId) -> None

Record a node removal.

Parameters:

Name      Type     Description                 Default
node_id   NodeId   ID of the node to remove.   required

Source code in src/knowledge_platform/core/transaction.py
def record_remove_node(self, node_id: NodeId) -> None:
    """Record a node removal.

    Args:
        node_id: ID of the node to remove.
    """
    self._assert_open()
    self.changes.append(ChangeRecord(operation="remove_node", payload=node_id))
record_update_node
record_update_node(node: Node) -> None

Record a node update.

Parameters:

Name   Type   Description         Default
node   Node   The updated node.   required

Source code in src/knowledge_platform/core/transaction.py
def record_update_node(self, node: Node) -> None:
    """Record a node update.

    Args:
        node: The updated node.
    """
    self._assert_open()
    self.changes.append(ChangeRecord(operation="update_node", payload=node))

knowledge_platform.core.transaction.transaction

transaction(
    graph_id: GraphId,
) -> Generator[Transaction, None, None]

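Context manager that yields a Transaction for graph_id.

The caller must pass the transaction to GraphEngine.commit() (in knowledge_platform.core.engine) to apply the changes. If an exception propagates out of the with block, the transaction is marked as rolled back and the exception is re-raised. The context manager never calls commit() itself, so no changes are applied unless the caller has already committed.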

Parameters:

Name       Type      Description                Default
graph_id   GraphId   Owning graph identifier.   required

Yields:

Type          Description
Transaction   An open Transaction.

Raises:

Type        Description
Exception   Any exception raised inside the with block, after marking the transaction as rolled back.

Source code in src/knowledge_platform/core/transaction.py
@contextlib.contextmanager
def transaction(graph_id: GraphId) -> Generator[Transaction, None, None]:
    """Context manager that yields a :class:`Transaction` for *graph_id*.

    The caller must pass the transaction to
    :meth:`~knowledge_platform.core.engine.GraphEngine.commit` to apply the
    changes.  If an exception propagates the transaction is marked as
    rolled-back (no changes are applied).

    Args:
        graph_id: Owning graph identifier.

    Yields:
        An open :class:`Transaction`.

    Raises:
        Exception: Any exception raised inside the ``with`` block, after
            marking the transaction as rolled back.
    """
    tx = Transaction(graph_id=graph_id)
    try:
        yield tx
    except Exception:
        tx.rolled_back = True
        raise