"""mnema-derive: command-line interface wiring the derivation engine to the signed operation log of a local node. The log is the only durable state. Every command rebuilds engine state by replaying the log, performs its action, and (for mutating commands) appends the resulting signed operations. Usage:: python -m mnema.derive.cli init --node ./mynode python -m mnema.derive.cli ingest --node ./mynode data/samples/*.jsonl python -m mnema.derive.cli run --node ./mynode python -m mnema.derive.cli claims --node ./mynode python -m mnema.derive.cli explain --node ./mynode python -m mnema.derive.cli refute --node ./mynode --reason "..." python -m mnema.derive.cli correct --node ./mynode --value '"tea"' --reason "..." python -m mnema.derive.cli retract --node ./mynode --reason "..." python -m mnema.derive.cli log --node ./mynode python -m mnema.derive.cli verify --node ./mynode """ from __future__ import annotations import argparse import json import sys from pathlib import Path from typing import Any, List, Optional, Tuple from mnema.core.keys import KeyPair from mnema.core.log import OperationLog from mnema.derive.engine import CascadeReport, DerivationEngine, RunReport from mnema.derive.loaders import load_evidence_files from mnema.derive.model import ClaimStatus from mnema.derive.sink import LogOpSink KEY_FILE = "node.key" LOG_FILE = "ops.log" def builtin_derivers() -> List[Any]: from mnema.derive.derivers.places import PlaceDeriver from mnema.derive.derivers.preferences import PreferenceDeriver from mnema.derive.derivers.relationships import RelationshipDeriver from mnema.derive.derivers.routines import RoutineDeriver return [RoutineDeriver(), PlaceDeriver(), RelationshipDeriver(), PreferenceDeriver()] # ---------------------------------------------------------------------- # node plumbing # ---------------------------------------------------------------------- def _node_dir(args: argparse.Namespace) -> Path: return Path(args.node) def _open_node(args: argparse.Namespace) -> Tuple[KeyPair, OperationLog]: root = _node_dir(args) key_path = root / KEY_FILE log_path = root / LOG_FILE if not key_path.exists(): raise SystemExit( "no node at %s (missing %s) — run `init` first" % (root, KEY_FILE) ) keypair = KeyPair.load(key_path) log = OperationLog.open(log_path) return keypair, log def _build_engine(log: OperationLog, sink: Optional[LogOpSink]) -> DerivationEngine: engine = DerivationEngine(derivers=builtin_derivers(), sink=sink) engine.replay(log) return engine def _print_json(obj: Any) -> None: print(json.dumps(obj, indent=2, sort_keys=True, ensure_ascii=False)) # ---------------------------------------------------------------------- # commands # ---------------------------------------------------------------------- def cmd_init(args: argparse.Namespace) -> int: root = _node_dir(args) root.mkdir(parents=True, exist_ok=True) key_path = root / KEY_FILE if key_path.exists(): print("node already initialized at %s" % root) return 0 keypair = KeyPair.generate() keypair.save(key_path) OperationLog.open(root / LOG_FILE) # creates an empty log print("initialized node at %s" % root) print("node key id: %s" % keypair.key_id) return 0 def cmd_ingest(args: argparse.Namespace) -> int: keypair, log = _open_node(args) engine = _build_engine(log, LogOpSink(log, keypair)) evidence = load_evidence_files([Path(p) for p in args.files]) added = engine.ingest_all(evidence) skipped = len(evidence) - added print("ingested %d evidence record(s) (%d already known)" % (added, skipped)) return 0 def cmd_run(args: argparse.Namespace) -> int: keypair, log = _open_node(args) engine = _build_engine(log, LogOpSink(log, keypair)) report: RunReport = engine.run() if args.json: _print_json(report.to_dict()) return 0 print( "derivation complete: %d pass(es), %d new claim(s), %d superseded, %d suppressed identity(ies)" % ( report.passes, len(report.asserted), len(report.superseded), len(report.suppressed), ) ) for claim_id in report.asserted: claim = engine.get_claim(claim_id) if claim is not None: print( " + %s %s = %s (%.2f)" % (claim_id, claim.predicate, json.dumps(claim.value, ensure_ascii=False), claim.confidence) ) return 0 def cmd_claims(args: argparse.Namespace) -> int: _, log = _open_node(args) engine = _build_engine(log, None) claims = engine.claims(include_inactive=args.all) if args.predicate: claims = [c for c in claims if c.predicate == args.predicate] claims.sort(key=lambda c: (c.predicate, -c.confidence, c.claim_id)) if args.json: _print_json([DerivationEngine._claim_to_payload(c) for c in claims]) return 0 if not claims: print("no claims" if not args.all else "no claims at all") return 0 for claim in claims: status = "" if claim.status == ClaimStatus.ACTIVE else " [%s]" % claim.status.value print( "%s conf=%.2f %s = %s%s (by %s)" % ( claim.claim_id, claim.confidence, claim.predicate, json.dumps(claim.value, ensure_ascii=False), status, claim.deriver, ) ) return 0 def cmd_show(args: argparse.Namespace) -> int: _, log = _open_node(args) engine = _build_engine(log, None) claim = engine.get_claim(args.claim_id) if claim is None: raise SystemExit("unknown claim: %s" % args.claim_id) _print_json(DerivationEngine._claim_to_payload(claim)) return 0 def cmd_explain(args: argparse.Namespace) -> int: _, log = _open_node(args) engine = _build_engine(log, None) try: explanation = engine.explanation_for(args.claim_id) except KeyError: raise SystemExit("unknown claim: %s" % args.claim_id) if args.json: _print_json(explanation) return 0 print("Claim %s" % explanation["claim_id"]) print( " %s = %s (confidence %.2f, status %s)" % ( explanation["predicate"], json.dumps(explanation["value"], ensure_ascii=False), explanation["confidence"], explanation["status"], ) ) print(" Derived by %s via rule %r" % (explanation["deriver"], explanation["rule"])) if explanation["parameters"]: print(" Parameters: %s" % json.dumps(explanation["parameters"], sort_keys=True)) if explanation["rationale"]: print(" Because: %s" % explanation["rationale"]) print(" Inputs (%d):" % len(explanation["inputs"])) for item in explanation["inputs"]: if item["type"] == "evidence": flag = " (RETRACTED)" if item.get("retracted") else "" print( " - [evidence] %s %s %s \"%s\"%s" % (item["id"], item["kind"], item["observed_at"], item["summary"], flag) ) elif item["type"] == "claim": print( " - [claim] %s %s = %s (%.2f, %s)" % ( item["id"], item["predicate"], json.dumps(item["value"], ensure_ascii=False), item["confidence"], item["status"], ) ) else: print(" - [unknown] %s" % item["id"]) return 0 def _print_cascade(report: CascadeReport, engine: DerivationEngine, as_json: bool) -> None: if as_json: _print_json(report.to_dict()) return print("%s %s: %s" % (report.action, report.target_id, report.reason)) if report.replacement_id: print(" corrected to: %s" % report.replacement_id) print(" invalidated downstream: %d" % len(report.invalidated)) for claim_id in report.invalidated: claim = engine.get_claim(claim_id) label = claim.predicate if claim is not None else "?" print(" - %s (%s)" % (claim_id, label)) print(" re-derived: %d" % len(report.rederived)) for claim_id in report.rederived: claim = engine.get_claim(claim_id) if claim is not None: print( " + %s %s = %s (%.2f)" % (claim_id, claim.predicate, json.dumps(claim.value, ensure_ascii=False), claim.confidence) ) if report.suppressed: print(" suppressed identities: %s" % ", ".join(sorted(set(report.suppressed)))) def cmd_refute(args: argparse.Namespace) -> int: keypair, log = _open_node(args) engine = _build_engine(log, LogOpSink(log, keypair)) try: report = engine.refute(args.claim_id, args.reason) except (KeyError, ValueError) as exc: raise SystemExit(str(exc)) _print_cascade(report, engine, args.json) return 0 def cmd_correct(args: argparse.Namespace) -> int: keypair, log = _open_node(args) engine = _build_engine(log, LogOpSink(log, keypair)) try: value = json.loads(args.value) except json.JSONDecodeError: value = args.value # treat as a plain string try: report = engine.correct(args.claim_id, value, args.reason) except (KeyError, ValueError) as exc: raise SystemExit(str(exc)) _print_cascade(report, engine, args.json) return 0 def cmd_retract(args: argparse.Namespace) -> int: keypair, log = _open_node(args) engine = _build_engine(log, LogOpSink(log, keypair)) try: report = engine.retract_evidence(args.evidence_id, args.reason) except (KeyError, ValueError) as exc: raise SystemExit(str(exc)) _print_cascade(report, engine, args.json) return 0 def cmd_evidence(args: argparse.Namespace) -> int: _, log = _open_node(args) engine = _build_engine(log, None) records = sorted(engine.graph.evidence(), key=lambda e: (e.observed_at, e.evidence_id)) for record in records: flag = " [RETRACTED]" if engine.is_retracted(record.evidence_id) else "" print("%s %s %s %s%s" % (record.evidence_id, record.kind, record.observed_at, record.source, flag)) print("%d evidence record(s)" % len(records)) return 0 def cmd_log(args: argparse.Namespace) -> int: _, log = _open_node(args) count = 0 for op in log: count += 1 detail = "" payload = op.payload if isinstance(payload, dict): if "claim" in payload and isinstance(payload["claim"], dict): detail = payload["claim"].get("claim_id", "") elif "claim_id" in payload: detail = payload["claim_id"] elif "evidence_id" in payload: detail = payload["evidence_id"] print("%s %-18s %s" % (op.op_id, op.kind, detail)) print("%d operation(s)" % count) return 0 def cmd_verify(args: argparse.Namespace) -> int: _, log = _open_node(args) log.verify() print("log verified: all signatures and hash links valid") return 0 # ---------------------------------------------------------------------- # argument parsing # ---------------------------------------------------------------------- def build_parser() -> argparse.ArgumentParser: parser = argparse.ArgumentParser( prog="mnema-derive", description="Derivation engine CLI for a mnema local node.", ) parser.add_argument( "--node", default=".mnema", help="node directory containing the key and operation log (default: .mnema)", ) sub = parser.add_subparsers(dest="command", required=True) p = sub.add_parser("init", help="create a node key and empty operation log") p.set_defaults(func=cmd_init) p = sub.add_parser("ingest", help="ingest evidence record files (JSONL)") p.add_argument("files", nargs="+", help="evidence .jsonl files") p.set_defaults(func=cmd_ingest) p = sub.add_parser("run", help="run derivers to a fixpoint, appending claim ops") p.add_argument("--json", action="store_true") p.set_defaults(func=cmd_run) p = sub.add_parser("claims", help="list claims") p.add_argument("--all", action="store_true", help="include superseded/refuted/invalidated") p.add_argument("--predicate", help="filter by predicate") p.add_argument("--json", action="store_true") p.set_defaults(func=cmd_claims) p = sub.add_parser("show", help="show a single claim as JSON") p.add_argument("claim_id") p.set_defaults(func=cmd_show) p = sub.add_parser("explain", help="why does the system believe this claim?") p.add_argument("claim_id") p.add_argument("--json", action="store_true") p.set_defaults(func=cmd_explain) p = sub.add_parser("refute", help="mark a claim as wrong; cascade-invalidate dependents") p.add_argument("claim_id") p.add_argument("--reason", required=True) p.add_argument("--json", action="store_true") p.set_defaults(func=cmd_refute) p = sub.add_parser("correct", help="replace a claim's value with a user assertion") p.add_argument("claim_id") p.add_argument("--value", required=True, help="corrected value as JSON (or raw string)") p.add_argument("--reason", required=True) p.add_argument("--json", action="store_true") p.set_defaults(func=cmd_correct) p = sub.add_parser("retract", help="withdraw a piece of evidence; cascade-invalidate") p.add_argument("evidence_id") p.add_argument("--reason", required=True) p.add_argument("--json", action="store_true") p.set_defaults(func=cmd_retract) p = sub.add_parser("evidence", help="list ingested evidence") p.set_defaults(func=cmd_evidence) p = sub.add_parser("log", help="list operations in the signed log") p.set_defaults(func=cmd_log) p = sub.add_parser("verify", help="verify log signatures and hash links") p.set_defaults(func=cmd_verify) return parser def main(argv: Optional[List[str]] = None) -> int: parser = build_parser() args = parser.parse_args(argv) return args.func(args) if __name__ == "__main__": sys.exit(main())