"""The append-only operation log.

The log is the only durable state a Mnema node has: everything else (the
derivation graph, claim indexes, explanations shown in a UI) is a
deterministic replay of the log. This makes the node auditable and portable,
and it is what milestone 6 will synchronize between nodes.

This implementation stores operations in SQLite (stdlib ``sqlite3``), with
triggers that make the table physically append-only — UPDATE and DELETE are
rejected at the database layer, not just by convention.

Guarantees enforced on append:

* the operation's signature and content address verify;
* every ``prev`` causal reference already exists in the log (no dangling
  causal edges);
* appends are idempotent: re-appending an existing operation returns its
  original sequence number and changes nothing.
"""

from __future__ import annotations

import json
import sqlite3
import threading
from pathlib import Path
from typing import Iterable, Iterator, List, Optional, Tuple, Union

from mnema.core.canonical import canonical_dumps
from mnema.core.operations import Operation

__all__ = ["OpLog", "LogError", "DanglingReferenceError"]


class LogError(Exception):
    """Base class for log failures."""


class DanglingReferenceError(LogError):
    """An operation referenced a prev op id that is not in the log."""


# One row per operation. ``seq`` is assigned by SQLite and defines log order;
# ``op_id`` is UNIQUE so an operation can occupy at most one row. ``type``,
# ``author`` and ``ts`` are copied out of the operation for querying, while
# ``wire`` holds the canonical serialization that reads are decoded from.
# The two triggers abort any UPDATE or DELETE against the table.
_SCHEMA = """
CREATE TABLE IF NOT EXISTS ops (
    seq     INTEGER PRIMARY KEY AUTOINCREMENT,
    op_id   TEXT NOT NULL UNIQUE,
    type    TEXT NOT NULL,
    author  TEXT NOT NULL,
    ts      INTEGER NOT NULL,
    wire    TEXT NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_ops_type ON ops(type);
CREATE INDEX IF NOT EXISTS idx_ops_author ON ops(author);
CREATE TRIGGER IF NOT EXISTS ops_no_update BEFORE UPDATE ON ops
BEGIN
    SELECT RAISE(ABORT, 'mnema log is append-only: UPDATE forbidden');
END;
CREATE TRIGGER IF NOT EXISTS ops_no_delete BEFORE DELETE ON ops
BEGIN
    SELECT RAISE(ABORT, 'mnema log is append-only: DELETE forbidden');
END;
"""


class OpLog:
    """Append-only, signature-verified operation log backed by SQLite.

    A single SQLite connection (opened with ``check_same_thread=False``) is
    shared by all methods; every method that touches it does so while holding
    a re-entrant lock. The instance is also a context manager that closes the
    connection on exit.
    """

    def __init__(self, path: Union[str, Path] = ":memory:") -> None:
        """Open (or create) the log at *path* and install the schema.

        Args:
            path: Database location handed to ``sqlite3.connect`` after
                conversion to ``str``. The default ``":memory:"`` gives a
                private in-memory database.
        """
        self._path = str(path)
        self._lock = threading.RLock()
        self._conn = sqlite3.connect(self._path, check_same_thread=False)
        try:
            with self._lock:
                self._conn.executescript(_SCHEMA)
                self._conn.commit()
        except Exception:
            # Do not leak the freshly opened connection when the schema
            # cannot be installed (e.g. the file is not a database).
            self._conn.close()
            raise

    # -- mutation -----------------------------------------------------------

    def append(
        self,
        op: Operation,
        *,
        verify: bool = True,
        check_prev: bool = True,
    ) -> int:
        """Append *op*; returns its sequence number.

        Idempotent: appending an operation already in the log returns the
        existing sequence number without re-writing anything. That lookup
        happens first, so an already-stored op id is neither re-verified nor
        re-checked for dangling references.

        Args:
            op: The operation to store.
            verify: When true, ``op.verify()`` is called before writing;
                whatever it raises propagates to the caller.
            check_prev: When true, every id in ``op.prev`` must already be
                present in the log.

        Returns:
            The ``seq`` of the row holding *op*.

        Raises:
            DanglingReferenceError: ``check_prev`` is true and some id in
                ``op.prev`` is not in the log.
            sqlite3.Error: The INSERT failed; the transaction is rolled back
                before the error propagates.
        """
        with self._lock:
            existing = self._seq_of(op.op_id)
            if existing is not None:
                return existing

            if verify:
                op.verify()

            if check_prev:
                for prev_id in op.prev:
                    if self._seq_of(prev_id) is None:
                        raise DanglingReferenceError(
                            f"operation {op.op_id} references unknown prev {prev_id}"
                        )

            wire = canonical_dumps(op.to_wire())
            # The connection's context manager commits on success and rolls
            # back on failure, so a failed INSERT never leaves a transaction
            # open on the shared connection.
            with self._conn:
                cur = self._conn.execute(
                    "INSERT INTO ops (op_id, type, author, ts, wire) VALUES (?, ?, ?, ?, ?)",
                    (op.op_id, op.type, op.author, op.ts, wire),
                )
            return int(cur.lastrowid)

    def append_all(self, ops: Iterable[Operation], **kwargs: bool) -> List[int]:
        """Append each operation in *ops* in order; returns their sequence numbers.

        Keyword arguments are forwarded unchanged to :meth:`append`. Each
        operation is committed individually, so if one append raises, the
        operations before it remain in the log.
        """
        return [self.append(op, **kwargs) for op in ops]

    # -- lookup -------------------------------------------------------------

    def _seq_of(self, op_id: str) -> Optional[int]:
        """Return the sequence number of *op_id*, or ``None`` if absent.

        Does not take the lock itself; callers are expected to hold it.
        """
        row = self._conn.execute(
            "SELECT seq FROM ops WHERE op_id = ?", (op_id,)
        ).fetchone()
        return int(row[0]) if row else None

    def seq_of(self, op_id: str) -> Optional[int]:
        """Return the sequence number of *op_id*, or ``None`` if it is not logged."""
        with self._lock:
            return self._seq_of(op_id)

    def get(self, op_id: str) -> Operation:
        """Return the stored operation with id *op_id*.

        Raises:
            LogError: No operation with that id is in the log.
        """
        with self._lock:
            row = self._conn.execute(
                "SELECT wire FROM ops WHERE op_id = ?", (op_id,)
            ).fetchone()
        if row is None:
            raise LogError(f"operation not found: {op_id}")
        # Already verified on append; skip re-verification on read.
        return Operation.from_wire(json.loads(row[0]), verify=False)

    def __contains__(self, op_id: object) -> bool:
        """Return whether *op_id* is a string naming a logged operation."""
        if not isinstance(op_id, str):
            return False
        return self.seq_of(op_id) is not None

    def __len__(self) -> int:
        """Return the number of operations in the log."""
        with self._lock:
            row = self._conn.execute("SELECT COUNT(*) FROM ops").fetchone()
        return int(row[0])

    def last_op_id(self) -> Optional[str]:
        """Op id of the most recently appended operation (for prev links).

        Returns ``None`` when the log is empty.
        """
        with self._lock:
            row = self._conn.execute(
                "SELECT op_id FROM ops ORDER BY seq DESC LIMIT 1"
            ).fetchone()
        return row[0] if row else None

    # -- iteration ----------------------------------------------------------

    def iter_ops(
        self,
        *,
        types: Optional[Iterable[str]] = None,
        since_seq: int = 0,
    ) -> Iterator[Tuple[int, Operation]]:
        """Yield ``(seq, op)`` in log order.

        Snapshot semantics: the matching rows are fetched when this method is
        called, so operations appended afterwards — including while the
        returned iterator is being consumed — are not yielded. Only the
        decoding of each row into an ``Operation`` is deferred.

        Args:
            types: If given, only operations whose ``type`` is one of these
                are returned; ``None`` means no type filter.
            since_seq: Only rows with ``seq`` strictly greater than this
                value are returned.
        """
        type_list = sorted(set(types)) if types is not None else None
        with self._lock:
            if type_list is None:
                rows = self._conn.execute(
                    "SELECT seq, wire FROM ops WHERE seq > ? ORDER BY seq",
                    (since_seq,),
                ).fetchall()
            else:
                # Only "?" markers are interpolated into the SQL text; the
                # type names themselves are passed as bound parameters.
                placeholders = ",".join("?" for _ in type_list)
                rows = self._conn.execute(
                    f"SELECT seq, wire FROM ops WHERE seq > ? AND type IN ({placeholders}) "
                    "ORDER BY seq",
                    (since_seq, *type_list),
                ).fetchall()
        return self._decode_rows(rows)

    @staticmethod
    def _decode_rows(rows: Iterable[Tuple[int, str]]) -> Iterator[Tuple[int, Operation]]:
        """Lazily decode fetched ``(seq, wire)`` rows into ``(seq, op)`` pairs."""
        for seq, wire in rows:
            # Rows were verified when appended; skip re-verification on read.
            yield int(seq), Operation.from_wire(json.loads(wire), verify=False)

    def by_type(self, op_type: str) -> List[Operation]:
        """Return every logged operation of type *op_type*, in log order."""
        return [op for _, op in self.iter_ops(types=[op_type])]

    # -- lifecycle ----------------------------------------------------------

    def close(self) -> None:
        """Close the underlying SQLite connection."""
        with self._lock:
            self._conn.close()

    def __enter__(self) -> "OpLog":
        """Return the log itself for use in a ``with`` statement."""
        return self

    def __exit__(self, *exc: object) -> None:
        """Close the log on leaving the ``with`` block; exceptions propagate."""
        self.close()