"""Append-only operation log with hash chaining and deterministic merge.

Operations form a Merkle DAG via their ``prev`` arrays. A log accepts an
operation only if it verifies (see ``fpcf.verify``) against the operations
already present, and rejects duplicates (``FP-E-DUP``).

Deterministic total order (normative for replay and export): topological
order over the ``prev`` DAG, breaking ties by ``(ts, op_id)`` ascending.

Merge of two logs is the verified union of their operations replayed in
that order; merge is commutative, associative, and idempotent because op
ids are content hashes.

See spec/02-wire-format/03-log-and-merge.md.
"""

import heapq
from typing import Dict, Iterator, List, Tuple

from .canonical import canonicalize
from .errors import FpcfError, E_DUP, E_PREV
from .verify import verify_op, ts_key


class OpLog:
    """An in-memory verified operation log.

    Operations are stored as envelope dicts keyed by op id. Every envelope
    is expected to carry a ``prev`` sequence of parent op ids and a ``ts``
    value accepted by ``ts_key``.
    """

    def __init__(self) -> None:
        # op id -> envelope, exactly as returned by ``verify_op``.
        self._ops: Dict[str, dict] = {}

    # -- append ----------------------------------------------------------

    def append_raw(self, raw: bytes) -> str:
        """Verify ``raw`` against this log and append it.

        Returns the op id.

        Raises:
            FpcfError: ``E_DUP`` if the op id is already stored. Any error
                raised by ``verify_op`` propagates unchanged.
        """
        oid, env = verify_op(raw, self._ops)
        if oid in self._ops:
            raise FpcfError(E_DUP, "operation %s already in log" % oid)
        self._ops[oid] = env
        return oid

    def append_env(self, envelope: dict) -> str:
        """Canonicalize an envelope dict and append it via ``append_raw``."""
        return self.append_raw(canonicalize(envelope))

    # -- access ----------------------------------------------------------

    def __contains__(self, op_id: str) -> bool:
        return op_id in self._ops

    def __len__(self) -> int:
        return len(self._ops)

    def get(self, op_id: str) -> dict:
        """Return the stored envelope for ``op_id`` (``KeyError`` if absent)."""
        return self._ops[op_id]

    def op_ids(self) -> List[str]:
        """Return every stored op id, sorted ascending."""
        return sorted(self._ops)

    def raw(self, op_id: str) -> bytes:
        """Canonical wire bytes for a stored operation."""
        return canonicalize(self._ops[op_id])

    # -- structure -------------------------------------------------------

    def heads(self) -> List[str]:
        """Op ids not referenced by any stored op's ``prev`` (sorted)."""
        referenced = set()
        for env in self._ops.values():
            referenced.update(env["prev"])
        return sorted(oid for oid in self._ops if oid not in referenced)

    @staticmethod
    def _walk(ops: Dict[str, dict]) -> Iterator[str]:
        """Yield op ids of ``ops`` in the deterministic total order.

        Kahn's algorithm with a min-heap keyed by ``(ts_key(ts), op_id)``.
        Ops whose ``prev`` chain cannot be satisfied from ``ops`` alone
        (missing parent or cycle) are never yielded; callers detect that
        by comparing the number of yielded ids with ``len(ops)``.
        """
        children: Dict[str, List[str]] = {}
        indegree: Dict[str, int] = {}
        for oid, env in ops.items():
            # A parent listed twice counts twice here and is decremented
            # twice below, so duplicates in ``prev`` stay balanced.
            indegree[oid] = len(env["prev"])
            for parent in env["prev"]:
                children.setdefault(parent, []).append(oid)

        ready = [
            (ts_key(ops[oid]["ts"]), oid)
            for oid, deg in indegree.items()
            if deg == 0
        ]
        heapq.heapify(ready)
        while ready:
            _, oid = heapq.heappop(ready)
            yield oid
            for child in children.get(oid, ()):
                indegree[child] -= 1
                if indegree[child] == 0:
                    heapq.heappush(ready, (ts_key(ops[child]["ts"]), child))

    def topo_order(self) -> List[str]:
        """Deterministic total order: topological, tie-broken by (ts, op_id).

        Raises:
            FpcfError: ``E_PREV`` if not every stored op can be ordered.
        """
        order = list(self._walk(self._ops))
        if len(order) != len(self._ops):
            # unreachable with content-hash ids
            raise FpcfError(E_PREV, "log contains a dependency cycle")
        return order

    def envelopes(self) -> Iterator[Tuple[str, dict]]:
        """Iterate ``(op_id, envelope)`` in the deterministic total order."""
        for oid in self.topo_order():
            yield oid, self._ops[oid]

    def export_raw(self) -> List[bytes]:
        """Canonical wire bytes of every op, in the deterministic order."""
        return [canonicalize(self._ops[oid]) for oid in self.topo_order()]

    # -- merge -----------------------------------------------------------

    def merge(self, other: "OpLog") -> "OpLog":
        """Verified union of two logs as a new log.

        The union is replayed in the deterministic total order (the same
        order ``topo_order`` produces), and every operation is re-verified
        as it is appended. Neither input log is modified.

        Raises:
            FpcfError: ``E_PREV`` if some operation references a ``prev``
                absent from both inputs. Verification errors raised while
                replaying propagate unchanged.
        """
        union: Dict[str, dict] = {}
        for src in (self, other):
            for oid in src.op_ids():
                # On a shared id the first log's envelope wins.
                union.setdefault(oid, src.get(oid))

        merged = OpLog()
        # Single heap-driven pass; appending as we go means a verification
        # failure surfaces before the unresolved-prev check below.
        for oid in self._walk(union):
            merged.append_env(union[oid])
        if len(merged) != len(union):
            raise FpcfError(E_PREV, "merge: unresolved prev references")
        return merged

    def verify_all(self) -> bool:
        """Re-verify the entire log from scratch in deterministic order.

        Returns ``True`` when every stored operation re-verifies and its
        re-derived op id equals the key it is stored under, ``False`` when
        a re-derived id differs from the stored key.

        Raises:
            FpcfError: propagated from ordering or verification failures.
        """
        fresh = OpLog()
        for oid in self.topo_order():
            # Comparing ids (not just counts) catches an envelope whose
            # content no longer matches the id it is filed under.
            if fresh.append_raw(self.raw(oid)) != oid:
                return False
        return True