"""Append-only, signed, hash-chained operation log with local storage. Storage layout (one directory per log):: /ops.jsonl # one canonical-JSON operation per line Invariants enforced on every append *and* re-checked on every open: * each op verifies in isolation (structure, op_id, signature); * op ``seq`` equals its 0-based line number; * op ``prev`` equals the ``op_id`` of the previous line (``null`` for line 0); * each on-disk line is byte-identical to the canonical JSON of the op it claims to be -- the file itself is canonical, so two honest nodes holding the same ops hold the same bytes. The reference log keeps all ops in memory after open. That is fine for a personal corpus of tens of thousands of operations and keeps the code auditable; an indexed store is an implementation detail other nodes are free to change without affecting the wire format. """ from __future__ import annotations import json import os from pathlib import Path from typing import Iterator, Optional from pmp.canonical import canonical_json from pmp.errors import LogIntegrityError, PMPError from pmp.operations import verify_op OPS_FILENAME = "ops.jsonl" class OpLog: def __init__(self, path: Path | str): self.path = Path(path) self.path.mkdir(parents=True, exist_ok=True) self.ops_file = self.path / OPS_FILENAME self._ops: list[dict] = [] self._by_id: dict[str, dict] = {} if self.ops_file.exists(): for op in self._read_and_verify_file(): self._ops.append(op) self._by_id[op["op_id"]] = op # -- file reading ------------------------------------------------------- def _read_and_verify_file(self) -> Iterator[dict]: """Yield ops from disk, verifying every invariant as we go.""" prev_id: Optional[str] = None with self.ops_file.open("rb") as fh: for lineno, raw_line in enumerate(fh): line = raw_line.rstrip(b"\n") if not line: raise LogIntegrityError( f"{self.ops_file}: blank line at {lineno}" ) try: op = json.loads(line.decode("utf-8")) except (UnicodeDecodeError, json.JSONDecodeError) as exc: raise LogIntegrityError( f"{self.ops_file}: unparseable line {lineno}: {exc}" ) from exc try: verify_op(op) except PMPError as exc: raise LogIntegrityError( f"{self.ops_file}: invalid op at line {lineno}: {exc}" ) from exc if canonical_json(op) != line: raise LogIntegrityError( f"{self.ops_file}: line {lineno} is not in canonical form" ) if op["seq"] != lineno: raise LogIntegrityError( f"{self.ops_file}: line {lineno} has seq {op['seq']}" ) if op["prev"] != prev_id: raise LogIntegrityError( f"{self.ops_file}: line {lineno} prev {op['prev']!r} " f"does not match head {prev_id!r}" ) prev_id = op["op_id"] yield op # -- properties --------------------------------------------------------- @property def head_id(self) -> Optional[str]: return self._ops[-1]["op_id"] if self._ops else None @property def next_seq(self) -> int: return len(self._ops) def __len__(self) -> int: return len(self._ops) def __iter__(self) -> Iterator[dict]: return iter(self._ops) def iter_type(self, op_type: str) -> Iterator[dict]: return (op for op in self._ops if op["type"] == op_type) def get(self, op_id: str, default=None): return self._by_id.get(op_id, default) # -- mutation ----------------------------------------------------------- def append(self, op: dict) -> str: """Append one signed op. Verifies it and its chain placement. Returns the op_id. The write is flushed and fsynced before the in-memory state changes, so a crash never leaves memory ahead of disk. """ try: verify_op(op) except PMPError as exc: raise LogIntegrityError(f"refusing to append invalid op: {exc}") from exc if op["seq"] != self.next_seq: raise LogIntegrityError( f"op seq {op['seq']} does not match next seq {self.next_seq}" ) if op["prev"] != self.head_id: raise LogIntegrityError( f"op prev {op['prev']!r} does not match head {self.head_id!r}" ) if op["op_id"] in self._by_id: raise LogIntegrityError(f"duplicate op_id {op['op_id']}") line = canonical_json(op) + b"\n" with self.ops_file.open("ab") as fh: fh.write(line) fh.flush() os.fsync(fh.fileno()) self._ops.append(op) self._by_id[op["op_id"]] = op return op["op_id"] # -- audit -------------------------------------------------------------- def verify(self) -> dict: """Full re-verification of the on-disk log against memory. Re-reads the file, re-checks every signature, hash, sequence number, chain link, and canonical byte form, and confirms it matches the in-memory state. Returns a report dict; raises :class:`LogIntegrityError` on any violation. """ count = 0 head: Optional[str] = None if self.ops_file.exists(): for op in self._read_and_verify_file(): if count >= len(self._ops) or self._ops[count]["op_id"] != op["op_id"]: raise LogIntegrityError( f"in-memory log diverges from disk at seq {count}" ) head = op["op_id"] count += 1 if count != len(self._ops): raise LogIntegrityError( f"in-memory log has {len(self._ops)} ops but disk has {count}" ) return {"ok": True, "ops": count, "head": head, "file": str(self.ops_file)}