"""Claim graph materialization: provenance, cascade invalidation, conflicts. Revised in milestone 6: the claim layer is where multi-node "conflicts" actually surface and must be resolved deterministically, because the log itself is append-only and conflict-free (two nodes can always merge their operation sets). After merging logs, both nodes materialize the *same* claim view by applying the same rules in the same order: 1. Claims are read in deterministic ``(lamport, op_id)`` order. 2. Refutations win unconditionally over the claims they target, and the invalidation cascades down the derivation graph (a claim derived from a refuted/invalidated node is itself invalidated). 3. Among the remaining *active* claims, two claims with the same ``(subject, predicate)`` but different objects are a conflict. The deterministic winner is the claim with the highest ``(lamport, op_id)``; losers are marked ``superseded`` (not deleted — the log is the truth, the view is a projection). Identical objects from different sources are corroboration, not conflict, and all stay active. Wire op-type strings here are normative per the milestone-2 wire format and mirror the constants in :mod:`fablepool.ops`. """ from __future__ import annotations import json from collections import defaultdict, deque from dataclasses import dataclass, field from typing import Any, Dict, Iterable, List, Optional, Set, Tuple from .ops import Operation OP_EVIDENCE = "evidence" OP_CLAIM = "claim" OP_REFUTATION = "refutation" STATUS_ACTIVE = "active" STATUS_REFUTED = "refuted" STATUS_INVALIDATED = "invalidated" STATUS_SUPERSEDED = "superseded" def _canonical_value(value: Any) -> str: """A stable string form of a claim object, used only for equality.""" return json.dumps(value, sort_keys=True, separators=(",", ":"), ensure_ascii=False) @dataclass class Claim: claim_id: str subject: str predicate: str object: Any confidence: float derived_from: List[str] rationale: str method: str author: str lamport: int created_at: str status: str = STATUS_ACTIVE refuted_by: Optional[str] = None invalidated_by: Optional[str] = None superseded_by: Optional[str] = None def summary(self) -> Dict[str, Any]: return { "claim_id": self.claim_id, "subject": self.subject, "predicate": self.predicate, "object": self.object, "confidence": self.confidence, "status": self.status, "method": self.method, "rationale": self.rationale, "author": self.author, "derived_from": list(self.derived_from), } @dataclass class Conflict: subject: str predicate: str claim_ids: List[str] winner: str def to_dict(self) -> Dict[str, Any]: return { "subject": self.subject, "predicate": self.predicate, "claim_ids": list(self.claim_ids), "winner": self.winner, } class ClaimGraph: """A deterministic projection of an operation set into claims.""" def __init__(self) -> None: self.claims: Dict[str, Claim] = {} self.evidence: Dict[str, Operation] = {} self.refutations: Dict[str, Operation] = {} self.children: Dict[str, List[str]] = defaultdict(list) self.conflicts: List[Conflict] = [] # ------------------------------------------------------------------ # construction # ------------------------------------------------------------------ @classmethod def from_ops(cls, ops: Iterable[Operation]) -> "ClaimGraph": graph = cls() ordered = sorted(ops, key=lambda o: (o.lamport, o.op_id)) for op in ordered: if op.op_type == OP_EVIDENCE: graph.evidence[op.op_id] = op elif op.op_type == OP_CLAIM: body = op.body claim = Claim( claim_id=op.op_id, subject=str(body.get("subject", "")), predicate=str(body.get("predicate", "")), object=body.get("object", body.get("value")), confidence=float(body.get("confidence", 0.5)), derived_from=[str(d) for d in body.get("derived_from", [])], rationale=str(body.get("rationale", "")), method=str(body.get("method", "")), author=op.author, lamport=op.lamport, created_at=op.created_at, ) graph.claims[op.op_id] = claim for dep in claim.derived_from: graph.children[dep].append(op.op_id) elif op.op_type == OP_REFUTATION: graph.refutations[op.op_id] = op for rid, rop in sorted( graph.refutations.items(), key=lambda kv: (kv[1].lamport, kv[0]) ): target = str(rop.body.get("target", "")) graph._apply_refutation(rid, target) graph._resolve_conflicts() return graph def _apply_refutation(self, refutation_id: str, target: str) -> None: """Refute ``target`` (a claim or evidence id) and cascade downward.""" if target in self.claims: claim = self.claims[target] claim.status = STATUS_REFUTED claim.refuted_by = refutation_id # Cascade: everything derived (transitively) from the target becomes # invalid, whether the target was a claim or a piece of evidence. queue: deque[str] = deque(self.children.get(target, [])) seen: Set[str] = set() while queue: cid = queue.popleft() if cid in seen: continue seen.add(cid) child = self.claims.get(cid) if child is not None and child.status == STATUS_ACTIVE: child.status = STATUS_INVALIDATED child.invalidated_by = refutation_id queue.extend(self.children.get(cid, [])) def _resolve_conflicts(self) -> None: groups: Dict[Tuple[str, str], List[Claim]] = defaultdict(list) for claim in self.claims.values(): if claim.status == STATUS_ACTIVE: groups[(claim.subject, claim.predicate)].append(claim) for (subject, predicate), group in sorted(groups.items()): distinct = {_canonical_value(c.object) for c in group} if len(distinct) <= 1: continue winner = max(group, key=lambda c: (c.lamport, c.claim_id)) winner_value = _canonical_value(winner.object) for claim in group: if ( claim.claim_id != winner.claim_id and _canonical_value(claim.object) != winner_value ): claim.status = STATUS_SUPERSEDED claim.superseded_by = winner.claim_id self.conflicts.append( Conflict( subject=subject, predicate=predicate, claim_ids=sorted(c.claim_id for c in group), winner=winner.claim_id, ) ) # ------------------------------------------------------------------ # queries # ------------------------------------------------------------------ def get(self, claim_id: str) -> Optional[Claim]: return self.claims.get(claim_id) def active_claims(self) -> List[Claim]: return sorted( (c for c in self.claims.values() if c.status == STATUS_ACTIVE), key=lambda c: (c.subject, c.predicate, c.claim_id), ) def all_claims(self) -> List[Claim]: return sorted( self.claims.values(), key=lambda c: (c.subject, c.predicate, c.claim_id) ) def claims_for(self, subject: str) -> List[Claim]: return [c for c in self.all_claims() if c.subject == subject] def descendants(self, node_id: str) -> List[str]: """Claim ids transitively derived from ``node_id`` (cascade preview).""" out: Set[str] = set() queue: deque[str] = deque(self.children.get(node_id, [])) while queue: cid = queue.popleft() if cid in out: continue out.add(cid) queue.extend(self.children.get(cid, [])) return sorted(out) def explain(self, node_id: str, _seen: Optional[Set[str]] = None) -> Dict[str, Any]: """A provenance tree answering "why do you believe this?".""" seen = _seen or set() if node_id in seen: return {"type": "cycle", "id": node_id} seen = seen | {node_id} if node_id in self.evidence: op = self.evidence[node_id] body = op.body return { "type": "evidence", "id": node_id, "source": body.get("source"), "kind": body.get("kind"), "observed_at": body.get("observed_at"), "author": op.author, } claim = self.claims.get(node_id) if claim is None: return {"type": "missing", "id": node_id} node: Dict[str, Any] = {"type": "claim", **claim.summary()} if claim.refuted_by: node["refuted_by"] = claim.refuted_by if claim.invalidated_by: node["invalidated_by"] = claim.invalidated_by if claim.superseded_by: node["superseded_by"] = claim.superseded_by node["supports"] = [self.explain(dep, seen) for dep in claim.derived_from] return node