#!/usr/bin/env python3 """FablePool end-to-end demo: two user-owned nodes plus a delegated third party. What this script demonstrates, in order: 1. A "phone" node ingests calendar + photo-metadata evidence; a "laptop" node ingests notes. Both derive claims independently (divergent logs). 2. Two-way log synchronisation merges the logs; both nodes converge on the same operation set (conflict handling: append-only merge, deduplicated by op id, deterministic claim state). 3. The merged graph answers "what do you know about me and why?" with provenance for every claim. 4. The user refutes a claim on the laptop. The refutation cascades: downstream derived claims are invalidated, and after sync the phone reflects the same correction. 5. The phone grants a third-party "coach" node a claims-only slice scoped to specific predicates. The exported slice is verified to contain NO evidence operations — the delegate sees conclusions, never raw data. 6. The phone mechanically revokes the grant. The revocation is delivered to the coach, which verifiably honours it: the granted claims are no longer queryable on the delegate. Run from the repository root: python demo/run_demo.py # uses a temp directory, cleans up python demo/run_demo.py --keep # keep node state for inspection python demo/run_demo.py --state X # use directory X for node state Exit code is 0 only if every check passes. """ from __future__ import annotations import argparse import shutil import sys import tempfile from pathlib import Path ROOT = Path(__file__).resolve().parents[1] sys.path.insert(0, str(ROOT)) # run straight from a source checkout from fablepool.node import Node # noqa: E402 DATASETS = ROOT / "datasets" _CHECKS_PASSED = 0 _CHECKS_FAILED = 0 # --------------------------------------------------------------------------- # Output / assertion helpers # --------------------------------------------------------------------------- def banner(step: int, title: str) -> None: line = "=" * 72 print(f"\n{line}\nSTEP {step}: {title}\n{line}") def say(msg: str) -> None: print(f" {msg}") def check(condition: bool, msg: str) -> None: global _CHECKS_PASSED, _CHECKS_FAILED if condition: _CHECKS_PASSED += 1 print(f" [PASS] {msg}") else: _CHECKS_FAILED += 1 print(f" [FAIL] {msg}") raise SystemExit(f"demo check failed: {msg}") def f(obj, *names, default=None): for name in names: if isinstance(obj, dict): if name in obj: return obj[name] elif hasattr(obj, name): return getattr(obj, name) return default def op_dict(op): if isinstance(op, dict): return op if hasattr(op, "to_dict"): return op.to_dict() raise TypeError(f"unexpected operation type {type(op)!r}") def claim_dict(c): if isinstance(c, dict): return dict(c) out = {} for name in ( "claim_id", "subject", "predicate", "object", "value", "confidence", "status", "derived_from", "evidence_ids", ): if hasattr(c, name): out[name] = getattr(c, name) return out # --------------------------------------------------------------------------- # Node helpers # --------------------------------------------------------------------------- def create_node(path: Path, name: str) -> Node: try: return Node.create(path, name) except TypeError: return Node.create(path) def node_id(node: Node) -> str: return f(node, "node_id", "public_key") def op_ids(node: Node) -> set: store = getattr(node, "store", node) for fn in ("all_ops", "iter_ops", "ops", "list_ops"): method = getattr(store, fn, None) if callable(method): return {f(op_dict(o), "op_id", "id") for o in method()} raise RuntimeError("node store exposes no op listing") def claims_of(node: Node, include_inactive: bool = False): try: items = node.claims(include_inactive=include_inactive) except TypeError: items = node.claims() dicts = [claim_dict(c) for c in items] if not include_inactive: dicts = [d for d in dicts if d.get("status", "active") == "active"] return dicts def two_way_sync(a: Node, b: Node) -> None: b.import_bundle(a.export_bundle()) a.import_bundle(b.export_bundle()) def derive_count(node: Node) -> int: result = node.derive() if isinstance(result, int): return result try: return len(result) except TypeError: return 0 def ingest(node: Node, kind: str, source: Path) -> None: method = getattr(node, f"ingest_{kind}", None) if callable(method): result = method(source) else: result = node.ingest(kind, source) count = result if isinstance(result, int) else len(result or []) say(f"ingested {count} evidence op(s) from {source.relative_to(ROOT)}") # --------------------------------------------------------------------------- # Demo # --------------------------------------------------------------------------- def run(state: Path) -> None: # ------------------------------------------------------------------ 1 -- banner(1, "Create two user-owned nodes and one third-party delegate") phone = create_node(state / "phone", "Phone") laptop = create_node(state / "laptop", "Laptop") coach = create_node(state / "coach", "Coach (delegate)") say(f"phone id: {node_id(phone)}") say(f"laptop id: {node_id(laptop)}") say(f"coach id: {node_id(coach)}") check(len({node_id(phone), node_id(laptop), node_id(coach)}) == 3, "three distinct node identities created") # ------------------------------------------------------------------ 2 -- banner(2, "Ingest evidence on each user node (logs diverge)") ingest(phone, "calendar", DATASETS / "phone" / "calendar.ics") ingest(phone, "photos", DATASETS / "phone" / "photos.json") ingest(laptop, "notes", DATASETS / "laptop" / "notes") n_phone = derive_count(phone) n_laptop = derive_count(laptop) say(f"phone derived {n_phone} claim(s); laptop derived {n_laptop} claim(s)") check(n_phone > 0, "phone derivation produced claims from calendar/photo evidence") check(n_laptop > 0, "laptop derivation produced claims from notes evidence") check(op_ids(phone) != op_ids(laptop), "logs have diverged before sync") # ------------------------------------------------------------------ 3 -- banner(3, "Two-way log synchronisation (conflict handling = CRDT merge)") two_way_sync(phone, laptop) ids_phone, ids_laptop = op_ids(phone), op_ids(laptop) check(ids_phone == ids_laptop, f"both nodes converged on the same {len(ids_phone)}-op log") # idempotence: syncing again must change nothing before = len(ids_phone) two_way_sync(phone, laptop) check(len(op_ids(phone)) == before, "re-sync is idempotent (no duplicate ops)") # both nodes see the union of claims merged_preds_phone = {d["predicate"] for d in claims_of(phone)} merged_preds_laptop = {d["predicate"] for d in claims_of(laptop)} check(merged_preds_phone == merged_preds_laptop, "claim state is identical on both nodes after merge") # ------------------------------------------------------------------ 4 -- banner(4, '"What do you know about me and why?" (laptop view)') active = claims_of(laptop) for d in active: conf = d.get("confidence") conf_s = f"{conf:.2f}" if isinstance(conf, (int, float)) else "-" obj = d.get("object", d.get("value", "")) n_ev = len(d.get("evidence_ids") or []) n_up = len(d.get("derived_from") or []) say(f"{d['predicate']}: {obj} (conf {conf_s}; " f"{n_ev} evidence, {n_up} upstream claim(s))") check(len(active) > 0, "merged graph holds active claims with provenance") # ------------------------------------------------------------------ 5 -- banner(5, "User refutes a claim; correction cascades and syncs back") by_id = {d["claim_id"]: d for d in active} children = {cid: [] for cid in by_id} for d in active: for parent in d.get("derived_from") or []: if parent in children: children[parent].append(d["claim_id"]) target = max(by_id.values(), key=lambda d: len(children[d["claim_id"]])) target_children = children[target["claim_id"]] say(f"refuting on laptop: {target['predicate']}: " f"{target.get('object', target.get('value', ''))}") if target_children: say(f"this claim has {len(target_children)} downstream derived claim(s)") laptop.refute(target["claim_id"], "User correction: this is not true.") after = {d["claim_id"]: d for d in claims_of(laptop, include_inactive=True)} check(after[target["claim_id"]].get("status") != "active", "refuted claim is no longer active on the laptop") for cid in target_children: check(after[cid].get("status") != "active", f"downstream claim {cid[:16]}… was cascade-invalidated") two_way_sync(laptop, phone) phone_after = {d["claim_id"]: d for d in claims_of(phone, include_inactive=True)} check(phone_after[target["claim_id"]].get("status") != "active", "after sync, the phone honours the same correction") # ------------------------------------------------------------------ 6 -- banner(6, "Capability delegation: claims-only slice for the coach") remaining = claims_of(phone) check(len(remaining) > 0, "active claims remain after the correction") granted_preds = sorted({d["predicate"] for d in remaining})[:2] say(f"granting coach the predicates: {granted_preds}") try: grant_op = phone.grant(node_id(coach), granted_preds, note="coach gets a routine-only slice") except TypeError: grant_op = phone.grant(node_id(coach), granted_preds) grant_id = f(op_dict(grant_op) if not isinstance(grant_op, str) else {}, "op_id", "grant_id", "id", default=grant_op) say(f"grant id: {grant_id}") slice_ops = [op_dict(o) for o in phone.export_bundle(grant_id=grant_id)] slice_types = sorted({o.get("op_type", o.get("type", "?")) for o in slice_ops}) say(f"grant slice contains {len(slice_ops)} op(s) of types {slice_types}") check(all(t != "evidence" for t in slice_types), "grant slice contains NO raw evidence operations") check(any(t == "claim" for t in slice_types), "grant slice carries claim operations") coach.import_bundle(slice_ops) coach_claims = claims_of(coach) say(f"coach now answers from {len(coach_claims)} claim(s):") for d in coach_claims: say(f" {d['predicate']}: {d.get('object', d.get('value', ''))}") check(len(coach_claims) > 0, "delegate received the authorised claims") check(all(d["predicate"] in granted_preds for d in coach_claims), "delegate holds ONLY claims within the granted predicates") check(len(op_ids(coach)) < len(op_ids(phone)), "delegate log is a strict, narrow subset of the user's log") # ------------------------------------------------------------------ 7 -- banner(7, "Mechanical revocation, verifiably honoured by the delegate") try: revoke_op = phone.revoke(grant_id, reason="user revoked coach access") except TypeError: revoke_op = phone.revoke(grant_id) say("phone signed and logged a revocation operation") # Deliver the revocation. The grant-scoped export now carries the revoke # op; if the implementation refuses to export a dead grant, fall back to # delivering the revocation operation directly. try: delivery = list(phone.export_bundle(grant_id=grant_id)) except Exception: delivery = [] if not any(op_dict(o).get("op_type", op_dict(o).get("type")) == "revoke" for o in delivery): delivery = [revoke_op] coach.import_bundle(delivery) coach_after = claims_of(coach) leaked = [d for d in coach_after if d["predicate"] in granted_preds] check(len(leaked) == 0, "after revocation, the delegate no longer answers from granted claims") say("the revocation is itself a signed log entry — the coach's compliance " "is auditable by replaying its log") # ------------------------------------------------------------------ 8 -- banner(8, "Summary") say(f"{_CHECKS_PASSED} check(s) passed, {_CHECKS_FAILED} failed") say(f"phone log: {len(op_ids(phone))} ops " f"laptop log: {len(op_ids(laptop))} ops " f"coach log: {len(op_ids(coach))} ops") print("\nALL CHECKS PASSED — the demo covered ingest, derivation,") print("sync, correction cascade, claims-only delegation, and revocation.") def main() -> int: parser = argparse.ArgumentParser(description=__doc__.splitlines()[0]) parser.add_argument("--state", default=None, help="directory for node state (default: temp dir)") parser.add_argument("--keep", action="store_true", help="do not delete the state directory afterwards") args = parser.parse_args() if args.state: state = Path(args.state) state.mkdir(parents=True, exist_ok=True) cleanup = False else: state = Path(tempfile.mkdtemp(prefix="fablepool-demo-")) cleanup = not args.keep print(f"node state directory: {state}") try: run(state) return 0 finally: if cleanup: shutil.rmtree(state, ignore_errors=True) else: print(f"\nnode state preserved at: {state}") if __name__ == "__main__": raise SystemExit(main())