"""FablePool command-line interface. This is the operator-facing surface of the reference node. Every command is a thin wrapper over :class:`fablepool.node.Node`; the CLI never touches the store or the wire format directly, so a second implementation can replace the internals while keeping this UX. Run as a module (no install step required from a source checkout): python -m fablepool.cli [options] Commands -------- init Create a new node (generates an Ed25519 identity). id Print this node's public identity. ingest Import evidence from a source (calendar | notes | photos). derive Run the derivation engine over current evidence. claims List claims (active by default; --all includes refuted/invalidated). explain Show the full provenance chain for one claim. refute Refute a claim; downstream derived claims are invalidated. answer "What do you know about me and why?" — narrated claim summary. grant Grant a delegate a claims-only slice by predicate. grants List grants and their status. revoke Mechanically revoke a grant. export Export a signed operation bundle (optionally a grant slice). import Import a bundle produced by another node. sync Two-way log sync with a peer node over HTTP. serve Serve this node's sync endpoint over HTTP. log Dump the raw operation log. verify Verify every signature in the local log. """ from __future__ import annotations import argparse import json import sys from pathlib import Path # --------------------------------------------------------------------------- # Helpers that tolerate both dataclass-style and dict-style objects, so the # CLI keeps working if internal representations evolve. All of these are # read-only conveniences; writes always go through Node methods. 
# ---------------------------------------------------------------------------


def _field(obj, *names, default=None):
    """Look up *names* in order on *obj* and return the first one present.

    Dicts are probed by key, every other object by attribute. When none of
    the names is present, *default* is returned.
    """
    if isinstance(obj, dict):
        for key in names:
            if key in obj:
                return obj[key]
        return default
    for attr in names:
        if hasattr(obj, attr):
            return getattr(obj, attr)
    return default


def _op_to_dict(op):
    """Return *op* as a dict: dicts pass through, others via ``to_dict()``.

    Raises:
        TypeError: if *op* is neither a dict nor exposes ``to_dict``.
    """
    if not isinstance(op, dict):
        if not hasattr(op, "to_dict"):
            raise TypeError(f"cannot serialise operation of type {type(op)!r}")
        return op.to_dict()
    return op


# Attribute names copied, when present, from a non-dict claim object.
_CLAIM_FIELDS = (
    "claim_id",
    "subject",
    "predicate",
    "object",
    "value",
    "confidence",
    "status",
    "derived_from",
    "evidence_ids",
    "author",
    "created_at",
)


def _claim_to_dict(c):
    """Return a plain-dict view of claim *c*.

    A dict input is shallow-copied; for any other object only the attributes
    listed in ``_CLAIM_FIELDS`` that actually exist are collected.
    """
    if isinstance(c, dict):
        return dict(c)
    return {
        field_name: getattr(c, field_name)
        for field_name in _CLAIM_FIELDS
        if hasattr(c, field_name)
    }


def _store_ops(node):
    """List every operation in the node's log, tolerating store API names.

    ``node.store`` (when set) is probed before the node itself; the first
    callable among the known listing method names is invoked and its result
    materialised as a list. Exits if no such method is found.
    """
    backing_store = getattr(node, "store", None)
    if backing_store is None:
        targets = (node,)
    else:
        targets = (backing_store, node)
    for target in targets:
        for method_name in ("all_ops", "iter_ops", "ops", "list_ops"):
            lister = getattr(target, method_name, None)
            if callable(lister):
                return list(lister())
    raise SystemExit("error: node store does not expose an operation listing")


def _die(msg: str, code: int = 1) -> "SystemExit":
    """Print ``error: <msg>`` to stderr and return a ``SystemExit(code)``.

    The exception is returned rather than raised so call sites can write
    ``raise _die(...)``.
    """
    failure = SystemExit(code)
    print(f"error: {msg}", file=sys.stderr)
    return failure


def _load_node(path: str):
    """Open the node stored at *path*, exiting with a hint if it is missing."""
    from fablepool.node import Node

    node_dir = Path(path)
    if node_dir.exists():
        return Node(node_dir)
    raise _die(f"no node at {node_dir} — run `init --path {node_dir}` first")


def _list_claims(node, include_inactive: bool = False):
    """Return the node's claims as dicts, active-only unless asked otherwise.

    ``node.claims`` is first called with ``include_inactive``; if that raises
    ``TypeError`` it is called again with no arguments. When
    *include_inactive* is false, claims whose ``status`` (defaulting to
    ``"active"``) is anything else are dropped.
    """
    try:
        raw_claims = node.claims(include_inactive=include_inactive)
    except TypeError:
        raw_claims = node.claims()
    as_dicts = list(map(_claim_to_dict, raw_claims))
    if include_inactive:
        return as_dicts
    return [
        entry for entry in as_dicts if entry.get("status", "active") == "active"
    ]


def _json_dump(obj) -> str:
    """Serialise *obj* as indented, key-sorted JSON; unknown types use ``str``."""
    return json.dumps(obj, default=str, sort_keys=True, indent=2)


# ---------------------------------------------------------------------------
# Commands
# ---------------------------------------------------------------------------


def cmd_init(args) -> int:
    """Create a new node at ``args.path`` and print its identity.

    Refuses a path that already exists and is non-empty. ``Node.create`` is
    tried with the name first; if that raises ``TypeError`` it is retried
    with the path alone.
    """
    from fablepool.node import Node

    p = Path(args.path)
    if p.exists() and any(p.iterdir()):
        raise _die(f"{p} already exists and is not empty")
    try:
        node = Node.create(p, args.name)
    except TypeError:
        node = Node.create(p)
    print(f"created node '{args.name}' at {p}")
    print(f"node id: {_field(node, 'node_id', 'public_key')}")
    return 0


def cmd_id(args) -> int:
    """Print the node's ``node_id`` (or ``public_key`` if that is absent)."""
    node = _load_node(args.path)
    print(_field(node, "node_id", "public_key"))
    return 0


def cmd_ingest(args) -> int:
    """Ingest evidence of ``args.kind`` from ``args.source``.

    Prefers a kind-specific ``node.ingest_<kind>(source)`` method and falls
    back to ``node.ingest(kind, source)``. The reported count is the result
    itself when it is an int, otherwise its length.
    """
    node = _load_node(args.path)
    source = Path(args.source)
    if not source.exists():
        raise _die(f"source {source} does not exist")
    method = getattr(node, f"ingest_{args.kind}", None)
    if callable(method):
        result = method(source)
    elif hasattr(node, "ingest"):
        result = node.ingest(args.kind, source)
    else:
        raise _die(f"node has no ingest path for kind '{args.kind}'")
    count = result if isinstance(result, int) else len(result or [])
    print(f"ingested {count} evidence operation(s) from {source} ({args.kind})")
    return 0


def cmd_derive(args) -> int:
    """Run ``node.derive()`` and report how many operations it produced.

    The count is the result when it is an int, else its length, else ``?``
    when the result has no length.
    """
    node = _load_node(args.path)
    result = node.derive()
    if isinstance(result, int):
        count = result
    else:
        try:
            count = len(result)
        except TypeError:
            count = "?"
    print(f"derivation produced {count} new claim operation(s)")
    return 0


def cmd_claims(args) -> int:
    """List claims as JSON (``--json``) or as a human-readable table."""
    node = _load_node(args.path)
    claims = _list_claims(node, include_inactive=args.all)
    if args.json:
        print(_json_dump(claims))
        return 0
    if not claims:
        print("(no claims)")
        return 0
    for d in claims:
        cid = str(d.get("claim_id", "?"))
        status = d.get("status", "active")
        conf = d.get("confidence")
        conf_s = f"{conf:.2f}" if isinstance(conf, (int, float)) else "-"
        # Claims may carry their payload under either "object" or "value".
        obj = d.get("object", d.get("value", ""))
        print(f" [{status:>11}] conf={conf_s} {d.get('predicate', '?')}: {obj}")
        print(f" id={cid}")
    print(f"{len(claims)} claim(s)")
    return 0


def cmd_explain(args) -> int:
    """Print ``node.explain(claim_id)`` as JSON."""
    node = _load_node(args.path)
    info = node.explain(args.claim_id)
    print(_json_dump(info if isinstance(info, dict) else _claim_to_dict(info)))
    return 0


def cmd_refute(args) -> int:
    """Refute a claim and list any downstream claims reported as invalidated."""
    node = _load_node(args.path)
    result = node.refute(args.claim_id, args.reason)
    print(f"refuted claim {args.claim_id}")
    # node.refute may return (refutation_op, invalidated_ids) or just an op.
    invalidated = None
    if isinstance(result, tuple) and len(result) == 2:
        invalidated = result[1]
    elif isinstance(result, dict):
        invalidated = result.get("invalidated")
    if invalidated:
        print(f"cascade invalidated {len(invalidated)} downstream claim(s):")
        for cid in invalidated:
            print(f" - {cid}")
    return 0


def cmd_answer(args) -> int:
    """Narrated answer to: what do you know about me, and why?

    Only active claims are considered. An optional ``args.query`` keeps the
    claims whose predicate or object/value contains it (case-insensitive).
    """
    node = _load_node(args.path)
    claims = _list_claims(node, include_inactive=False)
    if args.query:
        q = args.query.lower()
        claims = [
            d
            for d in claims
            if q in str(d.get("predicate", "")).lower()
            or q in str(d.get("object", d.get("value", ""))).lower()
        ]
    if not claims:
        print("I currently hold no active claims matching that.")
        return 0
    print(f"I hold {len(claims)} active claim(s) about you:\n")
    for d in claims:
        conf = d.get("confidence")
        conf_s = f"{conf:.0%}" if isinstance(conf, (int, float)) else "unknown"
        obj = d.get("object", d.get("value", ""))
        print(f"* {d.get('predicate', '?')}: {obj} (confidence {conf_s})")
        evidence = d.get("evidence_ids") or []
        derived = d.get("derived_from") or []
        why = []
        if evidence:
            why.append(f"{len(evidence)} piece(s) of evidence")
        if derived:
            why.append(f"derived from {len(derived)} upstream claim(s)")
        why_s = " and ".join(why) if why else "asserted directly"
        print(f" why: {why_s}; inspect with: explain {d.get('claim_id')}")
    print(
        "\nEvery claim above is a signed operation with provenance. "
        "Use `refute --reason ...` to correct me; corrections "
        "cascade to everything derived from the refuted claim."
    )
    return 0


def cmd_grant(args) -> int:
    """Grant ``args.to`` a slice limited to the comma-separated predicates.

    ``node.grant`` is tried with a ``note`` keyword first and retried without
    it on ``TypeError``.
    """
    node = _load_node(args.path)
    predicates = [p.strip() for p in args.predicates.split(",") if p.strip()]
    if not predicates:
        raise _die("at least one predicate is required (claims-only slices are narrow by design)")
    try:
        op = node.grant(args.to, predicates, note=args.note or "")
    except TypeError:
        op = node.grant(args.to, predicates)
    grant_id = _field(op, "op_id", "grant_id", "id")
    if args.json:
        print(json.dumps({"grant_id": grant_id, "delegate": args.to, "predicates": predicates}))
    else:
        print(f"granted claims-only slice to {args.to}")
        print(f"predicates: {', '.join(predicates)}")
        print(f"grant_id: {grant_id}")
    return 0


def cmd_grants(args) -> int:
    """List the node's grants as JSON (``--json``) or as text."""
    node = _load_node(args.path)
    items = node.grants()
    rows = []
    for g in items:
        rows.append(
            {
                "grant_id": _field(g, "grant_id", "op_id", "id"),
                "delegate": _field(g, "delegate", "to", "grantee"),
                "predicates": _field(g, "predicates", default=[]),
                "status": _field(g, "status", default="active"),
            }
        )
    if args.json:
        print(_json_dump(rows))
        return 0
    if not rows:
        print("(no grants)")
        return 0
    for r in rows:
        print(f" [{r['status']:>7}] {r['grant_id']}")
        print(f" delegate: {r['delegate']}")
        print(f" predicates: {', '.join(r['predicates'])}")
    return 0


def cmd_revoke(args) -> int:
    """Revoke a grant and print how to deliver the revocation.

    ``node.revoke`` is tried with a ``reason`` keyword first and retried
    without it on ``TypeError``.
    """
    node = _load_node(args.path)
    try:
        op = node.revoke(args.grant_id, reason=args.reason or "")
    except TypeError:
        op = node.revoke(args.grant_id)
    print(f"revoked grant {args.grant_id}")
    rid = _field(op, "op_id", "id")
    if rid:
        print(f"revocation op: {rid}")
    print(
        "deliver the revocation to the delegate with "
        f"`export --grant {args.grant_id}` followed by `import` on the delegate, "
        "or a normal `sync`."
    )
    return 0


def cmd_export(args) -> int:
    """Export an operation bundle as JSON to ``args.output`` or stdout.

    With ``--grant`` the node is asked for that grant's bundle only.
    """
    node = _load_node(args.path)
    if args.grant:
        ops = node.export_bundle(grant_id=args.grant)
    else:
        ops = node.export_bundle()
    bundle = {
        "fablepool_bundle": 1,
        "node": _field(node, "node_id", "public_key"),
        "grant": args.grant,
        "ops": [_op_to_dict(o) for o in ops],
    }
    text = _json_dump(bundle)
    if args.output:
        Path(args.output).write_text(text + "\n", encoding="utf-8")
        print(f"wrote {len(bundle['ops'])} operation(s) to {args.output}")
    else:
        print(text)
    return 0


def cmd_import(args) -> int:
    """Import a bundle file into the node and report what was applied.

    Accepts either a bundle object with an ``ops`` key or a bare JSON list
    of operations.
    """
    node = _load_node(args.path)
    raw = json.loads(Path(args.bundle).read_text(encoding="utf-8"))
    ops = raw.get("ops", raw) if isinstance(raw, dict) else raw
    result = node.import_bundle(ops)
    applied = _field(result, "applied", "imported", "accepted")
    if applied is None and isinstance(result, int):
        applied = result
    print(f"imported bundle: {applied if applied is not None else 'ok'} operation(s) applied")
    skipped = _field(result, "skipped", "duplicates")
    if skipped:
        print(f"skipped (already known / not authorised): {skipped}")
    return 0


def cmd_sync(args) -> int:
    """Sync with the peer at ``args.peer`` and print pushed/pulled counts."""
    node = _load_node(args.path)
    from fablepool.sync import sync_with_peer

    result = sync_with_peer(node, args.peer)
    pushed = _field(result, "pushed", "sent", default="?")
    pulled = _field(result, "pulled", "received", default="?")
    print(f"sync with {args.peer}: pushed {pushed}, pulled {pulled}")
    return 0


def cmd_serve(args) -> int:
    """Serve the node on ``args.host``:``args.port`` via ``serve_node``."""
    node = _load_node(args.path)
    from fablepool.transport import serve_node

    print(f"serving node at http://{args.host}:{args.port} (Ctrl-C to stop)")
    serve_node(node, host=args.host, port=args.port)
    return 0


def cmd_log(args) -> int:
    """Dump the operation log as JSON (``--json``) or one line per op."""
    node = _load_node(args.path)
    ops = [_op_to_dict(o) for o in _store_ops(node)]
    if args.json:
        print(_json_dump(ops))
        return 0
    for op in ops:
        op_id = op.get("op_id", op.get("id", "?"))
        op_type = op.get("op_type", op.get("type", "?"))
        author = str(op.get("author", "?"))
        print(f" {op_type:<12} {op_id} by {author[:16]}…")
    print(f"{len(ops)} operation(s)")
    return 0


def cmd_verify(args) -> int:
    """Verify every operation in the local log with ``verify_op``.

    Returns 0 when every operation verifies, 1 otherwise. Each failing
    operation is reported exactly once on stderr: with the exception text
    when ``verify_op`` raised, or ``signature invalid`` when it returned a
    falsy result.
    """
    node = _load_node(args.path)
    from fablepool.ops import verify_op

    ok, bad = 0, []
    for op in _store_ops(node):
        try:
            valid = verify_op(op)
        except Exception as exc:  # malformed op is a verification failure
            bad.append((_field(op, "op_id", "id", default="?"), str(exc)))
            # Already recorded with its real cause; skip the generic branch
            # below so the op is not reported a second time.
            continue
        if valid:
            ok += 1
        else:
            bad.append((_field(op, "op_id", "id", default="?"), "signature invalid"))
    print(f"verified {ok} operation(s)")
    if bad:
        for op_id, why in bad:
            print(f" INVALID {op_id}: {why}", file=sys.stderr)
        return 1
    print("log integrity: OK")
    return 0


# ---------------------------------------------------------------------------
# Argument parsing
# ---------------------------------------------------------------------------


def _add_path(sp) -> None:
    """Add the required ``--path`` (node directory) option to subparser *sp*."""
    sp.add_argument("--path", required=True, help="node directory")


def build_parser() -> argparse.ArgumentParser:
    """Build the ``fablepool`` argument parser with one subparser per command.

    Every subparser takes a required ``--path`` and stores its handler in
    ``func`` so that ``main`` can dispatch on it.
    """
    parser = argparse.ArgumentParser(
        prog="fablepool",
        description="FablePool — user-owned AI memory reference node",
    )
    sub = parser.add_subparsers(dest="command", required=True)

    sp = sub.add_parser("init", help="create a new node")
    _add_path(sp)
    sp.add_argument("--name", required=True, help="human-readable node name")
    sp.set_defaults(func=cmd_init)

    sp = sub.add_parser("id", help="print this node's public identity")
    _add_path(sp)
    sp.set_defaults(func=cmd_id)

    sp = sub.add_parser("ingest", help="import evidence from a source")
    _add_path(sp)
    sp.add_argument("--kind", required=True, choices=("calendar", "notes", "photos"))
    sp.add_argument("source", help="ICS file, notes directory, or photo-metadata JSON")
    sp.set_defaults(func=cmd_ingest)

    sp = sub.add_parser("derive", help="run the derivation engine")
    _add_path(sp)
    sp.set_defaults(func=cmd_derive)

    sp = sub.add_parser("claims", help="list claims")
    _add_path(sp)
    sp.add_argument("--all", action="store_true", help="include refuted/invalidated claims")
    sp.add_argument("--json", action="store_true", help="machine-readable output")
    sp.set_defaults(func=cmd_claims)

    sp = sub.add_parser("explain", help="show provenance for a claim")
    _add_path(sp)
    sp.add_argument("claim_id")
    sp.set_defaults(func=cmd_explain)

    sp = sub.add_parser("refute", help="refute a claim (cascades downstream)")
    _add_path(sp)
    sp.add_argument("claim_id")
    sp.add_argument("--reason", required=True, help="why this claim is wrong")
    sp.set_defaults(func=cmd_refute)

    sp = sub.add_parser("answer", help='"what do you know about me and why?"')
    _add_path(sp)
    sp.add_argument("query", nargs="?", default=None, help="optional keyword filter")
    sp.set_defaults(func=cmd_answer)

    sp = sub.add_parser("grant", help="grant a delegate a claims-only slice")
    _add_path(sp)
    sp.add_argument("--to", required=True, help="delegate node public key")
    sp.add_argument("--predicates", required=True, help="comma-separated predicate list")
    sp.add_argument("--note", default=None)
    sp.add_argument("--json", action="store_true")
    sp.set_defaults(func=cmd_grant)

    sp = sub.add_parser("grants", help="list grants")
    _add_path(sp)
    sp.add_argument("--json", action="store_true")
    sp.set_defaults(func=cmd_grants)

    sp = sub.add_parser("revoke", help="revoke a grant")
    _add_path(sp)
    sp.add_argument("grant_id")
    sp.add_argument("--reason", default=None)
    sp.set_defaults(func=cmd_revoke)

    sp = sub.add_parser("export", help="export a signed operation bundle")
    _add_path(sp)
    sp.add_argument("--grant", default=None, help="export only the slice for this grant")
    sp.add_argument("-o", "--output", default=None, help="output file (default: stdout)")
    sp.set_defaults(func=cmd_export)

    sp = sub.add_parser("import", help="import a bundle from another node")
    _add_path(sp)
    sp.add_argument("bundle", help="bundle JSON file")
    sp.set_defaults(func=cmd_import)

    sp = sub.add_parser("sync", help="two-way sync with a peer over HTTP")
    _add_path(sp)
    sp.add_argument("--peer", required=True, help="peer base URL, e.g. http://127.0.0.1:7368")
    sp.set_defaults(func=cmd_sync)

    sp = sub.add_parser("serve", help="serve this node's sync endpoint")
    _add_path(sp)
    sp.add_argument("--host", default="127.0.0.1")
    sp.add_argument("--port", type=int, default=7368)
    sp.set_defaults(func=cmd_serve)

    sp = sub.add_parser("log", help="dump the raw operation log")
    _add_path(sp)
    sp.add_argument("--json", action="store_true")
    sp.set_defaults(func=cmd_log)

    sp = sub.add_parser("verify", help="verify all signatures in the local log")
    _add_path(sp)
    sp.set_defaults(func=cmd_verify)

    return parser


def main(argv=None) -> int:
    """Parse *argv*, dispatch to the selected command, return its exit code.

    ``SystemExit`` raised by a command propagates unchanged. Ctrl-C yields
    130. Any other exception is printed to stderr as ``error: ...`` and
    yields 1.
    """
    parser = build_parser()
    args = parser.parse_args(argv)
    try:
        return args.func(args)
    except SystemExit:
        raise
    except KeyboardInterrupt:
        return 130
    except Exception as exc:
        print(f"error: {exc}", file=sys.stderr)
        return 1


if __name__ == "__main__":
    raise SystemExit(main())