"""Operation sinks: how the derivation engine emits state changes. The engine never writes the signed operation log directly. Every state change (claim asserted, claim invalidated, evidence retracted, ...) is handed to an :class:`OpSink`. This keeps the engine testable (use :class:`CollectingSink`), keeps replay side-effect free (use :class:`NullSink`), and keeps the real node honest: the only durable state is the signed log (use :class:`LogOpSink`). Op kinds emitted by the engine ------------------------------ ``evidence.observe`` payload: serialized Evidence record ``evidence.retract`` payload: {evidence_id, reason} ``claim.assert`` payload: {claim, identity, supersedes} ``claim.supersede`` payload: {claim_id, superseded_by} ``claim.invalidate`` payload: {claim_id, cascade_root, cause} ``claim.refute`` payload: {claim_id, identity, reason} ``claim.correct`` payload: {claim_id, identity, reason, new_claim} ``explain.record`` payload: {claim_id, explanation} """ from __future__ import annotations from typing import Any, Dict, List, Mapping, Optional, Protocol, Tuple, runtime_checkable @runtime_checkable class OpSink(Protocol): """Anything that can accept an operation emitted by the engine.""" def emit(self, kind: str, payload: Mapping[str, Any]) -> Optional[str]: """Persist (or record) one operation; return its op id if known.""" ... class NullSink: """Discards everything. Used internally during log replay so that re-applying historical operations never re-emits them.""" def emit(self, kind: str, payload: Mapping[str, Any]) -> Optional[str]: return None class CollectingSink: """Records emitted operations in memory. Used by tests and by dry-run tooling that wants to preview what a derivation pass would write before committing it to the log.""" def __init__(self) -> None: self.ops: List[Tuple[str, Dict[str, Any]]] = [] def emit(self, kind: str, payload: Mapping[str, Any]) -> Optional[str]: self.ops.append((kind, dict(payload))) return "memop_%06d" % len(self.ops) def kinds(self) -> List[str]: return [kind for kind, _ in self.ops] def of_kind(self, kind: str) -> List[Dict[str, Any]]: return [payload for k, payload in self.ops if k == kind] def clear(self) -> None: self.ops.clear() class LogOpSink: """Signs each emitted operation with the node key and appends it to the node's append-only operation log.""" def __init__(self, log: Any, keypair: Any) -> None: self._log = log self._keypair = keypair def emit(self, kind: str, payload: Mapping[str, Any]) -> Optional[str]: # Local import keeps this module importable in pure-memory test # configurations that never touch the core log machinery. from mnema.core.operations import Operation op = Operation.create( kind=kind, payload=dict(payload), keypair=self._keypair, prev=self._log.head_id(), ) self._log.append(op) return op.op_id class TeeSink: """Fans one emission out to several sinks (e.g. log + in-memory collector for a CLI that wants to print what it just wrote).""" def __init__(self, *sinks: OpSink) -> None: self._sinks = list(sinks) def emit(self, kind: str, payload: Mapping[str, Any]) -> Optional[str]: op_id: Optional[str] = None for sink in self._sinks: result = sink.emit(kind, payload) if op_id is None: op_id = result return op_id