"""NodeStore: a node directory (identity + operation log) materialized into
queryable state, plus the mutation API the CLI uses.

Every mutation appends a signed operation to the log and then applies it to
the in-memory projection, so live state and replayed state are produced by
the same ``_apply`` code path. Refutations and corrections cascade: any claim
whose derivation graph transitively depends on a refuted or corrected claim
is marked ``invalidated``, with the root cause recorded.
"""

from __future__ import annotations

import json
from collections import deque
from pathlib import Path
from typing import Any, Dict, Iterable, List, Optional, Set, Tuple, Union

from .errors import (
    AmbiguousIdError,
    NotFoundError,
    StoreError,
)
from .identity import Identity
from .model import (
    Capability,
    Claim,
    Evidence,
    InferenceRecord,
    STATUS_ACTIVE,
    STATUS_CORRECTED,
    STATUS_INVALIDATED,
    STATUS_REFUTED,
)
from .oplog import OpLog
from .wire import build_envelope, derived_id, now_ts

IDENTITY_FILE = "identity.json"
OPLOG_FILE = "oplog.jsonl"
META_FILE = "node.json"

GRAPH_EXPORT_FORMAT = "fablepool-graph/1"


class NodeStore:
    """A single user-owned node: identity, log, and materialized state."""

    def __init__(self, node_dir: Path, identity: Identity, oplog: OpLog):
        """Create an empty projection; ``open`` is the usual entry point."""
        self.node_dir = node_dir
        self.identity = identity
        self.oplog = oplog

        # Every envelope seen so far, in application order, plus an id index.
        self.ops: List[Dict[str, Any]] = []
        self.ops_by_id: Dict[str, Dict[str, Any]] = {}
        self.evidence: Dict[str, Evidence] = {}
        self.claims: Dict[str, Claim] = {}
        self.capabilities: Dict[str, Capability] = {}
        self.inferences: List[InferenceRecord] = []
        # input id (claim or evidence) -> ids of claims that list it in inputs
        self.dependents: Dict[str, Set[str]] = {}
        # Human-readable notes about ops whose effect was skipped on replay.
        self.load_warnings: List[str] = []
        # author -> op_id of the last op applied for that author
        self._heads: Dict[str, Optional[str]] = {}

    # ------------------------------------------------------------------
    # opening / creating
    # ------------------------------------------------------------------

    @classmethod
    def open(
        cls,
        node_dir: Union[str, Path],
        create: bool = False,
        label: str = "node",
    ) -> "NodeStore":
        """Open the node at *node_dir* and replay its operation log.

        If the identity file is missing and *create* is true, the directory,
        a fresh identity and the node metadata file are written first.

        Raises:
            StoreError: the directory has no identity file and *create* is
                false.
        """
        node_dir = Path(node_dir)
        identity_path = node_dir / IDENTITY_FILE
        if identity_path.exists():
            identity = Identity.load(identity_path)
        elif create:
            node_dir.mkdir(parents=True, exist_ok=True)
            identity = Identity.create(label)
            identity.save(identity_path)
            meta = {"label": label, "created_at": now_ts(), "format": "fablepool-node/1"}
            (node_dir / META_FILE).write_text(
                json.dumps(meta, indent=2) + "\n", encoding="utf-8"
            )
        else:
            raise StoreError(
                f"{node_dir} is not an initialized node (no {IDENTITY_FILE}); "
                f"run 'fable init' first or pass --node pointing at a node dir"
            )
        store = cls(node_dir, identity, OpLog(node_dir / OPLOG_FILE))
        store._load()
        return store

    @property
    def label(self) -> str:
        """The node label from the metadata file, else the identity's label.

        Reading the label is best-effort: an unreadable, malformed or
        non-object metadata file falls back to ``identity.label``.
        """
        meta = self.node_dir / META_FILE
        if meta.exists():
            try:
                data = json.loads(meta.read_text(encoding="utf-8"))
            except (OSError, ValueError):
                data = None
            # Valid JSON that is not an object (e.g. a list) has no ``.get``.
            if isinstance(data, dict):
                return data.get("label", self.identity.label)
        return self.identity.label

    def _load(self) -> None:
        """Replay every envelope from the log in non-strict mode."""
        for env in self.oplog.read_all():
            self._apply(env, strict=False)

    # ------------------------------------------------------------------
    # applying operations (shared by replay and live mutation)
    # ------------------------------------------------------------------

    def _apply(self, env: Dict[str, Any], strict: bool) -> Any:
        """Apply one envelope to materialized state.

        In strict mode (live mutations) semantic violations raise StoreError.
        In replay mode they are recorded as load warnings and the op's effect
        is skipped, so one bad historical op cannot brick the node. An
        unrecognised operation type is treated the same way.

        Returns whatever the type-specific handler returns, or ``None`` when
        the op's effect was skipped.
        """
        handlers = {
            "evidence.add": self._apply_evidence_add,
            "claim.assert": self._apply_claim_assert,
            "claim.refute": self._apply_claim_refute,
            "claim.correct": self._apply_claim_correct,
            "capability.grant": self._apply_capability_grant,
            "capability.revoke": self._apply_capability_revoke,
            "inference.call": self._apply_inference_call,
        }
        try:
            handler = handlers.get(env["type"])
            if handler is None:
                # Raised inside the try so that replay downgrades it to a
                # warning instead of aborting the whole load with KeyError.
                raise StoreError(f"unknown operation type {env['type']!r}")
            result = handler(env)
        except StoreError as exc:
            if strict:
                raise
            self.load_warnings.append(f"{env['op_id']}: {exc}")
            result = None
        # bookkeeping happens even for skipped ops: the op exists in the log
        self.ops.append(env)
        self.ops_by_id[env["op_id"]] = env
        self._heads[env["author"]] = env["op_id"]
        return result

    def _apply_evidence_add(self, env: Dict[str, Any]) -> Evidence:
        """Materialize an ``evidence.add`` op and index it by its derived id."""
        b = env["body"]
        ev = Evidence(
            id=derived_id("ev", env["op_id"]),
            op_id=env["op_id"],
            source=b["source"],
            kind=b["kind"],
            content=b["content"],
            observed_at=b["observed_at"],
            tags=list(b.get("tags", [])),
            author=env["author"],
            ts=env["ts"],
        )
        self.evidence[ev.id] = ev
        return ev

    def _root_cause_of(self, upstream: Claim) -> str:
        """Return the op id to blame when a claim is built on dead *upstream*."""
        cause = upstream.refuted_by or upstream.invalidated_by
        if cause:
            return cause
        if upstream.status == STATUS_CORRECTED:
            # ``corrected_by`` holds the replacement claim's id; that claim is
            # created by the correction op, which is the same op id the live
            # cascade records for dependents of a corrected claim.
            replacement = self.claims.get(upstream.corrected_by)
            if replacement is not None:
                return replacement.op_id
        return upstream.op_id

    def _apply_claim_assert(self, env: Dict[str, Any]) -> Claim:
        """Materialize a ``claim.assert`` op.

        Raises:
            StoreError: an input id is neither a known claim nor known
                evidence. (Kept in sync with ``_precheck``.)
        """
        b = env["body"]
        missing = [
            i for i in b["inputs"] if i not in self.claims and i not in self.evidence
        ]
        if missing:
            raise StoreError(
                f"claim references unknown inputs: {', '.join(missing)}"
            )
        claim = Claim(
            id=derived_id("cl", env["op_id"]),
            op_id=env["op_id"],
            subject=b["subject"],
            topic=b["topic"],
            statement=b["statement"],
            value=b.get("value"),
            confidence=float(b["confidence"]),
            method=b["method"],
            inputs=list(b["inputs"]),
            params=dict(b.get("params", {})),
            author=env["author"],
            ts=env["ts"],
        )
        # A claim asserted over an already-dead input is born invalidated.
        # Only the first dead input (in declared order) is recorded.
        for input_id in claim.inputs:
            upstream = self.claims.get(input_id)
            if upstream is not None and not upstream.is_active:
                claim.status = STATUS_INVALIDATED
                claim.invalidated_by = self._root_cause_of(upstream)
                claim.invalidated_via = upstream.id
                break
        self.claims[claim.id] = claim
        for input_id in claim.inputs:
            self.dependents.setdefault(input_id, set()).add(claim.id)
        return claim

    def _apply_claim_refute(self, env: Dict[str, Any]) -> List[str]:
        """Mark a claim refuted and cascade; returns the invalidated claim ids.

        Raises:
            StoreError: the claim is unknown, already refuted, or corrected.
                (Kept in sync with ``_precheck``.)
        """
        b = env["body"]
        claim = self.claims.get(b["claim_id"])
        if claim is None:
            raise StoreError(f"cannot refute unknown claim {b['claim_id']}")
        if claim.status == STATUS_REFUTED:
            raise StoreError(f"claim {claim.id} is already refuted")
        if claim.status == STATUS_CORRECTED:
            raise StoreError(
                f"claim {claim.id} was corrected; refute its replacement "
                f"{claim.corrected_by} instead"
            )
        claim.status = STATUS_REFUTED
        claim.refuted_by = env["op_id"]
        claim.refute_reason = b["reason"]
        return self._cascade_invalidate(claim.id, env["op_id"])

    def _apply_claim_correct(self, env: Dict[str, Any]) -> Tuple[Claim, List[str]]:
        """Replace a claim with a ``user.correction`` claim and cascade.

        The replacement inherits subject and topic from the old claim, has no
        inputs, and has confidence 1.0. Returns ``(replacement, affected)``
        where *affected* is the list of invalidated dependent claim ids.

        Raises:
            StoreError: the claim is unknown, refuted, or already corrected.
                (Kept in sync with ``_precheck``.)
        """
        b = env["body"]
        old = self.claims.get(b["claim_id"])
        if old is None:
            raise StoreError(f"cannot correct unknown claim {b['claim_id']}")
        if old.status == STATUS_REFUTED:
            raise StoreError(
                f"claim {old.id} is refuted; assert a fresh claim instead of "
                f"correcting it"
            )
        if old.status == STATUS_CORRECTED:
            raise StoreError(
                f"claim {old.id} was already corrected by {old.corrected_by}"
            )
        replacement = Claim(
            id=derived_id("cl", env["op_id"]),
            op_id=env["op_id"],
            subject=old.subject,
            topic=old.topic,
            statement=b["statement"],
            value=b.get("value", old.value),
            confidence=1.0,
            method="user.correction",
            inputs=[],
            params={"corrects": old.id, "reason": b.get("reason")},
            author=env["author"],
            ts=env["ts"],
            corrects=old.id,
        )
        old.status = STATUS_CORRECTED
        old.corrected_by = replacement.id
        self.claims[replacement.id] = replacement
        affected = self._cascade_invalidate(old.id, env["op_id"])
        return replacement, affected

    def _apply_capability_grant(self, env: Dict[str, Any]) -> Capability:
        """Materialize a ``capability.grant`` op and index it by derived id."""
        b = env["body"]
        cap = Capability(
            id=derived_id("cap", env["op_id"]),
            op_id=env["op_id"],
            grantee=b["grantee"],
            topics=list(b["topics"]),
            expires_at=b.get("expires_at"),
            note=b.get("note"),
            author=env["author"],
            ts=env["ts"],
        )
        self.capabilities[cap.id] = cap
        return cap

    def _apply_capability_revoke(self, env: Dict[str, Any]) -> Capability:
        """Mark a capability revoked.

        Raises:
            StoreError: the capability is unknown or already revoked.
                (Kept in sync with ``_precheck``.)
        """
        b = env["body"]
        cap = self.capabilities.get(b["capability_id"])
        if cap is None:
            raise StoreError(f"cannot revoke unknown capability {b['capability_id']}")
        if cap.status == "revoked":
            raise StoreError(f"capability {cap.id} is already revoked")
        cap.status = "revoked"
        cap.revoked_by = env["op_id"]
        cap.revoke_reason = b.get("reason")
        return cap

    def _apply_inference_call(self, env: Dict[str, Any]) -> InferenceRecord:
        """Append an ``inference.call`` record to ``self.inferences``."""
        b = env["body"]
        rec = InferenceRecord(
            op_id=env["op_id"],
            model=b["model"],
            purpose=b["purpose"],
            input_ids=list(b["input_ids"]),
            output_ids=list(b.get("output_ids", [])),
            prompt_hash=b.get("prompt_hash"),
            author=env["author"],
            ts=env["ts"],
        )
        self.inferences.append(rec)
        return rec

    # ------------------------------------------------------------------
    # cascade invalidation
    # ------------------------------------------------------------------

    def _cascade_invalidate(self, root_claim_id: str, cause_op_id: str) -> List[str]:
        """Mark every active transitive dependent of *root_claim_id* as
        invalidated. Returns affected claim ids in BFS (cause-first) order."""
        affected: List[str] = []
        # deque gives O(1) pops from the left; list.pop(0) is O(n) per pop.
        frontier = deque([root_claim_id])
        seen: Set[str] = {root_claim_id}
        while frontier:
            current = frontier.popleft()
            for dep_id in sorted(self.dependents.get(current, ())):
                if dep_id in seen:
                    continue
                seen.add(dep_id)
                dep = self.claims.get(dep_id)
                if dep is None:
                    continue
                if dep.status == STATUS_ACTIVE:
                    dep.status = STATUS_INVALIDATED
                    dep.invalidated_by = cause_op_id
                    dep.invalidated_via = current
                    affected.append(dep_id)
                # Keep walking through non-active dependents too: claims
                # further downstream may still be active.
                frontier.append(dep_id)
        return affected

    # ------------------------------------------------------------------
    # mutation API (sign, append, apply)
    # ------------------------------------------------------------------

    def _emit(self, op_type: str, body: Dict[str, Any]) -> Tuple[Dict[str, Any], Any]:
        """Validate, sign, append and apply one operation.

        Returns ``(envelope, handler_result)``.

        Raises:
            StoreError: the op is semantically invalid against current state;
                in that case nothing is signed or written to the log.
        """
        # Semantic validation comes first so a rejected op is never signed
        # and never reaches the on-disk log.
        self._precheck(op_type, body)
        author = self.identity.author
        env = build_envelope(
            op_type,
            body,
            author=author,
            prev=self._heads.get(author),
            sign_fn=self.identity.sign,
        )
        self.oplog.append(env)
        result = self._apply(env, strict=True)
        return env, result

    def _precheck(self, op_type: str, body: Dict[str, Any]) -> None:
        """Semantic prechecks so a rejected op never reaches the log.

        These mirror the checks performed by the ``_apply_*`` handlers and
        must be kept in sync with them. Op types without a branch here have
        no semantic preconditions.
        """
        if op_type == "claim.assert":
            missing = [
                i for i in body["inputs"]
                if i not in self.claims and i not in self.evidence
            ]
            if missing:
                raise StoreError(
                    f"claim references unknown inputs: {', '.join(missing)}"
                )
        elif op_type == "claim.refute":
            claim = self.claims.get(body["claim_id"])
            if claim is None:
                raise StoreError(f"cannot refute unknown claim {body['claim_id']}")
            if claim.status == STATUS_REFUTED:
                raise StoreError(f"claim {claim.id} is already refuted")
            if claim.status == STATUS_CORRECTED:
                raise StoreError(
                    f"claim {claim.id} was corrected; refute its replacement "
                    f"{claim.corrected_by} instead"
                )
        elif op_type == "claim.correct":
            claim = self.claims.get(body["claim_id"])
            if claim is None:
                raise StoreError(f"cannot correct unknown claim {body['claim_id']}")
            if claim.status == STATUS_REFUTED:
                raise StoreError(
                    f"claim {claim.id} is refuted; assert a fresh claim instead"
                )
            if claim.status == STATUS_CORRECTED:
                raise StoreError(
                    f"claim {claim.id} was already corrected by {claim.corrected_by}"
                )
        elif op_type == "capability.revoke":
            cap = self.capabilities.get(body["capability_id"])
            if cap is None:
                raise StoreError(
                    f"cannot revoke unknown capability {body['capability_id']}"
                )
            if cap.status == "revoked":
                raise StoreError(f"capability {cap.id} is already revoked")

    def add_evidence(
        self,
        source: str,
        kind: str,
        content: Dict[str, Any],
        observed_at: str,
        tags: Optional[List[str]] = None,
    ) -> Evidence:
        """Record a piece of evidence; ``tags`` is omitted from the op body
        when empty."""
        body: Dict[str, Any] = {
            "source": source,
            "kind": kind,
            "content": content,
            "observed_at": observed_at,
        }
        if tags:
            body["tags"] = list(tags)
        _, ev = self._emit("evidence.add", body)
        return ev

    def assert_claim(
        self,
        topic: str,
        statement: str,
        confidence: float,
        method: str,
        inputs: List[str],
        value: Any = None,
        params: Optional[Dict[str, Any]] = None,
        subject: str = "self",
    ) -> Claim:
        """Assert a claim derived from *inputs* (claim and/or evidence ids).

        Raises:
            StoreError: an input id is unknown.
        """
        body: Dict[str, Any] = {
            "subject": subject,
            "topic": topic,
            "statement": statement,
            "confidence": confidence,
            "method": method,
            "inputs": list(inputs),
        }
        if value is not None:
            body["value"] = value
        if params:
            body["params"] = params
        _, claim = self._emit("claim.assert", body)
        return claim

    def refute_claim(
        self, claim_ref: str, reason: str, note: Optional[str] = None
    ) -> Tuple[Claim, List[str]]:
        """Refute the claim *claim_ref* resolves to.

        Returns ``(claim, affected)`` where *affected* lists the ids of
        dependent claims invalidated by the cascade.
        """
        claim = self.get_claim(claim_ref)
        body: Dict[str, Any] = {"claim_id": claim.id, "reason": reason}
        if note:
            body["note"] = note
        _, affected = self._emit("claim.refute", body)
        return claim, affected

    def correct_claim(
        self,
        claim_ref: str,
        statement: str,
        value: Any = None,
        reason: Optional[str] = None,
    ) -> Tuple[Claim, Claim, List[str]]:
        """Correct the claim *claim_ref* resolves to.

        When *value* is ``None`` it is left out of the op body, so the
        replacement keeps the old claim's value. Returns
        ``(old, replacement, affected)``.
        """
        old = self.get_claim(claim_ref)
        body: Dict[str, Any] = {"claim_id": old.id, "statement": statement}
        if value is not None:
            body["value"] = value
        if reason:
            body["reason"] = reason
        _, (replacement, affected) = self._emit("claim.correct", body)
        return old, replacement, affected

    def grant_capability(
        self,
        grantee: str,
        topics: List[str],
        expires_at: Optional[str] = None,
        note: Optional[str] = None,
    ) -> Capability:
        """Grant *grantee* a capability over *topics*."""
        body: Dict[str, Any] = {"grantee": grantee, "topics": list(topics)}
        if expires_at:
            body["expires_at"] = expires_at
        if note:
            body["note"] = note
        _, cap = self._emit("capability.grant", body)
        return cap

    def revoke_capability(
        self, cap_ref: str, reason: Optional[str] = None
    ) -> Capability:
        """Revoke the capability *cap_ref* resolves to."""
        cap = self.get_capability(cap_ref)
        body: Dict[str, Any] = {"capability_id": cap.id}
        if reason:
            body["reason"] = reason
        _, cap = self._emit("capability.revoke", body)
        return cap

    def record_inference(
        self,
        model: str,
        purpose: str,
        input_ids: List[str],
        output_ids: Optional[List[str]] = None,
        prompt_hash: Optional[str] = None,
    ) -> InferenceRecord:
        """Record an inference call; empty optional fields are omitted from
        the op body."""
        body: Dict[str, Any] = {
            "model": model,
            "purpose": purpose,
            "input_ids": list(input_ids),
        }
        if output_ids:
            body["output_ids"] = list(output_ids)
        if prompt_hash:
            body["prompt_hash"] = prompt_hash
        _, rec = self._emit("inference.call", body)
        return rec

    # ------------------------------------------------------------------
    # identifier resolution
    # ------------------------------------------------------------------

    def resolve(self, ref: str) -> str:
        """Resolve a full id or unique prefix across claims, evidence,
        capabilities and ops. Returns the canonical id.

        A prefix may match either the whole id or the part after the first
        underscore.

        Raises:
            NotFoundError: *ref* is empty or matches nothing.
            AmbiguousIdError: *ref* matches ids with different hex parts.
        """
        if not ref:
            # An empty string is a prefix of every id; never guess from it.
            raise NotFoundError(ref)
        pools: Tuple[Iterable[str], ...] = (
            self.claims, self.evidence, self.capabilities, self.ops_by_id
        )
        for pool in pools:
            if ref in pool:
                return ref
        matches: Set[str] = set()
        for pool in pools:
            for key in pool:
                if key.startswith(ref):
                    matches.add(key)
                # allow matching by hex part alone
                elif "_" in key and key.split("_", 1)[1].startswith(ref):
                    matches.add(key)
        if not matches:
            raise NotFoundError(ref)
        ordered = sorted(matches)
        if len(ordered) > 1:
            # ids derived from the same op share a hex part; prefer the claim.
            # ``[-1]`` keeps this safe for an id that has no underscore.
            hexes = {m.split("_", 1)[-1] for m in ordered}
            if len(hexes) > 1:
                raise AmbiguousIdError(ref, matches)
            # deterministic preference: claim > evidence > capability > op
            for prefix in ("cl_", "ev_", "cap_", "op_"):
                for m in ordered:
                    if m.startswith(prefix):
                        return m
        return ordered[0]

    def get_claim(self, ref: str) -> Claim:
        """Return the claim *ref* denotes; an op id maps to its derived claim.

        Raises:
            NotFoundError: *ref* does not resolve to a claim.
        """
        rid = self.resolve(ref)
        if rid.startswith("op_"):
            rid = derived_id("cl", rid)
        claim = self.claims.get(rid)
        if claim is None:
            raise NotFoundError(ref)
        return claim

    def get_evidence(self, ref: str) -> Evidence:
        """Return the evidence *ref* denotes; an op id maps to its derived
        evidence.

        Raises:
            NotFoundError: *ref* does not resolve to evidence.
        """
        rid = self.resolve(ref)
        if rid.startswith("op_"):
            rid = derived_id("ev", rid)
        ev = self.evidence.get(rid)
        if ev is None:
            raise NotFoundError(ref)
        return ev

    def get_capability(self, ref: str) -> Capability:
        """Return the capability *ref* denotes; an op id maps to its derived
        capability.

        Raises:
            NotFoundError: *ref* does not resolve to a capability.
        """
        rid = self.resolve(ref)
        if rid.startswith("op_"):
            rid = derived_id("cap", rid)
        cap = self.capabilities.get(rid)
        if cap is None:
            raise NotFoundError(ref)
        return cap

    def get_object(self, ref: str) -> Any:
        """Resolve to whichever object type the id denotes."""
        rid = self.resolve(ref)
        if rid in self.claims:
            return self.claims[rid]
        if rid in self.evidence:
            return self.evidence[rid]
        if rid in self.capabilities:
            return self.capabilities[rid]
        return self.ops_by_id[rid]

    # ------------------------------------------------------------------
    # queries
    # ------------------------------------------------------------------

    def claims_by_topic(
        self,
        prefix: Optional[str] = None,
        status: Optional[str] = None,
        include_superseded: bool = True,
    ) -> List[Claim]:
        """Claims filtered by topic prefix and/or status, sorted by
        ``(topic, ts, id)``.

        *prefix* matches a topic exactly or as a dotted ancestor (``a.b``
        matches ``a.b`` and ``a.b.c`` but not ``a.bc``). With
        *include_superseded* false, corrected claims are dropped.
        """
        out = []
        for claim in self.claims.values():
            if prefix and not (
                claim.topic == prefix or claim.topic.startswith(prefix + ".")
            ):
                continue
            if status and claim.status != status:
                continue
            if not include_superseded and claim.status == STATUS_CORRECTED:
                continue
            out.append(claim)
        out.sort(key=lambda c: (c.topic, c.ts, c.id))
        return out

    def topic_tree(self) -> Dict[str, Dict[str, int]]:
        """topic path -> {status -> count}, for every topic and its ancestor
        prefixes."""
        tree: Dict[str, Dict[str, int]] = {}
        for claim in self.claims.values():
            parts = claim.topic.split(".")
            for depth in range(1, len(parts) + 1):
                prefix = ".".join(parts[:depth])
                bucket = tree.setdefault(prefix, {})
                bucket[claim.status] = bucket.get(claim.status, 0) + 1
        return tree

    def provenance_tree(self, claim_ref: str) -> Dict[str, Any]:
        """Nested provenance: claim -> inputs -> ... -> raw evidence leaves.

        Each node is ``{"kind", "object", "children"}`` with kind ``claim``,
        ``evidence`` or ``missing``. A claim revisited on its own ancestor
        path is emitted once more with ``"cycle": True`` and no children.
        """
        root = self.get_claim(claim_ref)

        def build(obj_id: str, path: Set[str]) -> Dict[str, Any]:
            if obj_id in self.claims:
                claim = self.claims[obj_id]
                node: Dict[str, Any] = {"kind": "claim", "object": claim, "children": []}
                if obj_id in path:
                    node["cycle"] = True
                    return node
                for input_id in claim.inputs:
                    if input_id in self.claims or input_id in self.evidence:
                        node["children"].append(build(input_id, path | {obj_id}))
                    else:
                        node["children"].append(
                            {"kind": "missing", "object": input_id, "children": []}
                        )
                return node
            if obj_id in self.evidence:
                return {"kind": "evidence", "object": self.evidence[obj_id], "children": []}
            return {"kind": "missing", "object": obj_id, "children": []}

        return build(root.id, set())

    def evidence_for(self, claim_ref: str) -> List[Evidence]:
        """All raw evidence transitively underlying a claim, sorted by
        ``(observed_at, id)``."""
        root = self.get_claim(claim_ref)
        found: Dict[str, Evidence] = {}
        frontier = [root.id]
        seen: Set[str] = set()
        while frontier:
            current = frontier.pop()
            if current in seen:
                continue
            seen.add(current)
            if current in self.evidence:
                found[current] = self.evidence[current]
            elif current in self.claims:
                frontier.extend(self.claims[current].inputs)
        return sorted(found.values(), key=lambda e: (e.observed_at, e.id))

    def dependents_of(self, ref: str, recursive: bool = True) -> List[Claim]:
        """Claims that list *ref* as an input, sorted by ``(topic, ts, id)``.

        With *recursive* true (the default) transitive dependents are
        included; otherwise only direct ones.
        """
        rid = self.resolve(ref)
        out: Dict[str, Claim] = {}
        # deque gives O(1) pops from the left; list.pop(0) is O(n) per pop.
        frontier = deque([rid])
        seen: Set[str] = {rid}
        while frontier:
            current = frontier.popleft()
            for dep_id in sorted(self.dependents.get(current, ())):
                if dep_id in seen:
                    continue
                seen.add(dep_id)
                dep = self.claims.get(dep_id)
                if dep is not None:
                    out[dep_id] = dep
                    if recursive:
                        frontier.append(dep_id)
        return sorted(out.values(), key=lambda c: (c.topic, c.ts, c.id))

    def stats(self) -> Dict[str, Any]:
        """Summary counts of the materialized state plus any load warnings."""
        status_counts: Dict[str, int] = {}
        for claim in self.claims.values():
            status_counts[claim.status] = status_counts.get(claim.status, 0) + 1
        return {
            "label": self.label,
            "author": self.identity.author,
            "ops": len(self.ops),
            "evidence": len(self.evidence),
            "claims": len(self.claims),
            "claims_by_status": status_counts,
            "capabilities": len(self.capabilities),
            "inference_calls": len(self.inferences),
            "topics": len({c.topic for c in self.claims.values()}),
            "load_warnings": list(self.load_warnings),
        }

    # ------------------------------------------------------------------
    # export
    # ------------------------------------------------------------------

    def export_ops(self) -> List[Dict[str, Any]]:
        """The full operation log — *the* portable wire-format export."""
        return list(self.ops)

    def export_graph(self) -> Dict[str, Any]:
        """A convenience snapshot of materialized state. Derivable from the op
        export by any conforming implementation; included because it is what
        humans and downstream tools usually want to read."""
        return {
            "format": GRAPH_EXPORT_FORMAT,
            "exported_at": now_ts(),
            "node_label": self.label,
            "author": self.identity.author,
            "evidence": [e.to_dict() for e in sorted(
                self.evidence.values(), key=lambda e: (e.ts, e.id))],
            "claims": [c.to_dict() for c in sorted(
                self.claims.values(), key=lambda c: (c.ts, c.id))],
            "capabilities": [c.to_dict() for c in sorted(
                self.capabilities.values(), key=lambda c: (c.ts, c.id))],
            "inference_calls": [r.to_dict() for r in self.inferences],
        }