"""LocalNode: keystore + operation log + adapters, tied together.

Directory layout of a node::

    ROOT/
      node.json        # public node config (no secrets)
      keys/            # keystore (private keys, mode 0600)
      log/ops.jsonl    # the append-only signed operation log

The node owns the import pipeline:

1. an adapter parses a source into :class:`EvidenceItem` values;
2. the node deduplicates by content hash and supersedes by external id;
3. each new item becomes a signed ``evidence.add`` operation appended to the
   log, carrying full provenance (adapter, version, origin URI, import time,
   source-observed time).

Dedup/supersession semantics:

* same ``(adapter, external_id)`` with the same content hash -> skipped as a
  duplicate (re-importing a source is idempotent);
* same ``(adapter, external_id)`` with a different content hash -> a new op
  whose body ``supersedes`` the previous op_id (the old evidence remains in
  the log -- nothing is ever deleted -- but readers treat it as no longer
  current);
* no external id -> dedup purely by content hash.
"""

from __future__ import annotations

import json
from dataclasses import dataclass, field
from pathlib import Path
from typing import Iterator, Optional

from pmp.adapters import get_adapter
from pmp.canonical import canonical_hash
from pmp.errors import AdapterError, NodeError, PMPError
from pmp.keys import KeyPair, KeyStore
from pmp.operations import (
    OP_TYPE_EVIDENCE_ADD,
    OP_TYPE_NODE_HELLO,
    PROTOCOL_VERSION,
    make_evidence_body,
    make_op,
    now_iso,
    sign_op,
)
from pmp.oplog import OpLog

CONFIG_FILENAME = "node.json"
KEYS_DIRNAME = "keys"
LOG_DIRNAME = "log"
NODE_FORMAT_VERSION = 1


@dataclass
class ImportResult:
    """Outcome of one ``import_source`` call."""

    adapter: str
    source: str
    added: list[str] = field(default_factory=list)  # op_ids appended
    superseded: list[str] = field(default_factory=list)  # op_ids replaced
    duplicates: int = 0  # unchanged items skipped
    errors: list[str] = field(default_factory=list)

    def as_dict(self) -> dict:
        """Return the result as a plain dict with one entry per field."""
        return {
            "adapter": self.adapter,
            "source": self.source,
            "added": self.added,
            "superseded": self.superseded,
            "duplicates": self.duplicates,
            "errors": self.errors,
        }

    def summary(self) -> str:
        """Return a one-line, human-readable summary of the import.

        Added and duplicate counts are always shown; superseded and error
        counts only appear when non-zero.
        """
        parts = [
            f"{len(self.added)} added",
            f"{self.duplicates} duplicate(s) skipped",
        ]
        if self.superseded:
            parts.append(f"{len(self.superseded)} superseded")
        if self.errors:
            parts.append(f"{len(self.errors)} error(s)")
        return f"[{self.adapter}] {self.source}: " + ", ".join(parts)


class LocalNode:
    """A node directory opened in-process: config, signing key and op log.

    Construct via :meth:`init` (new node) or :meth:`open` (existing node).
    The constructor scans the log's evidence ops to build the in-memory
    index used for dedup/supersession decisions.
    """

    def __init__(self, root: Path, config: dict, keypair: KeyPair, log: OpLog):
        self.root = Path(root)
        self.config = config
        self.keypair = keypair
        self.log = log
        # external key -> (op_id, content_hash) of the *current* evidence op
        self._evidence_index: dict[str, tuple[str, str]] = {}
        self._build_evidence_index()

    # -- lifecycle -----------------------------------------------------------

    @classmethod
    def init(cls, root: Path | str, name: str = "device") -> "LocalNode":
        """Create a new node directory: keypair, empty log, genesis op.

        Raises:
            NodeError: if ``node.json`` already exists under *root*.
        """
        root = Path(root)
        config_path = root / CONFIG_FILENAME
        if config_path.exists():
            raise NodeError(f"node already initialized at {root}")
        root.mkdir(parents=True, exist_ok=True)

        keystore = KeyStore(root / KEYS_DIRNAME)
        keypair = keystore.create(name)
        log = OpLog(root / LOG_DIRNAME)

        # Genesis op: first entry of the chain, hence no predecessor, seq 0.
        hello = make_op(
            OP_TYPE_NODE_HELLO,
            keypair.key_id,
            {
                "kind": "node",
                "node_name": name,
                "protocol_version": PROTOCOL_VERSION,
            },
            prev=None,
            seq=0,
        )
        log.append(sign_op(hello, keypair))

        # The config is written last; its presence is what marks the
        # directory as an initialized node (see the existence check above).
        config = {
            "v": NODE_FORMAT_VERSION,
            "node_name": name,
            "node_id": keypair.key_id,
            "key_name": name,
            "protocol_version": PROTOCOL_VERSION,
            "created_at": now_iso(),
        }
        config_path.write_text(json.dumps(config, indent=2) + "\n", encoding="utf-8")
        return cls(root, config, keypair, log)

    @classmethod
    def open(cls, root: Path | str) -> "LocalNode":
        """Open an existing node, fully verifying the log in the process.

        NOTE(review): this method only constructs ``OpLog``; any verification
        on open must happen inside that constructor -- confirm, or call
        :meth:`verify` explicitly.

        Raises:
            NodeError: if the config file is missing, unreadable, not a JSON
                object, has an unsupported format version, or names a
                ``node_id`` that does not match the keystore key.
        """
        root = Path(root)
        config_path = root / CONFIG_FILENAME
        if not config_path.exists():
            raise NodeError(f"no node at {root} (missing {CONFIG_FILENAME})")
        try:
            config = json.loads(config_path.read_text(encoding="utf-8"))
        except (OSError, ValueError) as exc:
            # ValueError covers both json.JSONDecodeError and the
            # UnicodeDecodeError raised for a file that is not valid UTF-8.
            raise NodeError(f"unreadable node config at {config_path}: {exc}") from exc
        if not isinstance(config, dict):
            # Valid JSON that is not an object (e.g. a list) would otherwise
            # crash with AttributeError on the ``.get`` calls below.
            raise NodeError(
                f"unreadable node config at {config_path}: expected a JSON object"
            )
        if config.get("v") != NODE_FORMAT_VERSION:
            raise NodeError(f"unsupported node format version {config.get('v')!r}")

        keystore = KeyStore(root / KEYS_DIRNAME)
        keypair = keystore.load(config.get("key_name", "device"))
        if keypair.key_id != config.get("node_id"):
            raise NodeError("node_id in config does not match keystore key")
        log = OpLog(root / LOG_DIRNAME)
        return cls(root, config, keypair, log)

    # -- identity ------------------------------------------------------------

    @property
    def node_id(self) -> str:
        """The node's identifier, which is the key id of its keypair."""
        return self.keypair.key_id

    @property
    def node_name(self) -> str:
        """The configured node name, defaulting to ``"device"``."""
        return self.config.get("node_name", "device")

    # -- evidence index ------------------------------------------------------

    @staticmethod
    def _evidence_key(body: dict) -> str:
        """Return the index key for an evidence op body.

        ``adapter:external_id`` when the body has a truthy external id,
        otherwise ``hash:content_hash``.
        """
        external_id = body.get("external_id")
        if external_id:
            return f"{body['source']['adapter']}:{external_id}"
        return f"hash:{body['content_hash']}"

    def _build_evidence_index(self) -> None:
        """Rebuild the evidence index from scratch by scanning the log."""
        self._evidence_index.clear()
        for op in self.log.iter_type(OP_TYPE_EVIDENCE_ADD):
            body = op["body"]
            # Later ops naturally overwrite earlier ones (log order = time order
            # within a single node), which is exactly supersession semantics.
            self._evidence_index[self._evidence_key(body)] = (
                op["op_id"],
                body["content_hash"],
            )

    # -- import pipeline -----------------------------------------------------

    def import_source(
        self,
        adapter_name: str,
        source: Path | str,
        origin: Optional[str] = None,
    ) -> ImportResult:
        """Run one adapter over *source* and append the resulting evidence.

        Args:
            adapter_name: name passed to ``get_adapter``.
            source: path handed to the adapter's ``parse``.
            origin: origin recorded for items that do not carry their own;
                defaults to the ``file:`` URI of the resolved *source*.

        Returns:
            An :class:`ImportResult`. An ``AdapterError`` from parsing, and a
            ``PMPError`` from hashing a single item, are recorded in
            ``errors`` rather than raised.
        """
        adapter = get_adapter(adapter_name)
        source = Path(source)
        result = ImportResult(adapter=adapter.name, source=str(source))

        try:
            default_origin = origin or source.resolve().as_uri()
        except ValueError:
            default_origin = origin or str(source)

        # Materialise all items first so an adapter failure aborts the import
        # before anything is appended to the log.
        try:
            items = list(adapter.parse(source))
        except AdapterError as exc:
            result.errors.append(str(exc))
            return result

        for item in items:
            try:
                content_hash = canonical_hash(item.content)
            except PMPError as exc:
                result.errors.append(
                    f"item {item.external_id or ''}: {exc}"
                )
                continue

            # Must produce the same key format as ``_evidence_key`` so that
            # the index rebuilt from the log agrees with the live index.
            if item.external_id:
                key = f"{adapter.name}:{item.external_id}"
            else:
                key = f"hash:{content_hash}"

            existing = self._evidence_index.get(key)
            if existing and existing[1] == content_hash:
                # Same key and same content: nothing new to record.
                result.duplicates += 1
                continue

            supersedes = [existing[0]] if existing else None
            body = make_evidence_body(
                adapter=adapter.name,
                adapter_version=adapter.version,
                origin=item.origin or default_origin,
                content_type=item.content_type,
                content=item.content,
                external_id=item.external_id,
                observed_at=item.observed_at,
                supersedes=supersedes,
            )
            op = make_op(
                OP_TYPE_EVIDENCE_ADD,
                self.keypair.key_id,
                body,
                prev=self.log.head_id,
                seq=self.log.next_seq,
            )
            signed = sign_op(op, self.keypair)
            self.log.append(signed)

            # Update the index immediately so later items in the same batch
            # are compared against this op.
            self._evidence_index[key] = (signed["op_id"], content_hash)
            result.added.append(signed["op_id"])
            if existing:
                result.superseded.append(existing[0])
        return result

    # -- reading -------------------------------------------------------------

    def superseded_op_ids(self) -> set[str]:
        """Return every op_id named in some evidence op's ``supersedes``."""
        out: set[str] = set()
        for op in self.log.iter_type(OP_TYPE_EVIDENCE_ADD):
            # ``supersedes`` may be absent or explicitly null for ops that
            # replace nothing; treat both as an empty list.
            out.update(op["body"].get("supersedes") or [])
        return out

    def list_evidence(
        self,
        content_type: Optional[str] = None,
        include_superseded: bool = False,
    ) -> Iterator[dict]:
        """Yield evidence ops, by default only the current (non-superseded) ones.

        Args:
            content_type: when truthy, only yield ops whose body has this
                ``content_type``.
            include_superseded: when true, superseded ops are yielded too.
        """
        dead = set() if include_superseded else self.superseded_op_ids()
        for op in self.log.iter_type(OP_TYPE_EVIDENCE_ADD):
            if op["op_id"] in dead:
                continue
            if content_type and op["body"]["content_type"] != content_type:
                continue
            yield op

    def get_op(self, op_id: str) -> Optional[dict]:
        """Return the result of looking up *op_id* in the log."""
        return self.log.get(op_id)

    def stats(self) -> dict:
        """Return summary counts for the log.

        Includes op counts by type, evidence counts by content type, and the
        number of superseded op_ids, alongside the node id/name and log head.
        """
        by_type: dict[str, int] = {}
        by_content_type: dict[str, int] = {}
        for op in self.log:
            by_type[op["type"]] = by_type.get(op["type"], 0) + 1
            if op["type"] == OP_TYPE_EVIDENCE_ADD:
                ct = op["body"]["content_type"]
                by_content_type[ct] = by_content_type.get(ct, 0) + 1
        return {
            "node_id": self.node_id,
            "node_name": self.node_name,
            "ops": len(self.log),
            "head": self.log.head_id,
            "by_type": by_type,
            "evidence_by_content_type": by_content_type,
            "superseded": len(self.superseded_op_ids()),
        }

    # -- audit & portability ---------------------------------------------------

    def verify(self) -> dict:
        """Full log verification (signatures, chain, canonical bytes).

        Delegates to ``OpLog.verify`` and adds ``node_id`` to its report.
        """
        report = self.log.verify()
        report["node_id"] = self.node_id
        return report

    def export_log(self, dest: Path | str) -> dict:
        """Write the full log as portable JSONL plus a manifest sidecar.

        The export is byte-identical to the node's own log file (the log *is*
        the interchange format); the manifest pins the head op_id so a
        recipient can confirm they received the whole chain.

        Returns:
            The manifest dict, extended with ``file`` and ``manifest_file``
            paths that are not written to the sidecar itself.
        """
        dest = Path(dest)
        dest.parent.mkdir(parents=True, exist_ok=True)
        from pmp.canonical import canonical_json  # local import avoids cycle risk

        with dest.open("wb") as fh:
            for op in self.log:
                fh.write(canonical_json(op) + b"\n")

        manifest = {
            "format": "pmp-oplog-jsonl",
            "format_version": 1,
            "node_id": self.node_id,
            "node_name": self.node_name,
            "ops": len(self.log),
            "head": self.log.head_id,
            "exported_at": now_iso(),
        }
        manifest_path = dest.with_name(dest.name + ".manifest.json")
        manifest_path.write_text(
            json.dumps(manifest, indent=2) + "\n", encoding="utf-8"
        )
        # Local paths are only meaningful to the caller, so they are added
        # after the sidecar has been written.
        manifest["file"] = str(dest)
        manifest["manifest_file"] = str(manifest_path)
        return manifest