"""Append-only operation log: JSONL of signed envelopes, plus full audit. The log file is the node's single source of truth. ``append`` writes one canonical-JSON line per operation and fsyncs. ``audit`` re-reads the file from disk and independently re-verifies everything a foreign implementation would: JSON well-formedness, envelope structure, body schemas, op_id recomputation, Ed25519 signatures, per-author hash chains, and duplicate detection. The audit never trusts in-memory state, so it also detects on-disk tampering after the fact. """ from __future__ import annotations import json import os from dataclasses import dataclass, field from pathlib import Path from typing import Any, Dict, Iterator, List, Optional, Tuple, Union from .errors import ChainError, SignatureError, WireError from .identity import verify_signature from .wire import ( canonical_bytes, signing_bytes, validate_envelope_structure, ) @dataclass class AuditIssue: line_no: int op_id: Optional[str] kind: str # "json" | "structure" | "signature" | "chain" | "duplicate" detail: str @dataclass class AuditReport: path: str total_lines: int = 0 verified_ops: int = 0 issues: List[AuditIssue] = field(default_factory=list) by_type: Dict[str, int] = field(default_factory=dict) by_author: Dict[str, int] = field(default_factory=dict) first_ts: Optional[str] = None last_ts: Optional[str] = None @property def ok(self) -> bool: return not self.issues def to_dict(self) -> Dict[str, Any]: return { "path": self.path, "total_lines": self.total_lines, "verified_ops": self.verified_ops, "ok": self.ok, "issues": [ { "line": i.line_no, "op_id": i.op_id, "kind": i.kind, "detail": i.detail, } for i in self.issues ], "by_type": dict(self.by_type), "by_author": dict(self.by_author), "first_ts": self.first_ts, "last_ts": self.last_ts, } class OpLog: """The append-only signed operation log backing a node.""" def __init__(self, path: Union[str, Path]): self.path = Path(path) def exists(self) -> bool: return self.path.exists() # -- writing ------------------------------------------------------------- def append(self, envelope: Dict[str, Any]) -> None: """Append a fully built envelope. Validates structure first so a bad envelope can never reach disk through this code path.""" validate_envelope_structure(envelope) line = canonical_bytes(envelope) + b"\n" self.path.parent.mkdir(parents=True, exist_ok=True) with open(self.path, "ab") as fh: fh.write(line) fh.flush() os.fsync(fh.fileno()) # -- reading ------------------------------------------------------------- def read_all(self) -> List[Dict[str, Any]]: """Read every envelope, validating structure. Raises on any defect; use :meth:`audit` for forgiving, line-by-line diagnostics.""" ops = [] for line_no, raw in self._raw_lines(): try: env = json.loads(raw) except ValueError as exc: raise WireError(f"{self.path}:{line_no}: invalid JSON: {exc}") from exc try: validate_envelope_structure(env) except WireError as exc: raise WireError(f"{self.path}:{line_no}: {exc}") from exc ops.append(env) return ops def __iter__(self) -> Iterator[Dict[str, Any]]: return iter(self.read_all()) def _raw_lines(self) -> Iterator[Tuple[int, str]]: if not self.path.exists(): return with open(self.path, "r", encoding="utf-8") as fh: for line_no, line in enumerate(fh, start=1): stripped = line.strip() if stripped: yield line_no, stripped # -- audit ---------------------------------------------------------------- def audit(self) -> AuditReport: """Independently re-verify the entire log from disk.""" report = AuditReport(path=str(self.path)) heads: Dict[str, Optional[str]] = {} # author -> last verified op_id seen_ids: Dict[str, int] = {} # op_id -> first line for line_no, raw in self._raw_lines(): report.total_lines += 1 try: env = json.loads(raw) except ValueError as exc: report.issues.append( AuditIssue(line_no, None, "json", f"invalid JSON: {exc}") ) continue op_id = env.get("op_id") if isinstance(env, dict) else None try: validate_envelope_structure(env) except WireError as exc: report.issues.append(AuditIssue(line_no, op_id, "structure", str(exc))) continue try: verify_signature(env["author"], signing_bytes(env), env["sig"]) except SignatureError as exc: report.issues.append(AuditIssue(line_no, op_id, "signature", str(exc))) continue if env["op_id"] in seen_ids: report.issues.append( AuditIssue( line_no, env["op_id"], "duplicate", f"op already appeared at line {seen_ids[env['op_id']]}", ) ) continue seen_ids[env["op_id"]] = line_no author = env["author"] expected_prev = heads.get(author) if env["prev"] != expected_prev: report.issues.append( AuditIssue( line_no, env["op_id"], "chain", f"author chain broken: prev is {env['prev']!r} but the " f"last verified op for {author[:24]}… is " f"{expected_prev!r}", ) ) # Continue verifying from this op so a single break does not # cascade into noise; the break itself is already recorded. heads[author] = env["op_id"] report.verified_ops += 1 report.by_type[env["type"]] = report.by_type.get(env["type"], 0) + 1 report.by_author[author] = report.by_author.get(author, 0) + 1 if report.first_ts is None: report.first_ts = env["ts"] report.last_ts = env["ts"] return report def verify_or_raise(self) -> None: report = self.audit() if not report.ok: first = report.issues[0] raise ChainError( f"operation log failed verification with {len(report.issues)} " f"issue(s); first: line {first.line_no} [{first.kind}] " f"{first.detail}" )