"""Signed operations: the only way state enters Mnema. Every event — raw evidence, a derived claim, a user correction or refutation, a mechanical cascade invalidation, an explanation record — is a signed, content-addressed, append-only operation. This module defines the envelope, the operation types used by the derivation layer, and strict body validation for each type. Envelope (wire form) -------------------- :: { "v": 1, # protocol version "type": "claim.assert", # operation type "author": "mnk_", # signing identity "ts": 1718000000000, # unix milliseconds "prev": ["op_..."], # causal references (log heads) "body": { ... }, # type-specific payload "sig": "", # over MNC-1({v,type,author,ts,prev,body}) "op_id": "op_" # BLAKE2b-256 over MNC-1(envelope minus op_id) } Operation types (milestone 4 subset) ------------------------------------ ``evidence.add`` Raw or adapter-normalized evidence (calendar event, note, photo metadata, message, contact). Evidence is never interpreted as true about the user; only claims are. ``claim.assert`` A derived (or user-asserted) claim with claim key, value, confidence (ppm), deriver identity/version, and the exact input operation ids it was derived from — the provenance edge set. ``claim.refute`` User says a claim is wrong. ``scope="value"`` suppresses re-assertion of that specific value; ``scope="key"`` suppresses the whole claim key (the system must stop believing anything under it). ``claim.correct`` User replaces a claim's value. The corrected value becomes an active user-asserted claim under the same claim key. ``claim.invalidate`` Mechanically emitted by the engine for each downstream claim whose derivation inputs were refuted/corrected; carries the causing operation and the dependency path for auditability. ``explanation.record`` A persisted, signed answer to "why does the system believe this?": a human-readable summary plus the structured provenance chain. 
""" from __future__ import annotations import copy from dataclasses import dataclass, field from typing import Any, Iterable, Mapping, Optional, Tuple from mnema.core.canonical import canonical_bytes from mnema.core.ids import is_author_id, is_claim_key, is_op_id, op_id_for_wire from mnema.core.keys import Identity, verify_signature_strict __all__ = [ "PROTOCOL_VERSION", "EVIDENCE_ADD", "CLAIM_ASSERT", "CLAIM_REFUTE", "CLAIM_CORRECT", "CLAIM_INVALIDATE", "EXPLANATION_RECORD", "ALL_OP_TYPES", "REFUTE_SCOPE_VALUE", "REFUTE_SCOPE_KEY", "REFUTE_SCOPES", "CONFIDENCE_PPM_MAX", "Operation", "OperationError", "SchemaError", "SignatureError", "validate_body", ] PROTOCOL_VERSION = 1 EVIDENCE_ADD = "evidence.add" CLAIM_ASSERT = "claim.assert" CLAIM_REFUTE = "claim.refute" CLAIM_CORRECT = "claim.correct" CLAIM_INVALIDATE = "claim.invalidate" EXPLANATION_RECORD = "explanation.record" ALL_OP_TYPES = frozenset( { EVIDENCE_ADD, CLAIM_ASSERT, CLAIM_REFUTE, CLAIM_CORRECT, CLAIM_INVALIDATE, EXPLANATION_RECORD, } ) REFUTE_SCOPE_VALUE = "value" REFUTE_SCOPE_KEY = "key" REFUTE_SCOPES = frozenset({REFUTE_SCOPE_VALUE, REFUTE_SCOPE_KEY}) CONFIDENCE_PPM_MAX = 1_000_000 class OperationError(ValueError): """Base class for operation problems.""" class SchemaError(OperationError): """The operation body or envelope violates the schema.""" class SignatureError(OperationError): """The operation signature or content address does not verify.""" # --------------------------------------------------------------------------- # Body validation helpers # --------------------------------------------------------------------------- def _expect(cond: bool, msg: str) -> None: if not cond: raise SchemaError(msg) def _str_field(body: Mapping[str, Any], name: str, *, optional: bool = False, allow_empty: bool = False) -> Optional[str]: if name not in body: _expect(optional, f"body.{name} is required") return None val = body[name] _expect(isinstance(val, str), f"body.{name} must be a string") if not allow_empty: 
_expect(len(val) > 0, f"body.{name} must be non-empty") return val def _int_field(body: Mapping[str, Any], name: str, *, optional: bool = False, minimum: Optional[int] = None, maximum: Optional[int] = None) -> Optional[int]: if name not in body: _expect(optional, f"body.{name} is required") return None val = body[name] _expect(isinstance(val, int) and not isinstance(val, bool), f"body.{name} must be an integer") if minimum is not None: _expect(val >= minimum, f"body.{name} must be >= {minimum}") if maximum is not None: _expect(val <= maximum, f"body.{name} must be <= {maximum}") return val def _dict_field(body: Mapping[str, Any], name: str, *, optional: bool = False) -> Optional[Mapping]: if name not in body: _expect(optional, f"body.{name} is required") return None val = body[name] _expect(isinstance(val, dict), f"body.{name} must be an object") return val def _op_id_field(body: Mapping[str, Any], name: str, *, optional: bool = False) -> Optional[str]: if name not in body: _expect(optional, f"body.{name} is required") return None val = body[name] _expect(is_op_id(val), f"body.{name} must be a well-formed op id (op_<64 hex>)") return val def _op_id_list_field(body: Mapping[str, Any], name: str, *, optional: bool = False, allow_empty: bool = True) -> Optional[list]: if name not in body: _expect(optional, f"body.{name} is required") return None val = body[name] _expect(isinstance(val, list), f"body.{name} must be a list") if not allow_empty: _expect(len(val) > 0, f"body.{name} must be non-empty") seen = set() for i, item in enumerate(val): _expect(is_op_id(item), f"body.{name}[{i}] must be a well-formed op id") _expect(item not in seen, f"body.{name} contains duplicate op id {item}") seen.add(item) return val def _claim_key_field(body: Mapping[str, Any], name: str = "claim_key") -> str: val = body.get(name) _expect(is_claim_key(val), f"body.{name} must be a well-formed claim key (ck_<64 hex>)") return val # type: ignore[return-value] # 
# ---------------------------------------------------------------------------
# Per-type validators
# ---------------------------------------------------------------------------


def _validate_evidence_add(body: Mapping[str, Any]) -> None:
    """Check the body of an ``evidence.add`` operation."""
    _str_field(body, "source")  # e.g. "adapter.calendar"
    _str_field(body, "kind")  # e.g. "calendar.event"
    _dict_field(body, "content")
    _int_field(body, "observed_at", minimum=0)
    _str_field(body, "external_id", optional=True)


def _validate_claim_assert(body: Mapping[str, Any]) -> None:
    """Check the body of a ``claim.assert`` operation."""
    _claim_key_field(body)
    _str_field(body, "subject")
    _str_field(body, "predicate")
    _dict_field(body, "value")
    _int_field(body, "confidence_ppm", minimum=0, maximum=CONFIDENCE_PPM_MAX)
    _str_field(body, "deriver")
    _str_field(body, "deriver_version")
    # A claim must name at least one input operation.
    _op_id_list_field(body, "inputs", allow_empty=False)
    _str_field(body, "rationale")
    _dict_field(body, "params", optional=True)
    _op_id_field(body, "supersedes", optional=True)


def _validate_claim_refute(body: Mapping[str, Any]) -> None:
    """Check the body of a ``claim.refute`` operation."""
    _op_id_field(body, "target")
    _claim_key_field(body)
    _str_field(body, "reason")
    scope = _str_field(body, "scope")
    _expect(scope in REFUTE_SCOPES, f"body.scope must be one of {sorted(REFUTE_SCOPES)}")


def _validate_claim_correct(body: Mapping[str, Any]) -> None:
    """Check the body of a ``claim.correct`` operation."""
    _op_id_field(body, "target")
    _claim_key_field(body)
    _dict_field(body, "corrected_value")
    _str_field(body, "reason")


def _validate_claim_invalidate(body: Mapping[str, Any]) -> None:
    """Check the body of a ``claim.invalidate`` operation."""
    _op_id_field(body, "target")
    _op_id_field(body, "cause")
    _op_id_list_field(body, "path")  # dependency path from cause to target; may be empty


def _validate_explanation_record(body: Mapping[str, Any]) -> None:
    """Check the body of an ``explanation.record`` operation.

    ``chain`` must be a list of objects, each with a non-empty string
    ``kind``; no other per-step field is checked here.
    """
    _op_id_field(body, "claim")
    _claim_key_field(body)
    _str_field(body, "summary")
    chain = body.get("chain")
    # A missing "chain" reads as None and fails this same check.
    _expect(isinstance(chain, list), "body.chain must be a list")
    for i, step in enumerate(chain):
        _expect(isinstance(step, dict), f"body.chain[{i}] must be an object")
        _expect(
            isinstance(step.get("kind"), str) and step["kind"],
            f"body.chain[{i}].kind must be a non-empty string",
        )
    _str_field(body, "generated_by")


# Dispatch table: operation type -> body validator.
_VALIDATORS = {
    EVIDENCE_ADD: _validate_evidence_add,
    CLAIM_ASSERT: _validate_claim_assert,
    CLAIM_REFUTE: _validate_claim_refute,
    CLAIM_CORRECT: _validate_claim_correct,
    CLAIM_INVALIDATE: _validate_claim_invalidate,
    EXPLANATION_RECORD: _validate_explanation_record,
}


def validate_body(op_type: str, body: Mapping[str, Any]) -> None:
    """Validate *body* against the schema for *op_type*.

    Unknown top-level fields are permitted (forward compatibility); required
    fields are checked strictly.

    Raises:
        SchemaError: if *op_type* is not a known operation type, *body* is
            not a ``dict``, or a field violates the per-type schema.
    """
    # The isinstance guard keeps an unhashable op_type (e.g. a list decoded
    # from untrusted JSON) from raising TypeError in the frozenset lookup.
    _expect(
        isinstance(op_type, str) and op_type in ALL_OP_TYPES,
        f"unknown operation type {op_type!r}",
    )
    _expect(isinstance(body, dict), "body must be an object")
    _VALIDATORS[op_type](body)


# ---------------------------------------------------------------------------
# Operation envelope
# ---------------------------------------------------------------------------


@dataclass(frozen=True)
class Operation:
    """An immutable, signed, content-addressed operation.

    Use :meth:`build` to create and sign a new operation, or
    :meth:`from_wire` to load one from its wire form. Constructing the class
    directly performs no validation.
    """

    # NOTE(review): ``frozen=True`` only blocks attribute rebinding. ``body``
    # is a plain dict when built via build()/from_wire(), so its contents
    # stay mutable and the dataclass-generated ``__hash__`` raises TypeError.

    v: int  # protocol version
    type: str  # operation type, one of ALL_OP_TYPES
    author: str  # author id of the signing identity
    ts: int  # unix milliseconds
    prev: Tuple[str, ...]  # causal references (op ids)
    body: Mapping[str, Any] = field(repr=False)  # type-specific payload
    sig: str = field(repr=False)  # signature over the canonical signing payload
    op_id: str = ""  # content address of the envelope including ``sig``

    # -- construction -------------------------------------------------------

    @staticmethod
    def signing_payload(
        *,
        v: int,
        type: str,
        author: str,
        ts: int,
        prev: Iterable[str],
        body: Mapping[str, Any],
    ) -> dict:
        """Return the dict that is canonicalized and signed.

        It holds every envelope field except ``sig`` and ``op_id``. ``prev``
        is materialized as a list and ``body`` is shallow-copied.
        """
        return {
            "v": v,
            "type": type,
            "author": author,
            "ts": ts,
            "prev": list(prev),
            "body": dict(body),
        }

    @classmethod
    def build(
        cls,
        *,
        type: str,
        body: Mapping[str, Any],
        identity: Identity,
        ts: int,
        prev: Iterable[str] = (),
    ) -> "Operation":
        """Validate, sign, and content-address a new operation.

        Args:
            type: operation type; must be one of ``ALL_OP_TYPES``.
            body: type-specific payload; deep-copied, so later mutation of
                the caller's object does not affect the operation.
            identity: signer; supplies ``author_id`` and ``sign``.
            ts: unix milliseconds, a non-negative non-bool integer.
            prev: causal references; each must be a well-formed op id.

        Raises:
            SchemaError: if the type, body, ``ts`` or ``prev`` is invalid.
        """
        validate_body(type, body)
        if not isinstance(ts, int) or isinstance(ts, bool) or ts < 0:
            raise SchemaError("ts must be a non-negative integer (unix milliseconds)")
        prev_tuple = tuple(prev)
        for p in prev_tuple:
            if not is_op_id(p):
                raise SchemaError(f"prev entry {p!r} is not a well-formed op id")
        body_copy = copy.deepcopy(dict(body))
        payload = cls.signing_payload(
            v=PROTOCOL_VERSION,
            type=type,
            author=identity.author_id,
            ts=ts,
            prev=prev_tuple,
            body=body_copy,
        )
        sig = identity.sign(canonical_bytes(payload))
        # The content address covers the signed payload plus the signature.
        op_id = op_id_for_wire({**payload, "sig": sig})
        return cls(
            v=PROTOCOL_VERSION,
            type=type,
            author=identity.author_id,
            ts=ts,
            prev=prev_tuple,
            body=body_copy,
            sig=sig,
            op_id=op_id,
        )

    # -- wire conversion ----------------------------------------------------

    def to_wire(self) -> dict:
        """Return the wire-form envelope as a fresh dict.

        ``body`` is deep-copied so the caller cannot mutate this operation
        through the returned value.
        """
        return {
            "v": self.v,
            "type": self.type,
            "author": self.author,
            "ts": self.ts,
            "prev": list(self.prev),
            "body": copy.deepcopy(dict(self.body)),
            "sig": self.sig,
            "op_id": self.op_id,
        }

    @classmethod
    def from_wire(cls, wire: Mapping[str, Any], *, verify: bool = True) -> "Operation":
        """Parse a wire-form envelope, checking the schema and optionally verifying it.

        Args:
            wire: envelope ``dict`` with the keys ``v``, ``type``,
                ``author``, ``ts``, ``prev``, ``body``, ``sig``, ``op_id``.
            verify: when true (the default), also check the signature and
                content address via :meth:`verify`.

        Raises:
            SchemaError: if the envelope or body is malformed.
            SignatureError: if *verify* is true and verification fails.
        """
        if not isinstance(wire, dict):
            raise SchemaError("wire envelope must be an object")
        missing = {"v", "type", "author", "ts", "prev", "body", "sig", "op_id"} - set(wire)
        if missing:
            raise SchemaError(f"wire envelope missing fields: {sorted(missing)}")

        version = wire["v"]
        # ``True == 1`` and ``1.0 == 1`` in Python, so equality alone would
        # admit non-integer versions; require a real int as is done for ts.
        if (
            not isinstance(version, int)
            or isinstance(version, bool)
            or version != PROTOCOL_VERSION
        ):
            raise SchemaError(f"unsupported protocol version {version!r}")

        op_type = wire["type"]
        # Type-check first: an unhashable value (list/dict) would raise
        # TypeError from the frozenset lookup rather than SchemaError.
        if not isinstance(op_type, str) or op_type not in ALL_OP_TYPES:
            raise SchemaError(f"unknown operation type {op_type!r}")

        if not is_author_id(wire["author"]):
            raise SchemaError(f"malformed author id {wire['author']!r}")
        ts = wire["ts"]
        if not isinstance(ts, int) or isinstance(ts, bool) or ts < 0:
            raise SchemaError("ts must be a non-negative integer")
        prev = wire["prev"]
        if not isinstance(prev, list) or not all(is_op_id(p) for p in prev):
            raise SchemaError("prev must be a list of well-formed op ids")
        validate_body(op_type, wire["body"])
        # Only the type is checked here; the signature's encoding is left to
        # verify_signature_strict during verify().
        if not isinstance(wire["sig"], str):
            raise SchemaError("sig must be a hex string")
        if not is_op_id(wire["op_id"]):
            raise SchemaError("op_id is malformed")

        op = cls(
            v=version,
            type=op_type,
            author=wire["author"],
            ts=ts,
            prev=tuple(prev),
            body=copy.deepcopy(dict(wire["body"])),
            sig=wire["sig"],
            op_id=wire["op_id"],
        )
        if verify:
            op.verify()
        return op

    # -- verification -------------------------------------------------------

    def verify(self) -> None:
        """Verify the signature and the content address; raises on failure.

        The content address is recomputed and compared first, then the
        signature is checked against the canonical signing payload.

        Raises:
            SignatureError: on an ``op_id`` mismatch or a signature that
                does not verify.
        """
        payload = self.signing_payload(
            v=self.v,
            type=self.type,
            author=self.author,
            ts=self.ts,
            prev=self.prev,
            body=dict(self.body),
        )
        expected_id = op_id_for_wire({**payload, "sig": self.sig})
        if expected_id != self.op_id:
            raise SignatureError(
                f"op_id mismatch: claimed {self.op_id}, computed {expected_id}"
            )
        try:
            verify_signature_strict(self.author, canonical_bytes(payload), self.sig)
        except Exception as exc:
            # Deliberately broad: whatever the verifier raises (bad signature,
            # undecodable key or signature, ...) is reported uniformly as a
            # SignatureError, with the original exception chained as cause.
            raise SignatureError(f"signature verification failed: {exc}") from exc