"""Derivation engine orchestrator.

The engine is the only component that:

* runs registered derivers to a **fixpoint** over the evidence/claim graph
  (claims can feed further derivation, so multiple passes run until nothing
  changes);
* decides whether a candidate becomes a new claim, **supersedes** an existing
  claim under the same identity, or is **suppressed** because the user
  refuted or corrected that identity;
* performs **mechanical cascade invalidation** when a claim is refuted,
  corrected, or its underlying evidence is retracted — purely from the
  provenance edges, never relying on deriver cooperation;
* emits every state change as an operation through an
  :class:`~mnema.derive.sink.OpSink`, so the signed append-only log remains
  the single source of truth;
* can rebuild its entire state by **replaying** that log.

Identities
----------
A *claim identity* is a stable key for "the same belief over time", e.g.
``routines / user.routine / slot=tue-07:00``. When new evidence shifts a
belief, the engine supersedes the previous claim with a new one under the
same identity. When the user refutes or corrects a claim, its identity is
suppressed: re-derivation will never resurrect it, and a correction installs
a user-asserted claim that machine derivation cannot displace.
"""

from __future__ import annotations

import hashlib
import math
from collections import deque
from dataclasses import asdict, dataclass, field
from datetime import datetime, timezone
from typing import Any, Callable, Dict, Iterable, List, Mapping, Optional, Sequence, Set

from mnema.core.canonical import canonical_json
from mnema.derive.derivers.base import ClaimCandidate, DerivationContext, Deriver
from mnema.derive.graph import DerivationGraph
from mnema.derive.model import Claim, ClaimStatus, Evidence, Provenance
from mnema.derive.sink import NullSink, OpSink

# Deriver name (and provenance deriver) stamped on claims installed by
# DerivationEngine.correct().
USER_DERIVER = "user.correction"

# Confidence differences below this threshold count as "unchanged" when
# deciding whether a re-derived candidate is a no-op.
_CONF_EPSILON = 1e-6


def _utc_now_iso() -> str:
    """Return the current UTC time as ISO-8601, truncated to whole seconds,
    with the ``+00:00`` offset rendered as ``Z``."""
    return (
        datetime.now(timezone.utc)
        .replace(microsecond=0)
        .isoformat()
        .replace("+00:00", "Z")
    )


def _digest(obj: Any) -> str:
    """Hex SHA-256 of ``obj``'s canonical JSON encoding (used for ids and
    value-equality checks)."""
    return hashlib.sha256(canonical_json(obj)).hexdigest()


def _clamp_confidence(value: Any) -> float:
    """Coerce ``value`` to a float in ``[0.0, 1.0]``.

    NaN is mapped to ``0.0``. A plain ``max(0.0, min(1.0, nan))`` evaluates
    to ``1.0`` (every comparison against NaN is false), which would turn an
    undefined confidence into certainty.

    Raises:
        TypeError / ValueError: if ``value`` cannot be converted by ``float``.
    """
    confidence = float(value)
    if math.isnan(confidence):
        return 0.0
    return max(0.0, min(1.0, confidence))


@dataclass
class RunReport:
    """Outcome of one :meth:`DerivationEngine.run` (fixpoint) call."""

    passes: int = 0
    # ids of claims newly asserted during the run
    asserted: List[str] = field(default_factory=list)
    # ids of previously ACTIVE claims replaced under the same identity
    superseded: List[str] = field(default_factory=list)
    # identities whose candidates were dropped because they are refuted/corrected
    suppressed: List[str] = field(default_factory=list)

    def to_dict(self) -> Dict[str, Any]:
        return asdict(self)


@dataclass
class CascadeReport:
    """Outcome of a refute / correct / retract entry point, including
    everything that was mechanically invalidated or re-derived."""

    action: str
    target_id: str
    reason: str
    identity: Optional[str] = None
    # id of the user-asserted claim installed by a correction
    replacement_id: Optional[str] = None
    invalidated: List[str] = field(default_factory=list)
    superseded: List[str] = field(default_factory=list)
    rederived: List[str] = field(default_factory=list)
    suppressed: List[str] = field(default_factory=list)

    def to_dict(self) -> Dict[str, Any]:
        return asdict(self)


class DerivationEngine:
    """Coordinates derivers, the derivation graph, and the op sink."""

    def __init__(
        self,
        derivers: Sequence[Deriver] = (),
        sink: Optional[OpSink] = None,
        graph: Optional[DerivationGraph] = None,
        clock: Optional[Callable[[], str]] = None,
        max_passes: int = 8,
    ) -> None:
        self.graph = graph if graph is not None else DerivationGraph()
        self._derivers: List[Deriver] = list(derivers)
        self._sink: OpSink = sink if sink is not None else NullSink()
        self._clock = clock if clock is not None else _utc_now_iso
        self._max_passes = max_passes
        # identity bookkeeping
        self._identity_to_claim: Dict[str, str] = {}  # identity -> latest claim id
        self._claim_identity: Dict[str, str] = {}  # claim id -> identity
        self._refuted_identities: Set[str] = set()
        self._corrected_identities: Dict[str, str] = {}  # identity -> user claim id
        self._retracted_evidence: Set[str] = set()
        # Monotonic counter mixed into claim ids so re-asserting identical
        # content still yields a distinct id; replay advances it in step.
        self._seq = 0

    # ------------------------------------------------------------------
    # registration / accessors
    # ------------------------------------------------------------------

    def register(self, deriver: Deriver) -> None:
        """Append ``deriver``; it takes part in subsequent :meth:`run` calls."""
        self._derivers.append(deriver)

    @property
    def derivers(self) -> List[Deriver]:
        """A copy of the registered derivers, in registration order."""
        return list(self._derivers)

    def active_evidence(self) -> List[Evidence]:
        """All evidence in the graph that has not been retracted."""
        return [
            ev
            for ev in self.graph.evidence()
            if ev.evidence_id not in self._retracted_evidence
        ]

    def active_claims(self) -> List[Claim]:
        """All claims whose status is ACTIVE."""
        return [c for c in self.graph.claims() if c.status == ClaimStatus.ACTIVE]

    def claims(self, include_inactive: bool = False) -> List[Claim]:
        """ACTIVE claims, or every claim when ``include_inactive`` is true."""
        if include_inactive:
            return list(self.graph.claims())
        return self.active_claims()

    def get_claim(self, claim_id: str) -> Optional[Claim]:
        return self.graph.get_claim(claim_id)

    def identity_of(self, claim_id: str) -> Optional[str]:
        """The identity a claim was asserted under, if one was recorded."""
        return self._claim_identity.get(claim_id)

    def is_retracted(self, evidence_id: str) -> bool:
        return evidence_id in self._retracted_evidence

    # ------------------------------------------------------------------
    # ingestion
    # ------------------------------------------------------------------

    def ingest(self, evidence: Evidence, emit: bool = True) -> bool:
        """Add one piece of evidence; returns False if already known.

        When ``emit`` is true an ``evidence.observe`` op is sent to the sink.
        """
        if self.graph.get_evidence(evidence.evidence_id) is not None:
            return False
        self.graph.add_evidence(evidence)
        if emit:
            self._sink.emit("evidence.observe", self._evidence_to_payload(evidence))
        return True

    def ingest_all(self, evidence: Iterable[Evidence], emit: bool = True) -> int:
        """Ingest each item; returns how many were new."""
        return sum(1 for ev in evidence if self.ingest(ev, emit=emit))

    # ------------------------------------------------------------------
    # fixpoint derivation
    # ------------------------------------------------------------------

    def run(self) -> RunReport:
        """Run all derivers repeatedly until no claim changes (or the pass
        limit is hit). Claims asserted in pass *n* become visible to
        derivers in pass *n+1*, which is what lets claim-over-claim
        derivation converge deterministically."""
        report = RunReport()
        for _ in range(self._max_passes):
            changed = False
            # One snapshot per pass: every deriver in the pass sees the same
            # live evidence/claims, regardless of what earlier derivers in
            # this pass asserted.
            ctx = DerivationContext(
                graph=self.graph,
                evidence=self.active_evidence(),
                claims=self.active_claims(),
            )
            for deriver in self._derivers:
                for candidate in deriver.derive(ctx):
                    if self._consider(candidate, deriver, report):
                        changed = True
            report.passes += 1
            if not changed:
                break
        return report

    def _candidate_identity(self, candidate: ClaimCandidate, deriver: Deriver) -> str:
        """The candidate's explicit identity, or a digest of
        (subject, predicate, deriver name) when it has none."""
        if candidate.identity:
            return candidate.identity
        return "idy_" + _digest(
            {
                "subject": candidate.subject,
                "predicate": candidate.predicate,
                "deriver": deriver.name,
            }
        )[:32]

    def _consider(
        self, candidate: ClaimCandidate, deriver: Deriver, report: RunReport
    ) -> bool:
        """Decide the fate of one candidate; return True iff a claim was asserted.

        Outcomes: suppressed (identity refuted or corrected), dropped (an
        input is retracted evidence or a non-ACTIVE claim), no-op (same value
        and confidence as the identity's current ACTIVE claim), or a new
        ACTIVE claim that supersedes the identity's previous ACTIVE claim,
        if any.
        """
        identity = self._candidate_identity(candidate, deriver)

        # Suppression: a refuted or corrected identity must never be
        # re-asserted by machine derivation.
        if identity in self._refuted_identities or identity in self._corrected_identities:
            if identity not in report.suppressed:
                report.suppressed.append(identity)
            return False

        # Input hygiene: every input must be live. Derivers see a
        # filtered context, but this guard makes the invariant
        # mechanical rather than cooperative.
        for input_id in candidate.inputs:
            if input_id in self._retracted_evidence:
                return False
            input_claim = self.graph.get_claim(input_id)
            if input_claim is not None and input_claim.status != ClaimStatus.ACTIVE:
                return False

        confidence = _clamp_confidence(candidate.confidence)
        existing_id = self._identity_to_claim.get(identity)
        existing = self.graph.get_claim(existing_id) if existing_id else None
        existing_active = existing is not None and existing.status == ClaimStatus.ACTIVE
        if (
            existing_active
            and _digest(existing.value) == _digest(candidate.value)
            and abs(existing.confidence - confidence) < _CONF_EPSILON
        ):
            return False  # unchanged belief: no-op

        supersedes = existing.claim_id if existing_active else None
        claim = self._make_claim(candidate, deriver, confidence, identity)
        self.graph.add_claim(claim)
        self._identity_to_claim[identity] = claim.claim_id
        self._claim_identity[claim.claim_id] = identity
        report.asserted.append(claim.claim_id)
        self._sink.emit(
            "claim.assert",
            {
                "claim": self._claim_to_payload(claim),
                "identity": identity,
                "supersedes": supersedes,
            },
        )
        self._sink.emit(
            "explain.record",
            {"claim_id": claim.claim_id, "explanation": self.explanation_for(claim.claim_id)},
        )
        if supersedes is not None and existing is not None:
            self._set_status(existing, ClaimStatus.SUPERSEDED)
            report.superseded.append(existing.claim_id)
            self._sink.emit(
                "claim.supersede",
                {"claim_id": existing.claim_id, "superseded_by": claim.claim_id},
            )
        return True

    def _make_claim(
        self,
        candidate: ClaimCandidate,
        deriver: Deriver,
        confidence: float,
        identity: str,
    ) -> Claim:
        """Build a new ACTIVE claim (not yet added to the graph) whose id is
        a digest of its content plus the next sequence number."""
        self._seq += 1
        claim_id = "clm_" + _digest(
            {
                "identity": identity,
                "subject": candidate.subject,
                "predicate": candidate.predicate,
                "value": candidate.value,
                "inputs": sorted(candidate.inputs),
                "deriver": deriver.name,
                "rule": candidate.rule,
                "seq": self._seq,
            }
        )[:32]
        provenance = Provenance(
            inputs=list(candidate.inputs),
            deriver=deriver.name,
            rule=candidate.rule,
            parameters=dict(candidate.parameters or {}),
            rationale=candidate.rationale,
        )
        return Claim(
            claim_id=claim_id,
            subject=candidate.subject,
            predicate=candidate.predicate,
            value=candidate.value,
            confidence=confidence,
            status=ClaimStatus.ACTIVE,
            provenance=provenance,
            deriver=deriver.name,
            created_at=self._clock(),
        )

    # ------------------------------------------------------------------
    # dependency index and cascade invalidation
    # ------------------------------------------------------------------

    def _direct_dependents_index(self) -> Dict[str, Set[str]]:
        """Map each node id to the ids of claims (of any status) citing it
        directly as a provenance input."""
        index: Dict[str, Set[str]] = {}
        for claim in self.graph.claims():
            for input_id in claim.provenance.inputs:
                index.setdefault(input_id, set()).add(claim.claim_id)
        return index

    def transitive_dependents(self, node_id: str) -> List[str]:
        """All claims that (transitively) cite ``node_id`` as an input, in
        deterministic breadth-first order."""
        index = self._direct_dependents_index()
        seen: Set[str] = set()
        order: List[str] = []
        # deque gives O(1) pops from the left; list.pop(0) is O(n).
        frontier = deque([node_id])
        while frontier:
            current = frontier.popleft()
            # sorted() makes the visit order independent of set ordering.
            for dependent in sorted(index.get(current, ())):
                if dependent not in seen:
                    seen.add(dependent)
                    order.append(dependent)
                    frontier.append(dependent)
        return order

    def _invalidate_cascade(
        self, root_id: str, cause: Mapping[str, Any], report: CascadeReport
    ) -> None:
        """Mark every ACTIVE transitive dependent of ``root_id`` INVALIDATED.

        Non-ACTIVE dependents are left untouched but still traversed, so
        claims downstream of them are reached as well.
        """
        for dependent_id in self.transitive_dependents(root_id):
            claim = self.graph.get_claim(dependent_id)
            if claim is None or claim.status != ClaimStatus.ACTIVE:
                continue
            self._set_status(claim, ClaimStatus.INVALIDATED)
            report.invalidated.append(claim.claim_id)
            self._sink.emit(
                "claim.invalidate",
                {
                    "claim_id": claim.claim_id,
                    "cascade_root": root_id,
                    "cause": dict(cause),
                },
            )

    def _set_status(self, claim: Claim, status: ClaimStatus) -> None:
        claim.status = status

    # ------------------------------------------------------------------
    # user entry points: refute / correct / retract
    # ------------------------------------------------------------------

    def refute(self, claim_id: str, reason: str, rederive: bool = True) -> CascadeReport:
        """The user says: "this claim is wrong."

        The claim is marked REFUTED, its identity is suppressed forever, and
        every claim downstream of it is mechanically invalidated; remaining
        live inputs are then re-derived.

        Raises:
            KeyError: if ``claim_id`` is unknown.
            ValueError: if the claim is not ACTIVE.
        """
        claim = self.graph.get_claim(claim_id)
        if claim is None:
            raise KeyError("unknown claim: %s" % claim_id)
        if claim.status != ClaimStatus.ACTIVE:
            raise ValueError(
                "claim %s is %s, only ACTIVE claims can be refuted"
                % (claim_id, claim.status.value)
            )
        identity = self._claim_identity.get(claim_id)
        if identity:
            self._refuted_identities.add(identity)
        self._set_status(claim, ClaimStatus.REFUTED)
        self._sink.emit(
            "claim.refute",
            {"claim_id": claim_id, "identity": identity, "reason": reason},
        )
        report = CascadeReport(
            action="refute", target_id=claim_id, reason=reason, identity=identity
        )
        self._invalidate_cascade(
            claim_id,
            {"kind": "refutation", "node_id": claim_id, "reason": reason},
            report,
        )
        if rederive:
            run_report = self.run()
            report.rederived.extend(run_report.asserted)
            report.superseded.extend(run_report.superseded)
            report.suppressed.extend(run_report.suppressed)
        return report

    def correct(
        self,
        claim_id: str,
        value: Any,
        reason: str,
        confidence: float = 1.0,
        rederive: bool = True,
    ) -> CascadeReport:
        """The user says: "the truth is actually X."

        The old claim is refuted, a user-asserted claim with the corrected
        value is installed under the same identity, machine derivation of
        that identity is suppressed, and downstream claims are invalidated
        and re-derived (now able to consume the corrected claim).

        ``confidence`` is clamped to ``[0.0, 1.0]``; NaN becomes ``0.0``.

        Raises:
            KeyError: if ``claim_id`` is unknown.
            ValueError: if the claim is not ACTIVE.
        """
        old = self.graph.get_claim(claim_id)
        if old is None:
            raise KeyError("unknown claim: %s" % claim_id)
        if old.status != ClaimStatus.ACTIVE:
            raise ValueError(
                "claim %s is %s, only ACTIVE claims can be corrected"
                % (claim_id, old.status.value)
            )
        identity = self._claim_identity.get(claim_id)
        self._set_status(old, ClaimStatus.REFUTED)

        self._seq += 1
        new_id = "clm_" + _digest(
            {
                "corrects": claim_id,
                "subject": old.subject,
                "predicate": old.predicate,
                "value": value,
                "seq": self._seq,
            }
        )[:32]
        new_claim = Claim(
            claim_id=new_id,
            subject=old.subject,
            predicate=old.predicate,
            value=value,
            confidence=_clamp_confidence(confidence),
            status=ClaimStatus.ACTIVE,
            provenance=Provenance(
                inputs=[],
                deriver=USER_DERIVER,
                rule="user.correction",
                parameters={"corrects": claim_id, "reason": reason},
                rationale="User correction of %s: %s" % (claim_id, reason),
            ),
            deriver=USER_DERIVER,
            created_at=self._clock(),
        )
        self.graph.add_claim(new_claim)
        if identity:
            self._corrected_identities[identity] = new_id
            self._identity_to_claim[identity] = new_id
            self._claim_identity[new_id] = identity
        self._sink.emit(
            "claim.correct",
            {
                "claim_id": claim_id,
                "identity": identity,
                "reason": reason,
                "new_claim": self._claim_to_payload(new_claim),
            },
        )
        self._sink.emit(
            "explain.record",
            {"claim_id": new_id, "explanation": self.explanation_for(new_id)},
        )
        report = CascadeReport(
            action="correct",
            target_id=claim_id,
            reason=reason,
            identity=identity,
            replacement_id=new_id,
        )
        self._invalidate_cascade(
            claim_id,
            {"kind": "correction", "node_id": claim_id, "reason": reason},
            report,
        )
        if rederive:
            run_report = self.run()
            report.rederived.extend(run_report.asserted)
            report.superseded.extend(run_report.superseded)
            report.suppressed.extend(run_report.suppressed)
        return report

    def retract_evidence(
        self, evidence_id: str, reason: str, rederive: bool = True
    ) -> CascadeReport:
        """The user withdraws a piece of raw evidence.

        Every claim that (transitively) cites it is invalidated;
        re-derivation then rebuilds whatever the *remaining* evidence still
        supports — typically the same beliefs at lower confidence, or
        nothing.

        Raises:
            KeyError: if ``evidence_id`` is unknown.
            ValueError: if the evidence is already retracted.
        """
        if self.graph.get_evidence(evidence_id) is None:
            raise KeyError("unknown evidence: %s" % evidence_id)
        if evidence_id in self._retracted_evidence:
            raise ValueError("evidence %s is already retracted" % evidence_id)
        self._retracted_evidence.add(evidence_id)
        self._sink.emit(
            "evidence.retract", {"evidence_id": evidence_id, "reason": reason}
        )
        report = CascadeReport(action="retract", target_id=evidence_id, reason=reason)
        self._invalidate_cascade(
            evidence_id,
            {"kind": "evidence_retraction", "node_id": evidence_id, "reason": reason},
            report,
        )
        if rederive:
            run_report = self.run()
            report.rederived.extend(run_report.asserted)
            report.superseded.extend(run_report.superseded)
            report.suppressed.extend(run_report.suppressed)
        return report

    # ------------------------------------------------------------------
    # explanations
    # ------------------------------------------------------------------

    def explanation_for(self, claim_id: str) -> Dict[str, Any]:
        """Structured answer to "why does the system believe this?": the
        rule, its parameters, the human-readable rationale, and a resolved
        summary of every input node.

        Raises:
            KeyError: if ``claim_id`` is unknown.
        """
        claim = self.graph.get_claim(claim_id)
        if claim is None:
            raise KeyError("unknown claim: %s" % claim_id)
        provenance = claim.provenance
        inputs: List[Dict[str, Any]] = []
        for input_id in provenance.inputs:
            evidence = self.graph.get_evidence(input_id)
            if evidence is not None:
                inputs.append(
                    {
                        "id": input_id,
                        "type": "evidence",
                        "kind": evidence.kind,
                        "source": evidence.source,
                        "observed_at": evidence.observed_at,
                        "retracted": input_id in self._retracted_evidence,
                        "summary": _summarize_attributes(evidence.attributes),
                    }
                )
                continue
            input_claim = self.graph.get_claim(input_id)
            if input_claim is not None:
                inputs.append(
                    {
                        "id": input_id,
                        "type": "claim",
                        "predicate": input_claim.predicate,
                        "value": input_claim.value,
                        "status": input_claim.status.value,
                        "confidence": input_claim.confidence,
                    }
                )
                continue
            inputs.append({"id": input_id, "type": "unknown"})
        return {
            "claim_id": claim.claim_id,
            "subject": claim.subject,
            "predicate": claim.predicate,
            "value": claim.value,
            "confidence": claim.confidence,
            "status": claim.status.value,
            "deriver": claim.deriver,
            "rule": provenance.rule,
            "parameters": dict(provenance.parameters or {}),
            "rationale": provenance.rationale,
            "inputs": inputs,
            "created_at": claim.created_at,
        }

    # ------------------------------------------------------------------
    # serialization and replay
    # ------------------------------------------------------------------

    @staticmethod
    def _evidence_to_payload(evidence: Evidence) -> Dict[str, Any]:
        return {
            "evidence_id": evidence.evidence_id,
            "kind": evidence.kind,
            "source": evidence.source,
            "observed_at": evidence.observed_at,
            "attributes": dict(evidence.attributes),
        }

    @staticmethod
    def _evidence_from_payload(payload: Mapping[str, Any]) -> Evidence:
        return Evidence(
            evidence_id=payload["evidence_id"],
            kind=payload["kind"],
            source=payload["source"],
            observed_at=payload["observed_at"],
            attributes=dict(payload.get("attributes", {})),
        )

    @staticmethod
    def _claim_to_payload(claim: Claim) -> Dict[str, Any]:
        provenance = claim.provenance
        return {
            "claim_id": claim.claim_id,
            "subject": claim.subject,
            "predicate": claim.predicate,
            "value": claim.value,
            "confidence": claim.confidence,
            "status": claim.status.value,
            "deriver": claim.deriver,
            "created_at": claim.created_at,
            "provenance": {
                "inputs": list(provenance.inputs),
                "deriver": provenance.deriver,
                "rule": provenance.rule,
                "parameters": dict(provenance.parameters or {}),
                "rationale": provenance.rationale,
            },
        }

    @staticmethod
    def _claim_from_payload(payload: Mapping[str, Any]) -> Claim:
        prov = payload["provenance"]
        return Claim(
            claim_id=payload["claim_id"],
            subject=payload["subject"],
            predicate=payload["predicate"],
            value=payload["value"],
            confidence=float(payload["confidence"]),
            status=ClaimStatus(payload["status"]),
            provenance=Provenance(
                inputs=list(prov.get("inputs", [])),
                deriver=prov.get("deriver", ""),
                rule=prov.get("rule", ""),
                parameters=dict(prov.get("parameters", {})),
                rationale=prov.get("rationale", ""),
            ),
            deriver=payload["deriver"],
            created_at=payload["created_at"],
        )

    def replay(self, ops: Iterable[Any]) -> int:
        """Rebuild engine state from operations.

        Replay never emits: the historical ops *are* the record. Each op is
        read through its ``kind`` and ``payload`` attributes. Returns the
        number of operations applied (recognized kinds only).
        """
        saved_sink = self._sink
        self._sink = NullSink()
        applied = 0
        try:
            for op in ops:
                if self._apply(op.kind, op.payload):
                    applied += 1
        finally:
            # Restore the real sink even if an op fails to apply.
            self._sink = saved_sink
        return applied

    def _apply(self, kind: str, payload: Mapping[str, Any]) -> bool:
        """Apply one logged op to in-memory state; False for unrecognized kinds."""
        if kind in ("evidence.observe", "evidence.add"):
            evidence = self._evidence_from_payload(payload)
            if self.graph.get_evidence(evidence.evidence_id) is None:
                self.graph.add_evidence(evidence)
            return True
        if kind == "claim.assert":
            claim = self._claim_from_payload(payload["claim"])
            self.graph.add_claim(claim)
            identity = payload.get("identity")
            if identity:
                self._identity_to_claim[identity] = claim.claim_id
                self._claim_identity[claim.claim_id] = identity
            # Mirror the increment in _make_claim so post-replay ids line up.
            self._seq += 1
            return True
        if kind == "claim.supersede":
            claim = self.graph.get_claim(payload["claim_id"])
            if claim is not None:
                self._set_status(claim, ClaimStatus.SUPERSEDED)
            return True
        if kind == "claim.invalidate":
            claim = self.graph.get_claim(payload["claim_id"])
            if claim is not None:
                self._set_status(claim, ClaimStatus.INVALIDATED)
            return True
        if kind == "claim.refute":
            claim = self.graph.get_claim(payload["claim_id"])
            if claim is not None:
                self._set_status(claim, ClaimStatus.REFUTED)
            identity = payload.get("identity")
            if identity:
                self._refuted_identities.add(identity)
            return True
        if kind == "claim.correct":
            old = self.graph.get_claim(payload["claim_id"])
            if old is not None:
                self._set_status(old, ClaimStatus.REFUTED)
            new_claim = self._claim_from_payload(payload["new_claim"])
            self.graph.add_claim(new_claim)
            identity = payload.get("identity")
            if identity:
                self._corrected_identities[identity] = new_claim.claim_id
                self._identity_to_claim[identity] = new_claim.claim_id
                self._claim_identity[new_claim.claim_id] = identity
            # Mirror the increment in correct().
            self._seq += 1
            return True
        if kind == "evidence.retract":
            self._retracted_evidence.add(payload["evidence_id"])
            return True
        # Unknown / out-of-scope kinds (capabilities, sync, explain
        # records, ...) are intentionally ignored: the engine only
        # reconstructs derivation state.
        return False


def _summarize_attributes(attributes: Mapping[str, Any]) -> str:
    """One human-readable line for an evidence record, for use inside
    explanation input listings.

    Preference order: a non-empty ``title`` / ``text`` / ``place_label``
    string (truncated to 80 characters), then a photo-location line when
    numeric ``lat``/``lon`` are present, then a sorted list of field names.
    """
    for key in ("title", "text", "place_label"):
        value = attributes.get(key)
        if isinstance(value, str) and value:
            return value if len(value) <= 80 else value[:77] + "..."
    if "lat" in attributes and "lon" in attributes:
        try:
            lat = float(attributes["lat"])
            lon = float(attributes["lon"])
        except (TypeError, ValueError):
            # Malformed coordinates (e.g. None) must not crash explanation
            # building, which also runs during derivation when
            # explain.record ops are emitted; fall through to the generic
            # field listing instead.
            pass
        else:
            when = attributes.get("taken_at", "")
            return "photo at (%.4f, %.4f) %s" % (lat, lon, when)
    keys = ", ".join(sorted(str(k) for k in attributes))
    return "record with fields: %s" % keys if keys else "empty record"