"""The FablePool reference node. A ``Node`` ties together one identity (keypair), one append-only operation log, the derived claim view, the sync server/client, and the delegation machinery. The same class plays every role in the demo: the phone-like node, the laptop-like node, and the third-party delegate — the *grants* differ, not the software. """ from __future__ import annotations import json from pathlib import Path from typing import Any, Dict, List, Optional, Union from . import capability from .capability import DelegateCache, Selector from .claims import ClaimGraph from .keys import KeyPair from .ops import Operation, make_op from .store import LogStore from .sync import SyncClient, SyncReport, SyncServer from .transport import Transport class Node: """A user-owned (or delegated) FablePool node. ``data_dir=None`` builds an ephemeral in-memory node (tests, demos); otherwise identity, log, and delegate cache persist under ``data_dir``: * ``identity.json`` — the node's signing keypair * ``log.jsonl`` — the append-only operation log * ``delegate_cache.json`` — attested claim content this node holds as a delegate (mutable on purpose; see :mod:`fablepool.capability`) """ def __init__( self, name: str, data_dir: Optional[Union[str, Path]] = None, keypair: Optional[KeyPair] = None, ) -> None: self.name = name self.data_dir: Optional[Path] = ( Path(data_dir).expanduser() if data_dir is not None else None ) store_path: Optional[Path] = None cache_path: Optional[Path] = None if self.data_dir is not None: self.data_dir.mkdir(parents=True, exist_ok=True) key_path = self.data_dir / "identity.json" if keypair is None: if key_path.exists(): keypair = KeyPair.from_dict( json.loads(key_path.read_text(encoding="utf-8")) ) else: keypair = KeyPair.generate() key_path.write_text( json.dumps(keypair.to_dict(), indent=2), encoding="utf-8" ) try: key_path.chmod(0o600) except OSError: pass # e.g. some Windows filesystems store_path = self.data_dir / "log.jsonl" cache_path = self.data_dir / "delegate_cache.json" self.keypair: KeyPair = keypair if keypair is not None else KeyPair.generate() self.store = LogStore(store_path) self.delegate_cache = DelegateCache(cache_path) self._server = SyncServer(self) # ------------------------------------------------------------------ # identity & protocol surface # ------------------------------------------------------------------ @property def key_id(self) -> str: return self.keypair.key_id def handle(self, message: dict) -> dict: """The node's protocol message handler (give this to a transport/server).""" return self._server.handle(message) # ------------------------------------------------------------------ # writing to the log # ------------------------------------------------------------------ def append_op( self, op_type: str, body: Dict[str, Any], prev: Optional[List[str]] = None ) -> Operation: if prev is None: prev = self.store.frontier() op = make_op( op_type=op_type, keypair=self.keypair, body=body, prev=sorted(prev), lamport=self.store.max_lamport() + 1, ) self.store.append(op) return op def add_evidence( self, source: str, kind: str, content: Any, observed_at: Optional[str] = None, ) -> Operation: body = { "source": source, "kind": kind, "content": content, "observed_at": observed_at, } return self.append_op("evidence", body) def add_claim( self, subject: str, predicate: str, object: Any, confidence: float = 0.8, derived_from: Optional[List[str]] = None, rationale: str = "", method: str = "manual", ) -> Operation: body = { "subject": subject, "predicate": predicate, "object": object, "confidence": confidence, "derived_from": list(derived_from or []), "rationale": rationale, "method": method, } return self.append_op("claim", body) def refute(self, target_id: str, reason: str = "") -> Operation: if not self.store.has(target_id): raise ValueError(f"cannot refute unknown operation: {target_id!r}") return self.append_op("refutation", {"target": target_id, "reason": reason}) # ------------------------------------------------------------------ # reading the claim view # ------------------------------------------------------------------ @property def graph(self) -> ClaimGraph: return ClaimGraph.from_ops(self.store.all_ops()) def explain(self, claim_id: str) -> Dict[str, Any]: return self.graph.explain(claim_id) def what_do_you_know( self, subject: Optional[str] = None, predicate: Optional[str] = None ) -> List[Dict[str, Any]]: """The "what do you know about me and why?" answer, as data.""" graph = self.graph out: List[Dict[str, Any]] = [] for claim in graph.active_claims(): if subject is not None and claim.subject != subject: continue if predicate is not None and claim.predicate != predicate: continue entry = claim.summary() entry["evidence_sources"] = self._evidence_sources(graph, claim) out.append(entry) return out @staticmethod def _evidence_sources(graph: ClaimGraph, claim) -> List[str]: seen: set = set() sources: set = set() stack: List[str] = list(claim.derived_from) while stack: dep = stack.pop() if dep in seen: continue seen.add(dep) if dep in graph.evidence: body = graph.evidence[dep].body sources.add(f"{body.get('source', '?')}/{body.get('kind', '?')}") elif dep in graph.claims: stack.extend(graph.claims[dep].derived_from) return sorted(sources) def export_log(self) -> List[Dict[str, Any]]: """The full log as wire-format envelopes (portability/export).""" return [op.to_dict() for op in self.store.all_ops()] # ------------------------------------------------------------------ # sync (peer-to-peer, full log) # ------------------------------------------------------------------ def sync_with(self, transport: Transport) -> SyncReport: return SyncClient(self, transport).sync() # ------------------------------------------------------------------ # delegation: grantor role # ------------------------------------------------------------------ def grant( self, grantee: str, predicates: Optional[List[str]] = None, subjects: Optional[List[str]] = None, min_confidence: float = 0.0, expires_at: Optional[str] = None, note: str = "", ) -> Operation: selector = Selector( predicates=predicates, subjects=subjects, min_confidence=min_confidence ) return capability.make_grant( self, grantee, selector, expires_at=expires_at, note=note ) def revoke(self, grant_id: str, reason: str = "revoked by owner") -> Operation: return capability.make_revocation(self, grant_id, reason=reason) def grants(self) -> List[Dict[str, Any]]: return capability.list_grants(self.store) def delegate_purge_verified(self, grant_id: str) -> bool: return capability.has_valid_purge_receipt(self.store, grant_id) is not None def challenge_holdings(self, transport: Transport, grant_id: str) -> Dict[str, Any]: """Ask a delegate (behind ``transport``) to prove what it still holds.""" reply = transport.request({"type": "holdings_request", "grant_id": grant_id}) if reply.get("type") != "holdings": return {"ok": False, "holdings": [], "reason": reply.get("error", "bad reply")} grant = capability.find_grant(self.store, grant_id) expected = grant.body.get("grantee") if grant is not None else None ok, holdings, reason = capability.verify_holdings_proof( reply.get("proof"), expected_author=expected ) return {"ok": ok, "holdings": holdings, "reason": reason} # ------------------------------------------------------------------ # delegation: delegate role # ------------------------------------------------------------------ def pull_delegation(self, transport: Transport, grant_id: str) -> Dict[str, Any]: return capability.delegate_pull(self, transport, grant_id) def delegated_claims(self, grant_id: Optional[str] = None) -> List[Dict[str, Any]]: return self.delegate_cache.claims_view(grant_id) # ------------------------------------------------------------------ def __repr__(self) -> str: # pragma: no cover - debugging aid return f""