"""FablePool inspection and refutation CLI. Every command answers some part of "what do you know about me and why?": * ``topics`` / ``claims`` / ``show`` — browse the claim graph * ``why`` / ``evidence`` / ``deps`` — drill into provenance and impact * ``refute`` / ``correct`` — push back, watch the cascade * ``export`` / ``audit`` / ``log`` — portability and auditability * ``caps`` — capability grants on the ledger * ``ask`` — the canonical demo question * ``shell`` — interactive mode All query commands support ``--json`` for machine-readable output, which the test suite and scripted walkthroughs rely on. """ from __future__ import annotations import argparse import json import os import sys from collections import Counter from pathlib import Path from typing import Any, Dict, List, Optional, Tuple from rich.console import Console from rich.panel import Panel from fablepool import identity as identity_mod from fablepool import render, wire from fablepool.errors import FablePoolError from fablepool.store import Node DEFAULT_NODE_DIR = "fablepool-node" TOPIC_SYNONYMS = { "health": {"health", "healthy", "doctor", "dentist", "medical", "allergy", "allergies"}, "fitness": {"fitness", "exercise", "workout", "run", "running", "yoga", "train", "training"}, "location": {"location", "live", "home", "city", "where"}, "work": {"work", "job", "schedule", "meetings"}, "travel": {"travel", "trip", "flight", "vacation", "portland"}, "preferences": {"preferences", "eat", "food", "diet", "groceries", "buy"}, "relationships": {"relationships", "friends", "partner", "family"}, } # --------------------------------------------------------------------------- # Node access and reference resolution # --------------------------------------------------------------------------- def _node_path(args: argparse.Namespace) -> Path: return Path(args.node or os.environ.get("FABLEPOOL_NODE") or DEFAULT_NODE_DIR) def _open_node(args: argparse.Namespace, console: Console, create_ok: bool = False) -> Node: path = _node_path(args) if path.exists(): return Node.open(path) if create_ok: node = Node.create(path) console.print(f"[dim]created new node at {path}[/]") return node raise FablePoolError( f"no node found at '{path}' — run `fablepool init {path}` first, " f"or point --node / FABLEPOOL_NODE at an existing node" ) def resolve_ref(node: Node, ref: str) -> str: """Resolve an exact identifier or a unique prefix to a full op/claim id.""" ids = [render.obj_id(op) for op in node.operations()] if ref in ids: return ref matches = [i for i in ids if i.startswith(ref)] if len(matches) == 1: return matches[0] if not matches: raise FablePoolError(f"nothing matches '{ref}' (claims, evidence, operations)") raise FablePoolError( f"'{ref}' is ambiguous ({len(matches)} matches) — use a longer prefix" ) def _claim_dict(claim: Any) -> Dict[str, Any]: data = render.to_plain(claim) data.setdefault("id", render.obj_id(claim)) return data def _extract_cascade(result: Any) -> Tuple[Optional[str], Optional[str], List[str]]: """Pull (op_id, new_claim_id, invalidated[]) out of a cascade result.""" if isinstance(result, dict): new_claim = result.get("new_claim_id") if new_claim is None and result.get("new_claim") is not None: new_claim = render.obj_id(result["new_claim"]) return result.get("op_id"), new_claim, list(result.get("invalidated", []) or []) op_id = getattr(result, "op_id", None) new_claim = getattr(result, "new_claim_id", None) if new_claim is None: nested = getattr(result, "new_claim", None) if nested is not None: new_claim = render.obj_id(nested) invalidated = [str(i) for i in (getattr(result, "invalidated", None) or [])] return op_id, new_claim, invalidated def _confirm(args: argparse.Namespace, prompt: str) -> bool: if getattr(args, "yes", False): return True if not sys.stdin.isatty(): raise FablePoolError("refusing to modify the graph non-interactively; pass --yes") answer = input(f"{prompt} [y/N] ").strip().lower() return answer in ("y", "yes") # --------------------------------------------------------------------------- # Commands # --------------------------------------------------------------------------- def cmd_init(args: argparse.Namespace, console: Console) -> int: path = Path(args.path) if args.path else _node_path(args) if path.exists() and any(path.iterdir()): raise FablePoolError(f"'{path}' already exists and is not empty") node = Node.create(path) key_id = getattr(getattr(node, "identity", None), "key_id", "?") console.print(f"[green]✓[/] initialized FablePool node at [bold]{path}[/]") console.print(f" node identity: {key_id}") console.print(f" next: fablepool --node {path} seed") return 0 def cmd_seed(args: argparse.Namespace, node: Node, console: Console) -> int: from fablepool.seed import CLAIM_LEVELS, seed_demo result = seed_demo(node) if args.json: print(json.dumps({"evidence": result.evidence, "claims": result.claims}, indent=2)) return 0 console.print( f"[green]✓[/] seeded demo dataset: {result.evidence_count} evidence records, " f"{result.claim_count} claims across {max(CLAIM_LEVELS.values())} derivation levels" ) counts: Dict[str, Tuple[int, int]] = {} for claim in node.claims(): topic = str(getattr(claim, "topic", "?")) active, total = counts.get(topic, (0, 0)) is_active = getattr(claim, "status", "active") == "active" counts[topic] = (active + (1 if is_active else 0), total + 1) console.print(render.topics_table(counts)) console.print("Try:") console.print(' fablepool ask "what do you know about me"') console.print(" fablepool claims --topic fitness") console.print(f" fablepool why {render.short(result.claims['wellness_profile'])}") return 0 def cmd_topics(args: argparse.Namespace, node: Node, console: Console) -> int: counts: Dict[str, Tuple[int, int]] = {} for claim in node.claims(): topic = str(getattr(claim, "topic", "?")) active, total = counts.get(topic, (0, 0)) is_active = getattr(claim, "status", "active") == "active" counts[topic] = (active + (1 if is_active else 0), total + 1) if args.json: print(json.dumps({t: {"active": a, "total": n} for t, (a, n) in counts.items()}, indent=2, sort_keys=True)) return 0 if not counts: console.print("no claims yet — run `fablepool seed` or an import adapter") return 0 console.print(render.topics_table(counts)) return 0 def cmd_claims(args: argparse.Namespace, node: Node, console: Console) -> int: status_filter = args.status if args.status else (None if args.all else "active") claims = list(node.claims()) if args.topic: claims = [c for c in claims if str(getattr(c, "topic", "")) == args.topic] if status_filter: claims = [c for c in claims if getattr(c, "status", "active") == status_filter] claims.sort(key=lambda c: (str(getattr(c, "topic", "")), str(getattr(c, "created_at", "")))) if args.json: print(json.dumps([_claim_dict(c) for c in claims], indent=2, default=str)) return 0 if not claims: console.print("no claims match that filter") return 0 title = "Claims" if args.topic: title += f" — topic '{args.topic}'" if status_filter: title += f" ({status_filter})" console.print(render.claims_table(claims, title=title)) console.print("[dim]drill in with: fablepool show · fablepool why [/]") return 0 def cmd_show(args: argparse.Namespace, node: Node, console: Console) -> int: ref = resolve_ref(node, args.ref) try: kind, obj = render.ref_kind(node, ref) except FablePoolError: kind, obj = None, None if kind == "claim": if args.json: print(json.dumps(_claim_dict(obj), indent=2, default=str)) else: console.print(render.claim_detail(node, obj)) return 0 if kind == "evidence": if args.json: print(json.dumps(render.to_plain(obj), indent=2, default=str)) else: console.print(render.evidence_detail(obj)) return 0 for op in node.operations(): if render.obj_id(op) == ref: if args.json: print(json.dumps(render.to_plain(op), indent=2, default=str)) else: console.print(Panel(json.dumps(render.to_plain(op), indent=2, default=str), title=f"Operation {render.short(ref)}")) return 0 raise FablePoolError(f"'{args.ref}' resolved to {ref} but no record was found") def cmd_why(args: argparse.Namespace, node: Node, console: Console) -> int: ref = resolve_ref(node, args.ref) claim = node.get_claim(ref) if args.json: print(json.dumps( {"claim": _claim_dict(claim), "provenance": render.provenance_dict(node, ref, args.depth)}, indent=2, default=str, )) return 0 console.print(render.claim_detail(node, claim)) console.print("\n[bold]Derivation (claim → sources → raw evidence):[/]") console.print(render.provenance_tree(node, ref, max_depth=args.depth)) sources = getattr(claim, "sources", None) or [] confidence = getattr(claim, "confidence", None) pct = f"{int(round(float(confidence) * 100))}%" if confidence is not None else "unknown" console.print( f"\nRule [bold]{getattr(claim, 'rule', '?')}[/] combined {len(sources)} source(s) " f"into this claim with confidence {pct}." ) console.print( "[dim]Disagree? fablepool refute " f"{render.short(ref)} --reason '…' · fablepool correct {render.short(ref)} --value '…' --reason '…'[/]" ) return 0 def cmd_evidence(args: argparse.Namespace, node: Node, console: Console) -> int: ref = resolve_ref(node, args.ref) evidence = node.get_evidence(ref) if args.json: print(json.dumps(render.to_plain(evidence), indent=2, default=str)) return 0 console.print(render.evidence_detail(evidence)) dependents = render.direct_dependents(node, ref) if dependents: console.print("[bold]Claims derived from this evidence:[/]") for dep in dependents: try: dep_claim = node.get_claim(dep) console.print(f" • {render.short(dep)} — {render.claim_brief(dep_claim)}") except (FablePoolError, KeyError, LookupError): console.print(f" • {render.short(dep)}") return 0 def cmd_refute(args: argparse.Namespace, node: Node, console: Console) -> int: ref = resolve_ref(node, args.ref) claim = node.get_claim(ref) downstream = render.direct_dependents(node, ref) prompt = ( f"Refute '{render.claim_brief(claim)}'? " f"{len(downstream)} claim(s) depend on it directly and will be re-evaluated." ) if not _confirm(args, prompt): console.print("aborted") return 1 result = node.refute_claim(ref, reason=args.reason) op_id, _, invalidated = _extract_cascade(result) if args.json: print(json.dumps({"refuted": ref, "op_id": op_id, "invalidated": invalidated}, indent=2)) return 0 console.print(f"[red]✗[/] refuted {render.short(ref)} — {render.claim_brief(claim)}") if op_id: console.print(f" refutation recorded as operation {render.short(op_id)}") _print_cascade(node, console, invalidated) return 0 def cmd_correct(args: argparse.Namespace, node: Node, console: Console) -> int: ref = resolve_ref(node, args.ref) claim = node.get_claim(ref) prompt = f"Correct '{render.claim_brief(claim)}' to '{args.value}'?" if not _confirm(args, prompt): console.print("aborted") return 1 kwargs: Dict[str, Any] = {"new_value": args.value, "reason": args.reason} if args.confidence is not None: kwargs["confidence"] = args.confidence result = node.correct_claim(ref, **kwargs) op_id, new_claim_id, invalidated = _extract_cascade(result) if args.json: print(json.dumps( {"corrected": ref, "op_id": op_id, "new_claim_id": new_claim_id, "invalidated": invalidated}, indent=2, )) return 0 console.print(f"[yellow]✎[/] corrected {render.short(ref)} — {render.claim_brief(claim)}") console.print(f" new value: [bold]{args.value}[/]") if new_claim_id: console.print(f" superseding claim: {render.short(new_claim_id)}") if op_id: console.print(f" correction recorded as operation {render.short(op_id)}") _print_cascade(node, console, invalidated) return 0 def _print_cascade(node: Node, console: Console, invalidated: List[str]) -> None: if not invalidated: console.print(" no downstream claims were affected") return console.print(f" [yellow]cascade:[/] {len(invalidated)} downstream claim(s) invalidated:") for cid in invalidated: try: c = node.get_claim(cid) console.print(f" ↳ {render.short(cid)} — {render.claim_brief(c)} [{getattr(c, 'status', '?')}]") except (FablePoolError, KeyError, LookupError): console.print(f" ↳ {render.short(cid)}") def cmd_deps(args: argparse.Namespace, node: Node, console: Console) -> int: ref = resolve_ref(node, args.ref) if args.json: payload: Dict[str, Any] = {"id": ref} if args.direction in ("up", "both"): payload["provenance"] = render.provenance_dict(node, ref, args.depth) if args.direction in ("down", "both"): payload["dependents"] = render.dependents_dict(node, ref, args.depth) print(json.dumps(payload, indent=2, default=str)) return 0 if args.direction in ("up", "both"): console.print("[bold]Upstream (what this is derived from):[/]") try: console.print(render.provenance_tree(node, ref, max_depth=args.depth)) except (FablePoolError, KeyError, LookupError): console.print(" (not a claim — evidence has no upstream sources)") if args.direction in ("down", "both"): console.print("[bold]Downstream (what depends on this):[/]") console.print(render.dependents_tree(node, ref, max_depth=args.depth)) return 0 def cmd_export(args: argparse.Namespace, node: Node, console: Console) -> int: encode = getattr(wire, "encode_operation", None) lines: List[str] = [] for op in node.operations(): if callable(encode): encoded = encode(op) line = encoded.decode("utf-8") if isinstance(encoded, bytes) else str(encoded) else: line = json.dumps(render.to_plain(op), sort_keys=True, separators=(",", ":"), default=str) lines.append(line.rstrip("\n")) payload = "\n".join(lines) + ("\n" if lines else "") if args.out: Path(args.out).write_text(payload, encoding="utf-8") console.print(f"[green]✓[/] exported {len(lines)} operation(s) in wire format to {args.out}") else: sys.stdout.write(payload) return 0 def cmd_audit(args: argparse.Namespace, node: Node, console: Console) -> int: ops = list(node.operations()) findings: List[Dict[str, str]] = [] def finding(level: str, op_id: str, check: str, message: str) -> None: findings.append({"level": level, "op_id": op_id, "check": check, "message": message}) compute_op_id = getattr(wire, "compute_op_id", None) signing_payload = getattr(wire, "signing_payload", None) verify_signature = getattr(identity_mod, "verify_signature", None) if compute_op_id is None: finding("info", "-", "op-id", "id recomputation unavailable in this build; skipped") if signing_payload is None or verify_signature is None: finding("info", "-", "signature", "signature verification helpers unavailable; skipped") seen_ids: set = set() prev_id: Optional[str] = None for index, op in enumerate(ops): oid = render.obj_id(op) kind = getattr(op, "kind", "?") prev = getattr(op, "prev", None) if index == 0: if prev: finding("warning", oid, "chain", "first operation has a non-empty prev pointer") elif prev != prev_id: finding("error", oid, "chain", f"prev pointer {prev!r} does not match previous op {prev_id!r}") if callable(compute_op_id): try: recomputed = compute_op_id(op) if recomputed != oid: finding("error", oid, "op-id", f"stored id does not match recomputed id {recomputed}") except Exception as exc: # defensive: an unverifiable op is a finding, not a crash finding("warning", oid, "op-id", f"could not recompute id: {exc}") if callable(signing_payload) and callable(verify_signature): try: ok = verify_signature(getattr(op, "author", None), signing_payload(op), getattr(op, "sig", None)) if not ok: finding("error", oid, "signature", "signature verification failed") except Exception as exc: finding("error", oid, "signature", f"signature check raised: {exc}") body = getattr(op, "body", None) or {} if isinstance(body, dict): if kind == "claim": for src in body.get("sources", []) or []: if src not in seen_ids: finding("error", oid, "references", f"claim cites unknown source {src}") if kind in ("refutation", "correction"): target = body.get("target") or body.get("claim_id") if target and target not in seen_ids: finding("error", oid, "references", f"{kind} targets unknown operation {target}") seen_ids.add(oid) prev_id = oid errors = [f for f in findings if f["level"] == "error"] if args.json: print(json.dumps({"operations": len(ops), "findings": findings, "ok": not errors}, indent=2)) return 1 if errors else 0 if not findings: console.print( f"[green]✓ audit passed[/] — {len(ops)} operations, hash chain intact, " f"all signatures verified, all references resolve" ) return 0 for f in findings: style = {"error": "red", "warning": "yellow", "info": "dim"}.get(f["level"], "white") console.print(f"[{style}]{f['level']:<7}[/] {render.short(f['op_id'])} {f['check']}: {f['message']}") console.print( f"\naudit checked {len(ops)} operation(s): " f"{len(errors)} error(s), {len(findings) - len(errors)} other finding(s)" ) return 1 if errors else 0 def cmd_log(args: argparse.Namespace, node: Node, console: Console) -> int: ops = list(node.operations()) if args.kind: ops = [op for op in ops if getattr(op, "kind", None) == args.kind] if args.limit: ops = ops[-args.limit:] if args.json: print(json.dumps([render.to_plain(op) for op in ops], indent=2, default=str)) return 0 if not ops: console.print("operation log is empty (for that filter)") return 0 console.print(render.ops_table(ops)) return 0 def cmd_caps(args: argparse.Namespace, node: Node, console: Console) -> int: ops = list(node.operations()) grants = [op for op in ops if getattr(op, "kind", None) == "grant"] revoked_targets = set() for op in ops: if getattr(op, "kind", None) == "revoke": body = getattr(op, "body", None) or {} if isinstance(body, dict): target = body.get("target") or body.get("grant_id") if target: revoked_targets.add(target) records = [] for op in grants: body = getattr(op, "body", None) or {} oid = render.obj_id(op) records.append({ "grant_id": oid, "audience": body.get("audience") or body.get("grantee") or "?", "scope": body.get("scope") or body.get("topics") or "?", "status": "revoked" if oid in revoked_targets else "active", }) if args.json: print(json.dumps(records, indent=2, default=str)) return 0 if not records: console.print( "no capability grants recorded on this node yet — grants issued during the " "sync/delegation demo (milestone 6) will appear here, with mechanical revocation status" ) return 0 from rich.table import Table table = Table(title="Capability grants") table.add_column("grant id", style="dim") table.add_column("audience") table.add_column("scope", overflow="fold") table.add_column("status") for record in records: style = "green" if record["status"] == "active" else "red" table.add_row( render.short(record["grant_id"]), str(record["audience"]), str(record["scope"]), f"[{style}]{record['status']}[/]", ) console.print(table) return 0 def cmd_ask(args: argparse.Namespace, node: Node, console: Console) -> int: question = " ".join(args.question).lower() if args.question else "" active = [c for c in node.claims() if getattr(c, "status", "active") == "active"] all_topics = sorted({str(getattr(c, "topic", "?")) for c in active}) selected = set() for topic in all_topics: synonyms = TOPIC_SYNONYMS.get(topic, {topic}) if topic in question or any(word in question.split() for word in synonyms): selected.add(topic) if not selected: selected = set(all_topics) chosen = [c for c in active if str(getattr(c, "topic", "?")) in selected] chosen.sort(key=lambda c: (str(getattr(c, "topic", "")), -float(getattr(c, "confidence", 0) or 0))) if args.json: print(json.dumps([ { "topic": getattr(c, "topic", None), "claim_id": render.obj_id(c), "predicate": getattr(c, "predicate", None), "value": getattr(c, "value", None), "confidence": getattr(c, "confidence", None), "rule": getattr(c, "rule", None), "sources": list(getattr(c, "sources", None) or []), } for c in chosen ], indent=2, default=str)) return 0 if not chosen: console.print("I don't hold any active claims about you yet.") return 0 console.print("[bold]What I believe about you, and why:[/]\n") current_topic = None for claim in chosen: topic = str(getattr(claim, "topic", "?")) if topic != current_topic: console.print(f"[bold underline]{topic}[/]") current_topic = topic confidence = getattr(claim, "confidence", None) pct = f"{int(round(float(confidence) * 100))}%" if confidence is not None else "?" sources = getattr(claim, "sources", None) or [] console.print( f" • {render.humanize(getattr(claim, 'predicate', '?'))}: " f"[bold]{getattr(claim, 'value', '?')}[/] " f"[dim]({pct} confident · {len(sources)} source(s) · rule {getattr(claim, 'rule', '?')} " f"· why: fablepool why {render.short(render.obj_id(claim))})[/]" ) console.print( "\n[dim]Every claim traces to raw evidence you can inspect. Disagree? " "fablepool refute --reason '…' or fablepool correct --value '…' --reason '…'[/]" ) return 0 def cmd_shell(args: argparse.Namespace, node: Node, console: Console) -> int: from fablepool import shell return shell.run(node, console, node_path=str(_node_path(args))) # --------------------------------------------------------------------------- # Parser # --------------------------------------------------------------------------- def build_parser() -> argparse.ArgumentParser: parser = argparse.ArgumentParser( prog="fablepool", description="Inspect, question, and correct your user-owned AI memory.", ) parser.add_argument("--node", help=f"node directory (default: $FABLEPOOL_NODE or ./{DEFAULT_NODE_DIR})") parser.add_argument("--json", action="store_true", help="emit machine-readable JSON") parser.add_argument("--no-color", action="store_true", help="disable colored output") sub = parser.add_subparsers(dest="command") p = sub.add_parser("init", help="create a new node with a fresh identity") p.add_argument("path", nargs="?", help="node directory (defaults to --node)") sub.add_parser("seed", help="populate the node with the Avery demo dataset") p = sub.add_parser("topics", help="list topics with claim counts") p = sub.add_parser("claims", help="list claims, filterable by topic and status") p.add_argument("--topic", help="only this topic") p.add_argument("--status", choices=["active", "refuted", "invalidated", "superseded"]) p.add_argument("--all", action="store_true", help="include non-active claims") p = sub.add_parser("show", help="show a claim, evidence record, or operation") p.add_argument("ref", help="full id or unique prefix") p = sub.add_parser("why", help="explain a claim: provenance down to raw evidence") p.add_argument("ref") p.add_argument("--depth", type=int, default=6, help="max derivation depth to display") p = sub.add_parser("evidence", help="show a raw evidence record and what was derived from it") p.add_argument("ref") p = sub.add_parser("refute", help="refute a claim and cascade-invalidate downstream claims") p.add_argument("ref") p.add_argument("--reason", required=True, help="why this claim is wrong") p.add_argument("--yes", action="store_true", help="skip confirmation") p = sub.add_parser("correct", help="correct a claim's value; downstream claims are re-evaluated") p.add_argument("ref") p.add_argument("--value", required=True, help="the corrected value") p.add_argument("--reason", required=True, help="why the original was wrong") p.add_argument("--confidence", type=float, help="confidence for the corrected claim (0..1)") p.add_argument("--yes", action="store_true", help="skip confirmation") p = sub.add_parser("deps", help="show upstream provenance and/or downstream dependents") p.add_argument("ref") p.add_argument("--direction", choices=["up", "down", "both"], default="both") p.add_argument("--depth", type=int, default=6) p = sub.add_parser("export", help="export the full operation log in wire format (JSONL)") p.add_argument("--out", "-o", help="write to file instead of stdout") sub.add_parser("audit", help="verify hash chain, signatures, and references of the op log") p = sub.add_parser("log", help="browse the append-only operation log") p.add_argument("--kind", help="filter by operation kind (evidence, claim, refutation, …)") p.add_argument("-n", "--limit", type=int, help="show only the last N operations") sub.add_parser("caps", help="list capability grants and their revocation status") p = sub.add_parser("ask", help='ask the node, e.g. fablepool ask "what do you know about me"') p.add_argument("question", nargs="*", help="free-text question") sub.add_parser("shell", help="interactive inspection shell") return parser HANDLERS = { "seed": cmd_seed, "topics": cmd_topics, "claims": cmd_claims, "show": cmd_show, "why": cmd_why, "evidence": cmd_evidence, "refute": cmd_refute, "correct": cmd_correct, "deps": cmd_deps, "export": cmd_export, "audit": cmd_audit, "log": cmd_log, "caps": cmd_caps, "ask": cmd_ask, "shell": cmd_shell, } def main(argv: Optional[List[str]] = None) -> int: parser = build_parser() args = parser.parse_args(argv) console = render.make_console(no_color=args.no_color) if not args.command: parser.print_help() return 1 try: if args.command == "init": return cmd_init(args, console) node = _open_node(args, console, create_ok=(args.command == "seed")) return HANDLERS[args.command](args, node, console) except FablePoolError as exc: console.print(f"[red]error:[/] {exc}") return 1 except KeyboardInterrupt: console.print("\ninterrupted") return 130 if __name__ == "__main__": sys.exit(main())