"""Operation construction, signing, and verification. An operation is the unit of the PMP log. Wire shape (milestone #2):: { "v": 1, # protocol version "type": "evidence.add", # dotted operation type "author": "ed25519:...", # key id of the signing identity "ts": "2025-06-01T12:00:00.000Z", "prev": "sha256:..." | null, # op_id of the previous op in this log "seq": 17, # 0-based position in this log "body": { ... }, # type-specific payload "sig": "", "op_id": "sha256:..." # content address of the signed op } Signing and identity rules: * ``sig`` is an Ed25519 signature over the canonical JSON bytes of the operation **without** the ``sig`` and ``op_id`` fields. * ``op_id`` is ``sha256:`` over the canonical JSON bytes of the operation **without** the ``op_id`` field (i.e. it covers the signature). Two byte-identical signed operations therefore share one op_id, and an op_id pins both the content and the exact signature. Evidence bodies (``evidence.add``) carry full provenance: which adapter produced them, what version, from which origin, when, plus a ``content_hash`` over the evidence content alone so deduplication and disclosure-without-content (later milestones) work mechanically. """ from __future__ import annotations from datetime import datetime, timezone from typing import Any, Iterable, Optional from pmp.canonical import canonical_hash, canonical_json, is_hash_ref from pmp.errors import OperationError, SignatureError from pmp.keys import KEY_ID_PREFIX, KeyPair, b64u_decode, b64u_encode, verify_signature PROTOCOL_VERSION = 1 # Operation types used by this milestone. OP_TYPE_NODE_HELLO = "node.hello" OP_TYPE_EVIDENCE_ADD = "evidence.add" # Reserved for later milestones (derivation engine, capabilities); listed # here so the log layer accepts them when those milestones land and so # other implementations know the namespace. 
OP_TYPE_CLAIM_ADD = "claim.add"
OP_TYPE_CLAIM_RETRACT = "claim.retract"
OP_TYPE_CLAIM_REFUTE = "claim.refute"
OP_TYPE_CAPABILITY_GRANT = "capability.grant"
OP_TYPE_CAPABILITY_REVOKE = "capability.revoke"

KNOWN_OP_TYPES = frozenset(
    {
        OP_TYPE_NODE_HELLO,
        OP_TYPE_EVIDENCE_ADD,
        OP_TYPE_CLAIM_ADD,
        OP_TYPE_CLAIM_RETRACT,
        OP_TYPE_CLAIM_REFUTE,
        OP_TYPE_CAPABILITY_GRANT,
        OP_TYPE_CAPABILITY_REVOKE,
    }
)

EVIDENCE_KIND = "evidence"

# Envelope fields covered by the signature (see ``signing_payload``).
_UNSIGNED_FIELDS = ("v", "type", "author", "ts", "prev", "seq", "body")
# Exact set of top-level fields ``verify_op`` requires on a signed op.
_SIGNED_FIELDS = _UNSIGNED_FIELDS + ("sig", "op_id")


def now_iso() -> str:
    """Current UTC time, millisecond precision, ``Z`` suffix."""
    return (
        datetime.now(timezone.utc)
        .isoformat(timespec="milliseconds")
        .replace("+00:00", "Z")
    )


# ---------------------------------------------------------------------------
# construction
# ---------------------------------------------------------------------------


def make_op(
    op_type: str,
    author: str,
    body: dict,
    *,
    prev: Optional[str],
    seq: int,
    ts: Optional[str] = None,
) -> dict:
    """Build an unsigned operation. Validates structural invariants.

    Args:
        op_type: Dotted operation type; must be a string containing ``"."``.
        author: Key id of the signing identity; must start with
            ``KEY_ID_PREFIX``.
        body: Type-specific payload; must be a dict.
        prev: ``op_id`` of the previous op in this log, or ``None`` for the
            genesis op.
        seq: 0-based position in the log. ``seq == 0`` if and only if
            ``prev is None``.
        ts: Timestamp string. When omitted (or empty) the current time from
            :func:`now_iso` is used.

    Returns:
        A dict holding exactly the ``_UNSIGNED_FIELDS`` keys.

    Raises:
        OperationError: If any structural invariant is violated.
    """
    if not isinstance(op_type, str) or "." not in op_type:
        raise OperationError(f"invalid op type: {op_type!r}")
    if not isinstance(author, str) or not author.startswith(KEY_ID_PREFIX):
        raise OperationError(f"author must be an ed25519 key id, got {author!r}")
    if not isinstance(body, dict):
        raise OperationError("body must be an object")
    # bool is a subclass of int, so it has to be excluded explicitly.
    if not isinstance(seq, int) or isinstance(seq, bool) or seq < 0:
        raise OperationError(f"seq must be a non-negative integer, got {seq!r}")
    if prev is None:
        if seq != 0:
            raise OperationError("genesis op (prev=null) must have seq 0")
    else:
        if not is_hash_ref(prev):
            raise OperationError(f"prev must be a sha256 ref or null, got {prev!r}")
        if seq == 0:
            raise OperationError("seq 0 is reserved for the genesis op (prev=null)")
    # Reject a non-string timestamp here rather than letting it be signed and
    # only fail later in verify_op. Falsy values still fall back to now_iso().
    if ts and not isinstance(ts, str):
        raise OperationError("ts must be a non-empty string")
    return {
        "v": PROTOCOL_VERSION,
        "type": op_type,
        "author": author,
        "ts": ts or now_iso(),
        "prev": prev,
        "seq": seq,
        "body": body,
    }


def signing_payload(op: dict) -> bytes:
    """Canonical bytes the signature covers (op without ``sig``/``op_id``).

    Only keys listed in ``_UNSIGNED_FIELDS`` are included; any of them that
    are absent from *op* are silently skipped.
    """
    unsigned = {k: op[k] for k in _UNSIGNED_FIELDS if k in op}
    return canonical_json(unsigned)


def compute_op_id(op: dict) -> str:
    """Content address of a signed op (covers everything except ``op_id``).

    Every key present in *op* other than ``op_id`` is hashed, including
    ``sig``, so the id pins the exact signature as well as the content.
    """
    addressed = {k: v for k, v in op.items() if k != "op_id"}
    return canonical_hash(addressed)


def sign_op(op: dict, keypair: KeyPair) -> dict:
    """Sign an unsigned op with *keypair*; returns a new dict with sig + op_id.

    The input dict is not mutated.

    Raises:
        OperationError: If ``op["author"]`` is not ``keypair.key_id``.
    """
    if op.get("author") != keypair.key_id:
        raise OperationError(
            "op author does not match signing key "
            f"({op.get('author')!r} != {keypair.key_id!r})"
        )
    signature = keypair.sign(signing_payload(op))
    signed = dict(op)
    signed["sig"] = b64u_encode(signature)
    # op_id must be computed last because it covers the signature.
    signed["op_id"] = compute_op_id(signed)
    return signed


# ---------------------------------------------------------------------------
# verification
# ---------------------------------------------------------------------------


def verify_op(op: dict) -> None:
    """Verify one signed op in isolation: structure, op_id, signature, body.

    Checks run in this order: field presence, unknown fields, per-field
    structure, ``op_id`` recomputation, signature, and finally the
    type-specific body check for ``evidence.add`` ops.

    Raises :class:`OperationError` or :class:`SignatureError`; returns
    ``None`` on success. Chain placement (prev/seq) is the log's job.
    """
    if not isinstance(op, dict):
        raise OperationError("operation must be an object")
    missing = [k for k in _SIGNED_FIELDS if k not in op]
    if missing:
        raise OperationError(f"operation missing fields: {missing}")
    extra = [k for k in op if k not in _SIGNED_FIELDS]
    if extra:
        raise OperationError(f"operation has unknown top-level fields: {extra}")

    # A plain ``!=`` would accept True and 1.0 (both compare equal to 1);
    # require a real integer, mirroring the bool exclusion used for seq.
    version = op["v"]
    if (
        isinstance(version, bool)
        or not isinstance(version, int)
        or version != PROTOCOL_VERSION
    ):
        raise OperationError(f"unsupported protocol version {op['v']!r}")
    if not isinstance(op["type"], str) or "." not in op["type"]:
        raise OperationError(f"invalid op type: {op['type']!r}")
    if not isinstance(op["author"], str) or not op["author"].startswith(KEY_ID_PREFIX):
        raise OperationError(f"invalid author: {op['author']!r}")
    if not isinstance(op["ts"], str) or not op["ts"]:
        raise OperationError("ts must be a non-empty string")
    if op["prev"] is not None and not is_hash_ref(op["prev"]):
        raise OperationError(f"invalid prev: {op['prev']!r}")
    if not isinstance(op["seq"], int) or isinstance(op["seq"], bool) or op["seq"] < 0:
        raise OperationError(f"invalid seq: {op['seq']!r}")
    if (op["prev"] is None) != (op["seq"] == 0):
        raise OperationError("prev=null iff seq=0 violated")
    if not isinstance(op["body"], dict):
        raise OperationError("body must be an object")

    expected_id = compute_op_id(op)
    if op["op_id"] != expected_id:
        raise OperationError(
            f"op_id mismatch: claimed {op['op_id']}, computed {expected_id}"
        )

    # Any decoding failure (including a non-string sig) is reported as a
    # signature problem rather than leaking the decoder's exception type.
    try:
        signature = b64u_decode(op["sig"])
    except Exception as exc:  # noqa: BLE001
        raise SignatureError(f"malformed signature encoding: {exc}") from exc
    if not verify_signature(op["author"], signing_payload(op), signature):
        raise SignatureError(f"invalid signature on op {op['op_id']}")

    if op["type"] == OP_TYPE_EVIDENCE_ADD:
        validate_evidence_body(op["body"])


# ---------------------------------------------------------------------------
# evidence bodies
# ---------------------------------------------------------------------------


def make_evidence_body(
    *,
    adapter: str,
    adapter_version: str,
    origin: str,
    content_type: str,
    content: dict,
    external_id: Optional[str] = None,
    observed_at: Optional[str] = None,
    supersedes: Optional[Iterable[str]] = None,
    imported_at: Optional[str] = None,
) -> dict:
    """Build a provenance-tagged evidence body.

    * ``content`` is the normalized record the adapter extracted.
    * ``content_hash`` commits to the content alone, so a later milestone
      can disclose *that* evidence exists (and what it derived) without
      disclosing the content itself.
    * ``external_id`` is the source's own stable identifier (ICS UID, note
      path, photo unique id); it drives supersession when a source record
      changes.
    * ``supersedes`` lists op_ids of earlier evidence ops this one replaces
      (same external_id, changed content). Any iterable is accepted; the
      key is omitted from the body when it yields nothing.
    * ``imported_at`` defaults to the current time from :func:`now_iso`.

    Raises:
        OperationError: If ``content`` is not a dict.
    """
    if not isinstance(content, dict):
        raise OperationError("evidence content must be an object")

    body: dict[str, Any] = {
        "kind": EVIDENCE_KIND,
        "content_type": content_type,
        "content": content,
        "content_hash": canonical_hash(content),
        "source": {
            "adapter": adapter,
            "adapter_version": adapter_version,
            "origin": origin,
            "imported_at": imported_at or now_iso(),
        },
    }
    if external_id is not None:
        body["external_id"] = external_id
    if observed_at is not None:
        body["observed_at"] = observed_at

    # Materialize before testing for emptiness: iterators and generators are
    # always truthy, so ``if supersedes:`` would emit ``"supersedes": []`` for
    # an empty one, which validate_evidence_body rejects.
    superseded = list(supersedes) if supersedes is not None else []
    if superseded:
        body["supersedes"] = superseded
    return body


def validate_evidence_body(body: dict) -> None:
    """Structural validation of an ``evidence.add`` body.

    Checks ``kind``, ``content_type``, ``content``/``content_hash``
    agreement, the four required ``source`` provenance fields, and the
    optional ``external_id``, ``observed_at`` and ``supersedes`` fields.
    Keys other than these are not inspected.

    Raises:
        OperationError: On the first violated rule.
    """
    # Fail with the module's own error type instead of an AttributeError
    # when called directly with something that is not a dict.
    if not isinstance(body, dict):
        raise OperationError("evidence body must be an object")
    if body.get("kind") != EVIDENCE_KIND:
        raise OperationError(f"evidence body kind must be {EVIDENCE_KIND!r}")
    if not isinstance(body.get("content_type"), str) or not body["content_type"]:
        raise OperationError("evidence body missing content_type")
    if not isinstance(body.get("content"), dict):
        raise OperationError("evidence content must be an object")
    if body.get("content_hash") != canonical_hash(body["content"]):
        raise OperationError("evidence content_hash does not match content")

    source = body.get("source")
    if not isinstance(source, dict):
        raise OperationError("evidence body missing source provenance")
    for field in ("adapter", "adapter_version", "origin", "imported_at"):
        if not isinstance(source.get(field), str) or not source[field]:
            raise OperationError(f"evidence source missing field {field!r}")

    if "external_id" in body and (
        not isinstance(body["external_id"], str) or not body["external_id"]
    ):
        raise OperationError("evidence external_id must be a non-empty string")
    if "observed_at" in body and not isinstance(body["observed_at"], str):
        raise OperationError("evidence observed_at must be a string")
    if "supersedes" in body:
        refs = body["supersedes"]
        if (
            not isinstance(refs, list)
            or not refs
            or not all(is_hash_ref(r) for r in refs)
        ):
            raise OperationError("supersedes must be a non-empty list of op refs")