"""Capability-based delegation: grants, attestations, revocation, receipts. The contract (normative wording lives in docs/INTEROP.md): * A **grant** is a signed operation in the grantor's log naming a grantee key, a claim *selector* (predicate patterns, optional subject allow-list, minimum confidence), an explicit allow list (``read_claims``) and deny list (``read_evidence``), and an optional expiry. * The grantor serves a delegate **attestations**: signed claim summaries with every evidence pointer stripped. The delegate can verify authorship, grant linkage, and selector fit — but cannot reach underlying evidence, because the attestation simply does not contain it. A delegate must *reject* any attestation carrying forbidden fields (defense against a buggy or malicious grantor implementation widening the slice). * A **revocation** is a signed operation referencing the grant. On its next pull the delegate receives the revocation instead of attestations, must purge its cache, and must answer with a signed **purge receipt** whose state hash commits to an empty remaining set. The grantor verifies and archives the receipt. * Honoring is verifiable after the fact: the grantor (or an auditor) can challenge the delegate for a signed **holdings proof** at any time. Delegate-side storage discipline: attested claim *content* lives only in the delegate cache (a mutable, deletable file), never in the delegate's append-only log. The log holds only control-plane operations — the grant proof, the revocation proof, receipts — so a purge is a real deletion while the audit trail remains intact. """ from __future__ import annotations import fnmatch import json from dataclasses import dataclass from datetime import datetime, timezone from pathlib import Path from typing import Any, Dict, List, Optional, Tuple, Union from .canonical import canonicalize, sha256_hex from .ops import Operation, op_from_dict, verify_op OP_GRANT = "cap_grant" OP_REVOKE = "cap_revoke" OP_ATTESTATION = "attestation" OP_PURGE_RECEIPT = "purge_receipt" OP_HOLDINGS = "holdings_proof" #: Fields that must never appear in an attestation body. Delegates enforce #: this on receipt; conformance tests enforce it on issuance. FORBIDDEN_ATTESTATION_KEYS = ("derived_from", "evidence", "content", "sources") class CapabilityError(Exception): """Raised on invalid grant/revocation/receipt handling.""" def _now_iso() -> str: return datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ") def _parse_iso(ts: str) -> datetime: return datetime.fromisoformat(ts.replace("Z", "+00:00")) def _state_hash(grant_id: str, ids: List[str]) -> str: """Commitment to the set of attestations a delegate holds for a grant.""" return sha256_hex(canonicalize({"grant": grant_id, "remaining": sorted(ids)})) # ---------------------------------------------------------------------- # selectors # ---------------------------------------------------------------------- @dataclass class Selector: """The slice of the claim graph a grant exposes. ``predicates`` are shell-style patterns (``fnmatch``), e.g. ``["schedule.*", "diet.preference"]``. ``None`` means "any". ``subjects`` is an exact allow-list; ``None`` means "any". """ predicates: Optional[List[str]] = None subjects: Optional[List[str]] = None min_confidence: float = 0.0 def to_dict(self) -> Dict[str, Any]: return { "predicates": self.predicates, "subjects": self.subjects, "min_confidence": self.min_confidence, } @classmethod def from_dict(cls, data: Optional[Dict[str, Any]]) -> "Selector": data = data or {} return cls( predicates=data.get("predicates"), subjects=data.get("subjects"), min_confidence=float(data.get("min_confidence", 0.0)), ) def matches_fields(self, subject: str, predicate: str, confidence: float) -> bool: if confidence < self.min_confidence: return False if self.predicates is not None and not any( fnmatch.fnmatchcase(predicate, pattern) for pattern in self.predicates ): return False if self.subjects is not None and subject not in self.subjects: return False return True def matches_claim(self, claim) -> bool: return self.matches_fields(claim.subject, claim.predicate, claim.confidence) # ---------------------------------------------------------------------- # grantor side: grants, revocations, attestations # ---------------------------------------------------------------------- def make_grant( node, grantee: str, selector: Union[Selector, Dict[str, Any]], expires_at: Optional[str] = None, note: str = "", ) -> Operation: if isinstance(selector, dict): selector = Selector.from_dict(selector) body = { "grantee": grantee, "selector": selector.to_dict(), "allow": ["read_claims"], "deny": ["read_evidence"], "expires_at": expires_at, "note": note, } return node.append_op(OP_GRANT, body) def make_revocation(node, grant_id: str, reason: str = "revoked by owner") -> Operation: grant = find_grant(node.store, grant_id) if grant is None: raise CapabilityError(f"cannot revoke unknown grant: {grant_id!r}") if find_revocation(node.store, grant_id) is not None: raise CapabilityError(f"grant already revoked: {grant_id!r}") return node.append_op(OP_REVOKE, {"grant": grant_id, "reason": reason}) def find_grant(store, grant_id: Optional[str]) -> Optional[Operation]: if not grant_id or not store.has(grant_id): return None op = store.get(grant_id) return op if op.op_type == OP_GRANT else None def find_revocation(store, grant_id: str) -> Optional[Operation]: for op in store.by_type(OP_REVOKE): if op.body.get("grant") == grant_id: return op return None def grant_status(store, grant_id: str, now: Optional[datetime] = None) -> str: grant = find_grant(store, grant_id) if grant is None: return "unknown" if find_revocation(store, grant_id) is not None: return "revoked" expires = grant.body.get("expires_at") if expires: now_dt = now or datetime.now(timezone.utc) try: if now_dt >= _parse_iso(expires): return "expired" except ValueError: return "expired" # unparseable expiry fails closed return "active" def list_grants(store) -> List[Dict[str, Any]]: out = [] for op in store.by_type(OP_GRANT): out.append( { "grant_id": op.op_id, "grantee": op.body.get("grantee"), "selector": op.body.get("selector"), "expires_at": op.body.get("expires_at"), "note": op.body.get("note", ""), "status": grant_status(store, op.op_id), } ) return out def issue_attestations(node, grant: Operation) -> List[Operation]: """Issue (or reuse) attestations for every active claim the grant covers. Attestations are appended to the grantor's own log so issuance is auditable; reissuing for an unchanged claim reuses the existing operation instead of bloating the log. """ if grant_status(node.store, grant.op_id) != "active": return [] selector = Selector.from_dict(grant.body.get("selector")) existing: Dict[Tuple[str, str], Operation] = {} for op in node.store.by_type(OP_ATTESTATION): if op.author == node.key_id: existing[(op.body.get("grant"), op.body.get("claim"))] = op out: List[Operation] = [] for claim in node.graph.active_claims(): if not selector.matches_claim(claim): continue key = (grant.op_id, claim.claim_id) if key in existing: out.append(existing[key]) continue body = { "grant": grant.op_id, "claim": claim.claim_id, "subject": claim.subject, "predicate": claim.predicate, "object": claim.object, "confidence": claim.confidence, "asserted_at": _now_iso(), "redacted": ["derived_from", "evidence"], } out.append(node.append_op(OP_ATTESTATION, body)) return out def serve_delegate_pull(node, grant_id: Optional[str], grantee: Optional[str]) -> dict: grant = find_grant(node.store, grant_id) if grant is None: return {"type": "error", "error": f"unknown grant: {grant_id!r}"} if grant.body.get("grantee") != grantee: return {"type": "error", "error": "grantee key does not match grant"} status = grant_status(node.store, grant.op_id) reply: Dict[str, Any] = { "type": "delegate_ops", "grant": grant.to_dict(), "status": status, "attestations": [], } if status == "revoked": revocation = find_revocation(node.store, grant.op_id) assert revocation is not None reply["revocation"] = revocation.to_dict() return reply if status != "active": return reply reply["attestations"] = [a.to_dict() for a in issue_attestations(node, grant)] return reply def verify_attestation(att: Operation, grant: Operation) -> Optional[str]: """Return a rejection reason, or None if the attestation is acceptable.""" if att.op_type != OP_ATTESTATION: return f"not an attestation: {att.op_type!r}" if not verify_op(att): return "signature or identity verification failed" if att.author != grant.author: return "attestation author is not the grantor" body = att.body if body.get("grant") != grant.op_id: return "attestation is bound to a different grant" for key in FORBIDDEN_ATTESTATION_KEYS: if key in body: return f"attestation leaks forbidden field: {key!r}" selector = Selector.from_dict(grant.body.get("selector")) if not selector.matches_fields( str(body.get("subject", "")), str(body.get("predicate", "")), float(body.get("confidence", 0.0)), ): return "attestation falls outside the granted selector" return None # ---------------------------------------------------------------------- # delegate side: cache, pull, purge, receipts, holdings proofs # ---------------------------------------------------------------------- class DelegateCache: """Mutable, deletable storage for attested claim content. Deliberately *not* the append-only log: purging a grant deletes content for real, which is what makes mechanical revocation meaningful. """ def __init__(self, path: Optional[Union[str, Path]] = None) -> None: self._path: Optional[Path] = Path(path) if path is not None else None self._atts: Dict[str, dict] = {} if self._path is not None and self._path.exists(): data = json.loads(self._path.read_text(encoding="utf-8")) self._atts = dict(data.get("attestations", {})) def _save(self) -> None: if self._path is None: return self._path.parent.mkdir(parents=True, exist_ok=True) tmp = self._path.with_suffix(".tmp") tmp.write_text( json.dumps({"attestations": self._atts}, indent=2, sort_keys=True), encoding="utf-8", ) tmp.replace(self._path) def add(self, op: Operation) -> bool: if op.op_id in self._atts: return False self._atts[op.op_id] = op.to_dict() self._save() return True def ids_for_grant(self, grant_id: str) -> List[str]: return sorted( op_id for op_id, d in self._atts.items() if d.get("body", {}).get("grant") == grant_id ) def purge_grant(self, grant_id: str) -> List[str]: purged = self.ids_for_grant(grant_id) for op_id in purged: del self._atts[op_id] self._save() return purged def claims_view(self, grant_id: Optional[str] = None) -> List[Dict[str, Any]]: out = [] for op_id, d in sorted(self._atts.items()): body = d.get("body", {}) if grant_id is not None and body.get("grant") != grant_id: continue out.append( { "attestation_id": op_id, "claim": body.get("claim"), "subject": body.get("subject"), "predicate": body.get("predicate"), "object": body.get("object"), "confidence": body.get("confidence"), "grant": body.get("grant"), } ) return out def __len__(self) -> int: return len(self._atts) def make_purge_receipt( node, grant_id: str, revocation_id: Optional[str], purged_ids: List[str] ) -> Operation: remaining = node.delegate_cache.ids_for_grant(grant_id) body = { "grant": grant_id, "revocation": revocation_id, "purged": sorted(purged_ids), "remaining": remaining, "state_hash": _state_hash(grant_id, remaining), "purged_at": _now_iso(), } return node.append_op(OP_PURGE_RECEIPT, body) def accept_purge_receipt(node, receipt_dict: Optional[dict]) -> dict: """Grantor-side verification and archival of a delegate's purge receipt.""" def _reject(reason: str) -> dict: return {"type": "receipt_ack", "accepted": False, "reason": reason} if not isinstance(receipt_dict, dict): return _reject("missing receipt") try: op = op_from_dict(receipt_dict) except Exception as exc: return _reject(f"parse error: {exc}") if op.op_type != OP_PURGE_RECEIPT: return _reject(f"not a purge receipt: {op.op_type!r}") if not verify_op(op): return _reject("signature or identity verification failed") body = op.body grant = find_grant(node.store, body.get("grant")) if grant is None: return _reject("receipt references a grant we do not hold") if op.author != grant.body.get("grantee"): return _reject("receipt is not signed by the grantee") revocation_id = body.get("revocation") if revocation_id is not None: revocation = find_revocation(node.store, grant.op_id) if revocation is None or revocation.op_id != revocation_id: return _reject("receipt references an unknown revocation") remaining = list(body.get("remaining", [])) if body.get("state_hash") != _state_hash(grant.op_id, remaining): return _reject("state hash mismatch") if remaining: return _reject("delegate still holds attestations for this grant") node.store.append(op) if not node.store.has(op.op_id) else None return {"type": "receipt_ack", "accepted": True, "receipt": op.op_id} def has_valid_purge_receipt(store, grant_id: str) -> Optional[Operation]: """Grantor-side check: did the delegate verifiably purge this grant?""" grant = find_grant(store, grant_id) if grant is None: return None for op in store.by_type(OP_PURGE_RECEIPT): body = op.body if body.get("grant") != grant_id: continue if op.author != grant.body.get("grantee"): continue remaining = list(body.get("remaining", [])) if remaining: continue if body.get("state_hash") != _state_hash(grant_id, remaining): continue if verify_op(op): return op return None def delegate_pull(node, transport, grant_id: str) -> dict: """Run one delegate pull cycle against the grantor behind ``transport``. Returns a report dict with ``status`` in {ok, revoked, expired, error}. Honors revocation and expiry mechanically: purge first, then send a signed receipt. """ reply = transport.request( {"type": "delegate_pull", "grant_id": grant_id, "grantee": node.key_id} ) if reply.get("type") != "delegate_ops": return { "status": "error", "error": reply.get("error", f"unexpected reply: {reply.get('type')!r}"), } try: grant = op_from_dict(reply["grant"]) except Exception as exc: return {"status": "error", "error": f"grant proof unparseable: {exc}"} if grant.op_type != OP_GRANT or not verify_op(grant): return {"status": "error", "error": "grant proof failed verification"} if grant.body.get("grantee") != node.key_id: return {"status": "error", "error": "grant names a different grantee"} if not node.store.has(grant.op_id): node.store.append(grant) # keep the signed grant as local proof status = reply.get("status") if status == "revoked": try: revocation = op_from_dict(reply.get("revocation")) except Exception as exc: return {"status": "error", "error": f"revocation proof unparseable: {exc}"} if revocation.op_type != OP_REVOKE or not verify_op(revocation): return {"status": "error", "error": "revocation proof failed verification"} if ( revocation.author != grant.author or revocation.body.get("grant") != grant.op_id ): return {"status": "error", "error": "revocation does not match grant"} if not node.store.has(revocation.op_id): node.store.append(revocation) purged = node.delegate_cache.purge_grant(grant.op_id) receipt = make_purge_receipt(node, grant.op_id, revocation.op_id, purged) ack = transport.request({"type": "purge_receipt", "receipt": receipt.to_dict()}) return {"status": "revoked", "purged": purged, "receipt": receipt.op_id, "ack": ack} if status == "expired": purged = node.delegate_cache.purge_grant(grant.op_id) receipt = make_purge_receipt(node, grant.op_id, None, purged) ack = transport.request({"type": "purge_receipt", "receipt": receipt.to_dict()}) return {"status": "expired", "purged": purged, "receipt": receipt.op_id, "ack": ack} if status != "active": return {"status": "error", "error": f"unexpected grant status: {status!r}"} added: List[str] = [] skipped: List[dict] = [] for d in reply.get("attestations", []): try: att = op_from_dict(d) except Exception as exc: op_id = d.get("op_id", "") if isinstance(d, dict) else "" skipped.append({"op": op_id, "reason": f"parse error: {exc}"}) continue reason = verify_attestation(att, grant) if reason is not None: skipped.append({"op": att.op_id, "reason": reason}) continue if node.delegate_cache.add(att): added.append(att.op_id) return { "status": "ok", "added": added, "skipped": skipped, "held": len(node.delegate_cache.ids_for_grant(grant.op_id)), } def make_holdings_proof(node, grant_id: str) -> Operation: ids = node.delegate_cache.ids_for_grant(grant_id) body = { "grant": grant_id, "holdings": ids, "state_hash": _state_hash(grant_id, ids), "asserted_at": _now_iso(), } return node.append_op(OP_HOLDINGS, body) def serve_holdings(node, grant_id: Optional[str]) -> dict: if not grant_id: return {"type": "error", "error": "missing grant_id"} return {"type": "holdings", "proof": make_holdings_proof(node, grant_id).to_dict()} def verify_holdings_proof( proof: Optional[dict], expected_author: Optional[str] = None ) -> Tuple[bool, List[str], str]: """Returns (ok, holdings, reason).""" if not isinstance(proof, dict): return False, [], "missing proof" try: op = op_from_dict(proof) except Exception as exc: return False, [], f"parse error: {exc}" if op.op_type != OP_HOLDINGS or not verify_op(op): return False, [], "proof failed verification" if expected_author is not None and op.author != expected_author: return False, [], "proof signed by unexpected key" body = op.body holdings = [str(h) for h in body.get("holdings", [])] if body.get("state_hash") != _state_hash(str(body.get("grant", "")), holdings): return False, [], "state hash mismatch" return True, holdings, ""