"""Append-only operation log store for a FablePool node. Revised in milestone 6 to expose the surface the sync engine needs: ``frontier()``, ``ops_since()``, ``op_ids()``, ``max_lamport()``, and ``by_type()`` — plus thread safety, because the stdlib HTTP transport serves sync requests from worker threads. Design notes ------------ * The store is content-addressed: an operation's identity is its ``op_id`` (a hash over its canonical unsigned form, per the milestone-2 wire format). Appending the same operation twice is a harmless no-op. * The store does **not** require that an operation's ``prev`` references are present. Partial replicas are legal in the protocol (a delegate node holds only the control-plane operations it was served). *Ordered* ingestion with dependency deferral is the sync engine's job (see ``fablepool.sync``). * Persistence is a JSON-lines file, one canonical operation envelope per line. ``path=None`` keeps the log purely in memory (tests, demos). """ from __future__ import annotations import json import os import threading from pathlib import Path from typing import Dict, Iterable, List, Optional, Set, Union from .ops import Operation, op_from_dict, verify_op class StoreError(Exception): """Raised on integrity failures or missing operations.""" class LogStore: """An append-only, content-addressed operation log.""" def __init__( self, path: Optional[Union[str, Path]] = None, verify_on_load: bool = True, ) -> None: self._path: Optional[Path] = Path(path) if path is not None else None self._ops: Dict[str, Operation] = {} self._lock = threading.RLock() if self._path is not None and self._path.exists(): self._load(verify_on_load) # ------------------------------------------------------------------ # persistence # ------------------------------------------------------------------ def _load(self, verify: bool) -> None: assert self._path is not None with open(self._path, "r", encoding="utf-8") as fh: for lineno, line in enumerate(fh, start=1): line = line.strip() if not line: continue try: data = json.loads(line) except json.JSONDecodeError as exc: raise StoreError( f"{self._path}:{lineno}: corrupt log line: {exc}" ) from exc op = op_from_dict(data) if verify and not verify_op(op): raise StoreError( f"{self._path}:{lineno}: operation {op.op_id} failed " "signature/identity verification" ) self._ops[op.op_id] = op def _persist(self, op: Operation) -> None: if self._path is None: return self._path.parent.mkdir(parents=True, exist_ok=True) line = json.dumps( op.to_dict(), sort_keys=True, separators=(",", ":"), ensure_ascii=False ) with open(self._path, "a", encoding="utf-8") as fh: fh.write(line + "\n") fh.flush() os.fsync(fh.fileno()) # ------------------------------------------------------------------ # mutation # ------------------------------------------------------------------ def append(self, op: Operation, verify: bool = True) -> bool: """Append ``op``. Returns False if it was already present. Raises :class:`StoreError` if verification is requested and fails. """ with self._lock: if op.op_id in self._ops: return False if verify and not verify_op(op): raise StoreError( f"refusing to append {op.op_id}: signature/identity " "verification failed" ) self._ops[op.op_id] = op self._persist(op) return True # ------------------------------------------------------------------ # queries # ------------------------------------------------------------------ def has(self, op_id: str) -> bool: with self._lock: return op_id in self._ops def get(self, op_id: str) -> Operation: with self._lock: try: return self._ops[op_id] except KeyError: raise StoreError(f"unknown operation: {op_id!r}") from None def all_ops(self) -> List[Operation]: """All operations in deterministic (lamport, op_id) order. Because an operation's lamport clock strictly exceeds that of every operation it references in ``prev``, this order is compatible with causal order and is identical on every node holding the same set. """ with self._lock: return sorted(self._ops.values(), key=lambda o: (o.lamport, o.op_id)) def op_ids(self) -> Set[str]: with self._lock: return set(self._ops.keys()) def ops_since(self, known: Iterable[str]) -> List[Operation]: """Operations whose ids are not in ``known``, deterministically ordered.""" known_set = set(known) return [op for op in self.all_ops() if op.op_id not in known_set] def frontier(self) -> List[str]: """Ids of head operations (not referenced by any ``prev`` we hold).""" with self._lock: referenced: Set[str] = set() for op in self._ops.values(): referenced.update(op.prev) return sorted(set(self._ops.keys()) - referenced) def max_lamport(self) -> int: with self._lock: if not self._ops: return 0 return max(op.lamport for op in self._ops.values()) def by_type(self, op_type: str) -> List[Operation]: return [op for op in self.all_ops() if op.op_type == op_type] def missing_dependencies(self, op: Operation) -> List[str]: with self._lock: return [p for p in op.prev if p not in self._ops] def __len__(self) -> int: with self._lock: return len(self._ops) def __contains__(self, op_id: str) -> bool: return self.has(op_id)