"""Command-line interface for the PMP reference local node. Commands -------- pmp init Create a new node directory with keys and an empty op log. pmp adapters List available import adapters. pmp import Run an adapter over one or more sources and append evidence ops. pmp log list List operations in the local log. pmp log show Show one operation (by id or unique id prefix) in full. pmp log verify Verify hash chaining and signatures of the entire log. pmp export Export the full signed log as a portable JSONL bundle. The node directory defaults to ``$PMP_NODE_DIR`` or ``~/.pmp/node`` and can be overridden per-command with ``--node-dir``. """ from __future__ import annotations import argparse import dataclasses import datetime import json import os import sys from pathlib import Path from typing import Any, Dict, Iterable, List, Optional, Tuple from pmp.errors import PMPError from pmp.node import Node from pmp.adapters import list_adapters try: # pragma: no cover - trivial fallback from pmp import __version__ as _VERSION except Exception: # pragma: no cover _VERSION = "0.0.0" # --------------------------------------------------------------------------- # Small helpers # --------------------------------------------------------------------------- def _default_node_dir() -> str: env = os.environ.get("PMP_NODE_DIR") if env: return env return str(Path.home() / ".pmp" / "node") def _short(value: Any, length: int = 14) -> str: s = str(value or "") if len(s) <= length: return s return s[: length - 2] + ".." 
def _op_to_dict(op: Any) -> Dict[str, Any]:
    """Convert an Operation (or already-plain dict) to a plain dict.

    Tried in order: the object itself when it is already a dict, the result
    of a callable ``to_dict`` attribute, ``dataclasses.asdict`` for dataclass
    instances, and finally a shallow copy of the object's ``__dict__`` (an
    empty dict when it has none).
    """
    if isinstance(op, dict):
        return op
    to_dict = getattr(op, "to_dict", None)
    if callable(to_dict):
        return to_dict()
    # is_dataclass() is also true for dataclass *classes*, which asdict()
    # rejects with TypeError; only instances take this branch.
    if dataclasses.is_dataclass(op) and not isinstance(op, type):
        return dataclasses.asdict(op)
    return dict(getattr(op, "__dict__", {}))


def _iter_log_ops(node: Node) -> List[Dict[str, Any]]:
    """Return all operations in the node's log as plain dicts, in order."""
    return [_op_to_dict(op) for op in node.log]


def _stat(stats: Any, key: str) -> int:
    """Read a counter off an ImportStats object or dict, tolerating absence.

    Returns 0 when *stats* is ``None``, when the counter is missing or falsy,
    or when its value cannot be converted with ``int()``.
    """
    if stats is None:
        return 0
    try:
        if isinstance(stats, dict):
            raw = stats.get(key, 0)
        else:
            raw = getattr(stats, key, 0)
        return int(raw or 0)
    except (TypeError, ValueError):
        return 0


def _op_type(op: Dict[str, Any]) -> str:
    """Return the operation type, read from ``op_type`` or else ``type``."""
    return str(op.get("op_type", op.get("type", "")))


def _op_id(op: Dict[str, Any]) -> str:
    """Return the operation id, read from ``op_id`` or else ``id``."""
    return str(op.get("op_id", op.get("id", "")))


def _summarize_body(op_type: str, body: Any) -> str:
    """One-line, best-effort human summary of an operation body.

    Args:
        op_type: Operation type. Not used by the current implementation; kept
            so existing callers keep working.
        body: The operation body. Non-dict bodies are rendered with
            ``_short(body, 48)``.

    Returns:
        Up to three ``key=value`` pairs taken from a fixed list of well-known
        keys (looked up in ``body["evidence"]`` when that is a dict, otherwise
        in ``body`` itself). If none of those keys has a truthy value, up to
        two scalar fields of ``body`` are used instead.
    """
    if not isinstance(body, dict):
        return _short(body, 48)
    nested = body.get("evidence")
    evidence = nested if isinstance(nested, dict) else body
    parts: List[str] = []
    for key in ("source_type", "adapter", "source_id", "content_type", "title", "summary"):
        val = evidence.get(key)
        if val:
            parts.append(f"{key}={_short(val, 28)}")
        if len(parts) >= 3:
            break
    if not parts:
        # Fall back to the first couple of scalar fields.
        for key, val in body.items():
            if isinstance(val, (str, int, float, bool)):
                parts.append(f"{key}={_short(val, 28)}")
            if len(parts) >= 2:
                break
    return " ".join(parts)


def _print_table(rows: List[Tuple[str, ...]], headers: Tuple[str, ...]) -> None:
    """Print *rows* under *headers* as a left-aligned, space-separated table.

    Each column is as wide as its longest cell (or header). A dashed rule is
    printed between the header line and the rows. Rows are expected to have
    no more cells than there are headers.
    """
    widths = [len(h) for h in headers]
    for row in rows:
        for i, cell in enumerate(row):
            widths[i] = max(widths[i], len(cell))
    fmt = " ".join("{:<" + str(w) + "}" for w in widths)
    print(fmt.format(*headers))
    print(fmt.format(*("-" * w for w in widths)))
    for row in rows:
        print(fmt.format(*row))


def _open_node(args: argparse.Namespace) -> Node:
    """Open the node at ``args.node_dir``.

    Raises:
        PMPError: If the directory does not exist (the message tells the user
            how to create it with ``pmp init``).
    """
    node_dir = Path(args.node_dir)
    if not node_dir.exists():
        raise PMPError(
            f"node directory {node_dir} does not exist; run 'pmp init --node-dir {node_dir}' first"
        )
    return Node.open(node_dir)


def _limit_ops(ops: List[Any], limit: Optional[int], head: bool) -> List[Any]:
    """Return at most *limit* items from the start or end of *ops*.

    A *limit* of ``None`` or a negative value means "no limit" and returns
    *ops* unchanged. With *head* the first *limit* items are kept, otherwise
    the last *limit* items.
    """
    if limit is None or limit < 0:
        return ops
    if head:
        return ops[:limit]
    # ops[-0:] is the whole list, so compute the start index explicitly to
    # make a limit of 0 yield an empty result.
    return ops[max(len(ops) - limit, 0):]


def _verification_problems(result: Any) -> List[str]:
    """Translate the return value of ``log.verify()`` into a problem list.

    Accepted shapes: ``None``/``True`` (success), ``False`` (failure without
    detail), a list or tuple of problems, or a report object whose ``ok``
    attribute is ``False`` and which may carry a ``problems`` attribute.
    Anything else is treated as success.

    Returns:
        A list of problem descriptions; empty when verification succeeded.
    """
    if result is None or result is True:
        return []
    if result is False:
        return ["verification failed"]
    if isinstance(result, (list, tuple)):
        return [str(p) for p in result]
    if getattr(result, "ok", True) is False:
        problems = [str(p) for p in getattr(result, "problems", None) or []]
        # A failed report must never be reported as success just because it
        # carries no detail.
        return problems or ["verification failed"]
    return []


# ---------------------------------------------------------------------------
# Command implementations
# ---------------------------------------------------------------------------


def cmd_init(args: argparse.Namespace) -> int:
    """Handle ``pmp init``: create a node and print its identity.

    Calls ``Node.init`` with ``args.node_dir`` and ``args.name``, then prints
    the name, node id and public key when they are available, followed by
    suggested next steps. Returns 0.
    """
    node_dir = Path(args.node_dir)
    node = Node.init(node_dir, name=args.name)
    node_id = getattr(node, "node_id", None)
    pubkey = None
    keys = getattr(node, "keys", None)
    if keys is not None:
        pubkey = getattr(keys, "public_key_b64", None) or getattr(keys, "public_key", None)
    print(f"Initialized PMP node at {node_dir}")
    if args.name:
        print(f" name: {args.name}")
    if node_id:
        print(f" node id: {node_id}")
    if pubkey:
        print(f" public key: {pubkey}")
    print("Next steps:")
    print(" pmp adapters")
    print(f" pmp import --node-dir {node_dir} --adapter ...")
    return 0


def cmd_adapters(args: argparse.Namespace) -> int:
    """Handle ``pmp adapters``: list registered adapters, sorted by name.

    ``list_adapters()`` may return a dict (name -> adapter) or an iterable of
    adapters; in the latter case the name is the adapter's ``name`` attribute
    or else its ``str()``. The description is the adapter's ``description``
    attribute or else the first line of its docstring. Output is a table, or
    JSON with ``--json``. Returns 0.
    """
    registry = list_adapters()
    if isinstance(registry, dict):
        pairs = list(registry.items())
    else:
        pairs = [(getattr(a, "name", str(a)), a) for a in registry]
    # Sort on the name only: comparing whole (name, adapter) tuples would fall
    # back to comparing adapter objects when two names are equal.
    items: List[Tuple[str, Any]] = sorted(pairs, key=lambda pair: pair[0])

    rows: List[Tuple[str, ...]] = []
    payload: List[Dict[str, Any]] = []
    for name, adapter in items:
        desc = getattr(adapter, "description", None)
        if not desc:
            doc = getattr(adapter, "__doc__", "") or ""
            desc = doc.strip().splitlines()[0] if doc.strip() else ""
        rows.append((str(name), _short(desc, 70)))
        payload.append({"name": str(name), "description": str(desc)})

    if args.json:
        print(json.dumps(payload, indent=2))
    elif not rows:
        print("No adapters registered.")
    else:
        _print_table(rows, ("ADAPTER", "DESCRIPTION"))
    return 0


def cmd_import(args: argparse.Namespace) -> int:
    """Handle ``pmp import``: run one adapter over every source path.

    A missing source or a ``PMPError`` from ``node.import_path`` is reported
    on stderr and counted as an error; the remaining sources are still
    processed. Prints one summary line per source and a total line.

    Returns:
        0 when every source was imported, 1 when at least one failed.
    """
    node = _open_node(args)
    totals = {"imported": 0, "duplicates": 0, "superseded": 0, "errors": 0}
    exit_code = 0
    for source in args.sources:
        source_path = Path(source)
        if not source_path.exists():
            print(f"error: source not found: {source_path}", file=sys.stderr)
            totals["errors"] += 1
            exit_code = 1
            continue
        try:
            stats = node.import_path(args.adapter, source_path)
        except PMPError as exc:
            print(f"error: import of {source_path} failed: {exc}", file=sys.stderr)
            totals["errors"] += 1
            exit_code = 1
            continue
        imported = _stat(stats, "imported")
        duplicates = _stat(stats, "duplicates")
        superseded = _stat(stats, "superseded")
        totals["imported"] += imported
        totals["duplicates"] += duplicates
        totals["superseded"] += superseded
        print(
            f"{source_path}: {imported} imported, "
            f"{duplicates} duplicate(s) skipped, {superseded} superseded"
        )
    print(
        f"total: {totals['imported']} imported, {totals['duplicates']} duplicate(s) skipped, "
        f"{totals['superseded']} superseded, {totals['errors']} error(s)"
    )
    return exit_code


def cmd_log_list(args: argparse.Namespace) -> int:
    """Handle ``pmp log list``: print operations as a table or as JSON.

    Operations are optionally filtered by ``--type`` and then truncated by
    ``--limit`` (from the end of the log, or from the start with ``--head``).
    A limit of 0 selects no operations; a negative limit is ignored.
    Returns 0.
    """
    node = _open_node(args)
    ops = _iter_log_ops(node)
    if args.type:
        ops = [op for op in ops if _op_type(op) == args.type]
    ops = _limit_ops(ops, args.limit, args.head)

    if args.json:
        print(json.dumps(ops, indent=2, sort_keys=True))
        return 0
    if not ops:
        print("(log is empty or no operations match the filter)")
        return 0

    rows: List[Tuple[str, ...]] = []
    for op in ops:
        op_type = _op_type(op)
        rows.append(
            (
                str(op.get("seq", "")),
                _short(op.get("op_id", op.get("id", "")), 16),
                op_type,
                _short(op.get("timestamp", op.get("created_at", "")), 20),
                _short(op.get("author", ""), 14),
                _summarize_body(op_type, op.get("body", {})),
            )
        )
    _print_table(rows, ("SEQ", "OP_ID", "TYPE", "TIMESTAMP", "AUTHOR", "SUMMARY"))
    print(f"\n{len(rows)} operation(s) shown.")
    return 0


def cmd_log_show(args: argparse.Namespace) -> int:
    """Handle ``pmp log show``: print one operation as indented JSON.

    ``args.op_id`` may be a full id or a prefix. An exact id match wins even
    when the same text is also a prefix of other ids.

    Returns:
        0 on success; 1 when nothing matches or the prefix is ambiguous.
    """
    node = _open_node(args)
    wanted = args.op_id
    matches = [op for op in _iter_log_ops(node) if _op_id(op).startswith(wanted)]
    if not matches:
        print(f"error: no operation with id (or prefix) {wanted!r}", file=sys.stderr)
        return 1
    exact = [op for op in matches if _op_id(op) == wanted]
    if len(matches) > 1 and not exact:
        print(
            f"error: id prefix {wanted!r} is ambiguous ({len(matches)} matches); "
            "use a longer prefix",
            file=sys.stderr,
        )
        return 1
    op = exact[0] if exact else matches[0]
    print(json.dumps(op, indent=2, sort_keys=True))
    return 0


def cmd_log_verify(args: argparse.Namespace) -> int:
    """Handle ``pmp log verify``: verify the node's log via ``node.log.verify()``.

    Failure is recognised both when ``verify()`` raises ``PMPError`` and when
    it returns a failure value (``False``, a non-empty problem list, or a
    report with ``ok`` set to ``False``).

    Returns:
        0 when verification succeeded, 1 otherwise.
    """
    node = _open_node(args)
    try:
        result = node.log.verify()
    except PMPError as exc:
        print(f"VERIFICATION FAILED: {exc}", file=sys.stderr)
        return 1

    # Tolerate verify() implementations that return a problem list/report
    # instead of raising.
    problems = _verification_problems(result)
    if problems:
        for p in problems:
            print(f"VERIFICATION FAILED: {p}", file=sys.stderr)
        return 1

    count = len(_iter_log_ops(node))
    print(f"OK: {count} operation(s); hash chain and signatures verified.")
    return 0


def cmd_export(args: argparse.Namespace) -> int:
    """Handle ``pmp export``: write the log to ``args.out`` as JSONL.

    The first line is a header object (format marker, node id, UTC export
    time, operation count); each following line is one operation. Parent
    directories of the output path are created as needed. Returns 0.
    """
    node = _open_node(args)
    ops = _iter_log_ops(node)
    header = {
        "pmp_export": 1,
        "format": "pmp-oplog-jsonl",
        "node_id": getattr(node, "node_id", None),
        "exported_at": datetime.datetime.now(datetime.timezone.utc).isoformat(),
        "op_count": len(ops),
    }
    out_path = Path(args.out)
    out_path.parent.mkdir(parents=True, exist_ok=True)
    with out_path.open("w", encoding="utf-8") as fh:
        fh.write(json.dumps(header, sort_keys=True) + "\n")
        for op in ops:
            fh.write(json.dumps(op, sort_keys=True) + "\n")
    print(f"Exported {len(ops)} operation(s) to {out_path}")
    return 0


# ---------------------------------------------------------------------------
# Parser wiring
# ---------------------------------------------------------------------------


def build_parser() -> argparse.ArgumentParser:
    """Build the ``pmp`` argument parser with all subcommands wired up.

    Every subcommand stores its handler in the ``func`` default so that
    ``main`` can dispatch with ``args.func(args)``.
    """
    parser = argparse.ArgumentParser(
        prog="pmp",
        description="PMP reference local node: user-owned, signed, append-only AI memory log.",
    )
    parser.add_argument("--version", action="version", version=f"pmp {_VERSION}")

    def add_node_dir(p: argparse.ArgumentParser) -> None:
        # The default is resolved once, when the parser is built.
        p.add_argument(
            "--node-dir",
            default=_default_node_dir(),
            help="node directory (default: $PMP_NODE_DIR or ~/.pmp/node)",
        )

    sub = parser.add_subparsers(dest="command", required=True)

    p_init = sub.add_parser("init", help="create a new node (keys + empty log)")
    add_node_dir(p_init)
    p_init.add_argument("--name", default=None, help="human-readable node name")
    p_init.set_defaults(func=cmd_init)

    p_adapters = sub.add_parser("adapters", help="list available import adapters")
    p_adapters.add_argument("--json", action="store_true", help="emit JSON")
    p_adapters.set_defaults(func=cmd_adapters)

    p_import = sub.add_parser("import", help="import sources via an adapter")
    add_node_dir(p_import)
    p_import.add_argument("--adapter", required=True, help="adapter name (see 'pmp adapters')")
    p_import.add_argument("sources", nargs="+", help="one or more source files/directories")
    p_import.set_defaults(func=cmd_import)

    p_log = sub.add_parser("log", help="inspect the local operation log")
    log_sub = p_log.add_subparsers(dest="log_command", required=True)

    p_list = log_sub.add_parser("list", help="list operations")
    add_node_dir(p_list)
    p_list.add_argument("--type", default=None, help="filter by op_type")
    p_list.add_argument("--limit", type=int, default=None, help="show at most N operations")
    p_list.add_argument(
        "--head", action="store_true", help="with --limit, take from the start instead of the end"
    )
    p_list.add_argument("--json", action="store_true", help="emit JSON")
    p_list.set_defaults(func=cmd_log_list)

    p_show = log_sub.add_parser("show", help="show one operation in full")
    add_node_dir(p_show)
    p_show.add_argument("op_id", help="operation id or unique prefix")
    p_show.set_defaults(func=cmd_log_show)

    p_verify = log_sub.add_parser("verify", help="verify the whole log")
    add_node_dir(p_verify)
    p_verify.set_defaults(func=cmd_log_verify)

    p_export = sub.add_parser("export", help="export the signed log as a JSONL bundle")
    add_node_dir(p_export)
    p_export.add_argument("--out", required=True, help="output file path")
    p_export.set_defaults(func=cmd_export)

    return parser


def main(argv: Optional[List[str]] = None) -> int:
    """Parse *argv* (or ``sys.argv`` when ``None``) and run the chosen command.

    Returns:
        The command's exit code, 2 when a ``PMPError`` escapes the command,
        or 130 on ``KeyboardInterrupt``.
    """
    parser = build_parser()
    args = parser.parse_args(argv)
    try:
        return int(args.func(args))
    except PMPError as exc:
        print(f"error: {exc}", file=sys.stderr)
        return 2
    except KeyboardInterrupt:  # pragma: no cover
        return 130


if __name__ == "__main__":  # pragma: no cover
    sys.exit(main())