"""Signed operation envelopes for the FablePool append-only log (FP/1). Every change to a user's memory graph -- new evidence, a derived claim, a user correction, a capability grant, a revocation, a delegate's receipt -- is an *operation*: a signed, content-addressed, immutable JSON envelope. Envelope shape (all fields required):: { "v": 1, "type": "", "author": "ed25519:...", # key that signed this op "node": "", # human-meaningful origin label "ts": , "prev": ["sha256:...", ...], # DAG parents: the author's view of the # log heads when this op was created "body": { ... type-specific ... }, "sig": "", "op_id": "sha256:" } The ``prev`` links make the log a Merkle DAG: a node cannot silently drop or reorder history that another node has already observed, and sync reduces to exchanging heads and fetching missing ancestors. Operation types --------------- evidence Raw imported material (calendar event, note, photo metadata...). claim A statement about a subject, with provenance (``derived_from``) pointing at the evidence/claims it was derived from, a deriver identifier, a human rationale, and integer confidence in basis points. User assertions are claims with ``deriver == "user"`` and empty ``derived_from``. refutation The user (or an authorized key) rejects a specific op. Validity of everything derived from the refuted op cascades off. grant Embeds a capability document delegating a narrow, claim-only slice to an audience key. See :mod:`fablepool.capability`. revocation Mechanically revokes a previously granted capability. attestation A grantor-signed projection of a single valid claim, carrying a salted provenance commitment instead of evidence references. This is what a delegate receives -- never raw evidence. receipt A delegate-signed acknowledgement that a revocation was honored and the delegated attestations purged. This is how revocation becomes *verifiable* rather than merely requested. """ from __future__ import annotations from typing import Any, Dict, Iterable, List from .canonical import canonical_json, content_address, is_content_address from .keys import SignatureError, is_key_id, verify_signature PROTOCOL_VERSION = 1 OP_EVIDENCE = "evidence" OP_CLAIM = "claim" OP_REFUTATION = "refutation" OP_GRANT = "grant" OP_REVOCATION = "revocation" OP_ATTESTATION = "attestation" OP_RECEIPT = "receipt" OP_TYPES = ( OP_EVIDENCE, OP_CLAIM, OP_REFUTATION, OP_GRANT, OP_REVOCATION, OP_ATTESTATION, OP_RECEIPT, ) SIGNED_FIELDS = ("v", "type", "author", "node", "ts", "prev", "body") ENVELOPE_FIELDS = SIGNED_FIELDS + ("sig", "op_id") CONFIDENCE_MAX_BP = 10000 class OpError(ValueError): """Raised when an operation is structurally or cryptographically invalid.""" # --------------------------------------------------------------------------- # Construction # --------------------------------------------------------------------------- def signing_payload(op: Dict[str, Any]) -> bytes: """Canonical bytes covered by the envelope signature.""" return canonical_json({key: op[key] for key in SIGNED_FIELDS}) def compute_op_id(op: Dict[str, Any]) -> str: """Content address of the signed envelope (signature included).""" return content_address({key: op[key] for key in SIGNED_FIELDS + ("sig",)}) def build_op( identity, node: str, op_type: str, body: Dict[str, Any], prev: Iterable[str], ts: int, ) -> Dict[str, Any]: """Build, validate, and sign a complete operation envelope.""" prev_list = sorted(set(prev)) op: Dict[str, Any] = { "v": PROTOCOL_VERSION, "type": op_type, "author": identity.key_id, "node": node, "ts": ts, "prev": prev_list, "body": body, } _validate_structure_unsigned(op) op["sig"] = identity.sign(signing_payload(op)) op["op_id"] = compute_op_id(op) return op # --------------------------------------------------------------------------- # Validation # --------------------------------------------------------------------------- def _require(condition: bool, message: str) -> None: if not condition: raise OpError(message) def _require_str(body: Dict[str, Any], field: str, ctx: str) -> str: value = body.get(field) _require(isinstance(value, str) and value != "", f"{ctx}: '{field}' must be a non-empty string") return value def _require_int(body: Dict[str, Any], field: str, ctx: str, lo: int = 0, hi: int = 2**53 - 1) -> int: value = body.get(field) _require(isinstance(value, int) and not isinstance(value, bool), f"{ctx}: '{field}' must be an integer") _require(lo <= value <= hi, f"{ctx}: '{field}' out of range [{lo}, {hi}]") return value def _require_keys(body: Dict[str, Any], allowed: Iterable[str], required: Iterable[str], ctx: str) -> None: _require(isinstance(body, dict), f"{ctx}: body must be an object") allowed_set, required_set = set(allowed), set(required) extra = set(body) - allowed_set _require(not extra, f"{ctx}: unexpected fields {sorted(extra)}") missing = required_set - set(body) _require(not missing, f"{ctx}: missing fields {sorted(missing)}") def _validate_evidence(body: Dict[str, Any]) -> None: ctx = "evidence body" _require_keys(body, ("source", "kind", "content", "captured_at", "external_id"), ("source", "kind", "content", "captured_at"), ctx) _require_str(body, "source", ctx) _require_str(body, "kind", ctx) _require(isinstance(body["content"], dict), f"{ctx}: 'content' must be an object") _require_int(body, "captured_at", ctx) if "external_id" in body: _require_str(body, "external_id", ctx) def _validate_claim(body: Dict[str, Any]) -> None: ctx = "claim body" _require_keys( body, ("subject", "predicate", "object", "confidence_bp", "derived_from", "deriver", "rationale"), ("subject", "predicate", "object", "confidence_bp", "derived_from", "deriver", "rationale"), ctx, ) _require_str(body, "subject", ctx) _require_str(body, "predicate", ctx) _require_int(body, "confidence_bp", ctx, 0, CONFIDENCE_MAX_BP) _require_str(body, "deriver", ctx) _require_str(body, "rationale", ctx) derived = body["derived_from"] _require(isinstance(derived, list), f"{ctx}: 'derived_from' must be an array") for item in derived: _require(is_content_address(item), f"{ctx}: 'derived_from' entries must be op ids (got {item!r})") _require(len(derived) == len(set(derived)), f"{ctx}: 'derived_from' must not contain duplicates") if body["deriver"] == "user": _require(derived == [], f"{ctx}: user assertions must have empty 'derived_from'") else: _require(len(derived) > 0, f"{ctx}: derived claims must cite at least one source op") def _validate_refutation(body: Dict[str, Any]) -> None: ctx = "refutation body" _require_keys(body, ("target", "reason"), ("target", "reason"), ctx) _require(is_content_address(body["target"]), f"{ctx}: 'target' must be an op id") _require_str(body, "reason", ctx) def _validate_grant(body: Dict[str, Any]) -> None: ctx = "grant body" _require_keys(body, ("cap",), ("cap",), ctx) cap = body["cap"] _require(isinstance(cap, dict), f"{ctx}: 'cap' must be an object") cap_ctx = "capability" _require_keys( cap, ("cap_id", "issuer", "audience", "scope", "issued_at", "expires_at", "sig"), ("cap_id", "issuer", "audience", "scope", "issued_at", "expires_at", "sig"), cap_ctx, ) _require(is_content_address(cap["cap_id"]), f"{cap_ctx}: 'cap_id' must be a content address") _require(is_key_id(cap["issuer"]), f"{cap_ctx}: 'issuer' must be a key id") _require(is_key_id(cap["audience"]), f"{cap_ctx}: 'audience' must be a key id") _require(isinstance(cap["scope"], dict), f"{cap_ctx}: 'scope' must be an object") _require_int(cap, "issued_at", cap_ctx) _require_int(cap, "expires_at", cap_ctx) _require_str(cap, "sig", cap_ctx) scope = cap["scope"] _require_keys( scope, ("predicates", "min_confidence_bp", "include_evidence"), ("predicates", "min_confidence_bp", "include_evidence"), "capability scope", ) _require(isinstance(scope["predicates"], list) and len(scope["predicates"]) > 0, "capability scope: 'predicates' must be a non-empty array") for predicate in scope["predicates"]: _require(isinstance(predicate, str) and predicate != "", "capability scope: predicates must be non-empty strings") _require_int(scope, "min_confidence_bp", "capability scope", 0, CONFIDENCE_MAX_BP) _require(scope["include_evidence"] is False, "capability scope: FP/1 forbids evidence delegation ('include_evidence' must be false)") def _validate_revocation(body: Dict[str, Any]) -> None: ctx = "revocation body" _require_keys(body, ("cap_id", "reason"), ("cap_id", "reason"), ctx) _require(is_content_address(body["cap_id"]), f"{ctx}: 'cap_id' must be a content address") _require_str(body, "reason", ctx) def _validate_attestation(body: Dict[str, Any]) -> None: ctx = "attestation body" _require_keys( body, ("cap_id", "claim_op", "subject", "predicate", "object", "confidence_bp", "provenance_commitment", "attested_at"), ("cap_id", "claim_op", "subject", "predicate", "object", "confidence_bp", "provenance_commitment", "attested_at"), ctx, ) _require(is_content_address(body["cap_id"]), f"{ctx}: 'cap_id' must be a content address") _require(is_content_address(body["claim_op"]), f"{ctx}: 'claim_op' must be an op id") _require_str(body, "subject", ctx) _require_str(body, "predicate", ctx) _require_int(body, "confidence_bp", ctx, 0, CONFIDENCE_MAX_BP) _require(is_content_address(body["provenance_commitment"]), f"{ctx}: 'provenance_commitment' must be a content address") _require_int(body, "attested_at", ctx) def _validate_receipt(body: Dict[str, Any]) -> None: ctx = "receipt body" _require_keys( body, ("cap_id", "revocation_op", "purged_attestations", "statement"), ("cap_id", "revocation_op", "purged_attestations", "statement"), ctx, ) _require(is_content_address(body["cap_id"]), f"{ctx}: 'cap_id' must be a content address") _require(is_content_address(body["revocation_op"]), f"{ctx}: 'revocation_op' must be an op id") _require_int(body, "purged_attestations", ctx) _require_str(body, "statement", ctx) _BODY_VALIDATORS = { OP_EVIDENCE: _validate_evidence, OP_CLAIM: _validate_claim, OP_REFUTATION: _validate_refutation, OP_GRANT: _validate_grant, OP_REVOCATION: _validate_revocation, OP_ATTESTATION: _validate_attestation, OP_RECEIPT: _validate_receipt, } def _validate_structure_unsigned(op: Dict[str, Any]) -> None: _require(isinstance(op, dict), "operation must be an object") _require(op.get("v") == PROTOCOL_VERSION, f"unsupported protocol version: {op.get('v')!r}") _require(op.get("type") in OP_TYPES, f"unknown op type: {op.get('type')!r}") _require(is_key_id(op.get("author")), "author must be a key id") _require(isinstance(op.get("node"), str) and op["node"] != "", "node must be a non-empty string") ts = op.get("ts") _require(isinstance(ts, int) and not isinstance(ts, bool) and ts >= 0, "ts must be a non-negative integer") prev = op.get("prev") _require(isinstance(prev, list), "prev must be an array") for parent in prev: _require(is_content_address(parent), f"prev entries must be op ids (got {parent!r})") _require(prev == sorted(set(prev)), "prev must be sorted and de-duplicated") _BODY_VALIDATORS[op["type"]](op["body"]) def verify_op(op: Dict[str, Any]) -> None: """Fully verify an operation: structure, body schema, op_id, signature. Raises :class:`OpError` on any failure. Idempotent and side-effect free. """ _require(isinstance(op, dict), "operation must be an object") extra = set(op) - set(ENVELOPE_FIELDS) _require(not extra, f"unexpected envelope fields {sorted(extra)}") missing = set(ENVELOPE_FIELDS) - set(op) _require(not missing, f"missing envelope fields {sorted(missing)}") _validate_structure_unsigned(op) _require(isinstance(op["sig"], str) and op["sig"] != "", "sig must be a non-empty string") _require(is_content_address(op["op_id"]), "op_id must be a content address") expected_id = compute_op_id(op) _require(op["op_id"] == expected_id, f"op_id mismatch: declared {op['op_id']}, computed {expected_id}") try: verify_signature(op["author"], op["sig"], signing_payload(op)) except SignatureError as exc: raise OpError(f"invalid signature on {op['op_id']}: {exc}") from exc def derived_from(op: Dict[str, Any]) -> List[str]: """Provenance edges of a claim op (empty for everything else).""" if op.get("type") == OP_CLAIM: return list(op["body"]["derived_from"]) return []