"""Capability delegation: a third-party node receives only a claim-only, predicate-scoped slice (never the underlying evidence), and revocation is honored mechanically.""" from fablepool import sync from helpers import dump_ops, get_all_claims, load_everything, long_strings def _grant(owner, delegate_node, predicates): cap = owner.grant( audience=delegate_node.keypair.public_key_hex, predicates=predicates ) return cap["id"] if isinstance(cap, dict) else cap def _predicates(owner): return sorted({c["predicate"] for c in get_all_claims(owner)}) def test_delegate_receives_claim_only_slice(laptop, delegate): load_everything(laptop) predicates = _predicates(laptop) assert len(predicates) >= 2, ( "sample datasets should produce at least two distinct predicates so " "that scoping is observable" ) allowed = predicates[:1] cap_id = _grant(laptop, delegate, allowed) sync.sync_delegate(laptop, delegate, cap_id) delegate_ops = list(delegate.store.all_ops()) assert delegate_ops, "delegate received nothing" assert all(op.get("type") != "evidence" for op in delegate_ops), ( "delegate must never receive raw evidence ops" ) delegate_claims = get_all_claims(delegate) assert delegate_claims, "delegate should see the authorized claims" assert {c["predicate"] for c in delegate_claims} <= set(allowed), ( "delegate saw a predicate outside its capability" ) def test_disallowed_predicates_never_reach_delegate(laptop, delegate): load_everything(laptop) predicates = _predicates(laptop) assert len(predicates) >= 2 allowed, denied = predicates[:1], predicates[1:] cap_id = _grant(laptop, delegate, allowed) sync.sync_delegate(laptop, delegate, cap_id) blob = dump_ops(delegate) for claim in get_all_claims(laptop): if claim["predicate"] in denied: assert claim["id"] not in blob, ( f"claim {claim['id']} with denied predicate " f"{claim['predicate']!r} leaked to the delegate" ) def test_no_evidence_content_leaks_even_with_widest_grant(laptop, delegate): load_everything(laptop) secrets = set() for op in laptop.store.all_ops(): if op.get("type") == "evidence": secrets |= long_strings(op.get("payload"), min_len=40) assert secrets, "expected long raw strings in the sample evidence" cap_id = _grant(laptop, delegate, _predicates(laptop)) sync.sync_delegate(laptop, delegate, cap_id) blob = dump_ops(delegate) for secret in secrets: assert secret not in blob, "raw evidence content leaked to the delegate" def test_revocation_is_honored_by_the_delegate(laptop, delegate): load_everything(laptop) cap_id = _grant(laptop, delegate, _predicates(laptop)) sync.sync_delegate(laptop, delegate, cap_id) assert any( c.get("status", "active") == "active" for c in get_all_claims(delegate) ), "delegate should hold active claims before revocation" laptop.revoke(cap_id) # The next sync must deliver the signed revocation, and the delegate # must mechanically honor it. sync.sync_delegate(laptop, delegate, cap_id) active = [ c for c in get_all_claims(delegate) if c.get("status", "active") == "active" ] assert not active, ( "delegate continued to serve claims after the capability was revoked" ) def test_revoked_capability_grants_nothing_new(laptop, delegate): load_everything(laptop) cap_id = _grant(laptop, delegate, _predicates(laptop)) laptop.revoke(cap_id) try: sync.sync_delegate(laptop, delegate, cap_id) except Exception: # Refusing the sync outright is acceptable for a never-used, # already-revoked capability. pass active = [ c for c in get_all_claims(delegate) if c.get("status", "active") == "active" ] assert not active, "a revoked capability must not deliver active claims" assert all( op.get("type") != "evidence" for op in delegate.store.all_ops() )