"""Human explanations for claims: why does the node believe this? Every claim carries a ``method`` (the derivation rule that produced it) and ``params`` (the rule's recorded reasoning inputs). This module turns those into plain-language explanations: what the rule does, what it saw, and why the confidence is what it is. Unknown methods get an honest generic explanation rather than a fabricated one, so foreign rules degrade gracefully. """ from __future__ import annotations from typing import Any, Callable, Dict, List, NamedTuple, Optional from .model import Claim, STATUS_CORRECTED, STATUS_INVALIDATED, STATUS_REFUTED class RuleInfo(NamedTuple): title: str description: str render: Callable[[Claim], List[str]] # returns explanation lines def _p(claim: Claim, key: str, default: Any = None) -> Any: return claim.params.get(key, default) def _conf_pct(claim: Claim) -> str: return f"{claim.confidence * 100:.0f}%" # --------------------------------------------------------------------------- # Rule-specific renderers # --------------------------------------------------------------------------- def _render_recurring_event(c: Claim) -> List[str]: lines = [] title = _p(c, "event_title", "an event") day = _p(c, "weekday", "?") time = _p(c, "time", "?") obs = _p(c, "observations") window = _p(c, "window_weeks") lines.append( f"The calendar shows {title!r} recurring on {day}s at {time}." ) if obs is not None and window is not None: lines.append( f"It occurred in {obs} of the last {window} weeks; confidence " f"{_conf_pct(c)} reflects that hit rate (with a small penalty for " f"gaps)." ) else: lines.append(f"Confidence is {_conf_pct(c)} based on recurrence regularity.") return lines def _render_busy_block(c: Claim) -> List[str]: return [ f"Calendar density analysis: {_p(c, 'events_per_week', '?')} events per " f"week on average fall inside {_p(c, 'window', 'this window')}.", f"Confidence {_conf_pct(c)} reflects how consistently the window is " f"occupied across the sampled weeks.", ] def _render_keyword_preference(c: Claim) -> List[str]: kw = _p(c, "keyword", "?") mentions = _p(c, "mentions") lines = [f"Notes mention {kw!r} as a stated preference."] if mentions is not None: lines.append( f"Found {mentions} supporting mention(s); confidence {_conf_pct(c)} " f"because self-stated text is strong but can go stale." ) else: lines.append(f"Confidence is {_conf_pct(c)}.") return lines def _render_stated_fact(c: Claim) -> List[str]: return [ "The user stated this directly in a note, so it is taken nearly at " "face value.", f"Confidence {_conf_pct(c)}: direct statements rank just below explicit " f"corrections, since notes can be old or aspirational.", ] def _render_location_cluster(c: Claim) -> List[str]: place = _p(c, "place_label", "an area") photos = _p(c, "photo_count") span = _p(c, "span_days") lines = [f"Photo metadata clusters around {place}."] if photos is not None: extra = f" across {span} days" if span is not None else "" lines.append( f"{photos} geotagged photo(s){extra} fall within the cluster radius; " f"confidence {_conf_pct(c)} scales with cluster size and spread." ) else: lines.append(f"Confidence is {_conf_pct(c)}.") return lines def _render_compose(c: Claim) -> List[str]: n = len(c.inputs) rationale = _p(c, "rationale") lines = [ f"This is a composite claim derived from {n} upstream claim(s) — it has " f"no direct evidence of its own and inherits its support entirely from " f"its inputs." ] if rationale: lines.append(f"Rule rationale: {rationale}") lines.append( f"Confidence {_conf_pct(c)} is bounded by the weakest upstream input. " f"If any input is refuted or corrected, this claim is invalidated." ) return lines def _render_user_correction(c: Claim) -> List[str]: lines = [ "The user corrected a previous claim and supplied this statement " "directly. User corrections are authoritative: confidence 100%." ] reason = _p(c, "reason") if reason: lines.append(f"Stated reason: {reason}") corrects = _p(c, "corrects") or c.corrects if corrects: lines.append(f"Replaces claim {corrects}.") return lines RULES: Dict[str, RuleInfo] = { "calendar.recurring_event": RuleInfo( "Recurring calendar event", "Detects events that repeat on the same weekday and time window and " "asserts them as routines.", _render_recurring_event, ), "calendar.busy_block": RuleInfo( "Calendar density window", "Detects time windows that are consistently occupied by events.", _render_busy_block, ), "notes.keyword_preference": RuleInfo( "Stated preference in notes", "Extracts preferences the user wrote down in their own notes.", _render_keyword_preference, ), "notes.stated_fact": RuleInfo( "Directly stated fact", "Lifts a fact the user stated verbatim in a note.", _render_stated_fact, ), "photos.location_cluster": RuleInfo( "Photo location cluster", "Clusters photo geotags to infer frequently visited places.", _render_location_cluster, ), "compose.lifestyle": RuleInfo( "Lifestyle composition", "Combines routine and preference claims into a higher-level lifestyle " "claim.", _render_compose, ), "compose.schedule_window": RuleInfo( "Schedule composition", "Combines routine claims into availability/scheduling claims.", _render_compose, ), "user.correction": RuleInfo( "User correction", "A direct, authoritative correction made by the user.", _render_user_correction, ), } GENERIC_RULE = RuleInfo( "Unknown derivation rule", "This claim was produced by a rule this node does not have a renderer " "for. The provenance graph below is still complete and verifiable.", lambda c: [ f"Derived by rule {c.method!r} from {len(c.inputs)} input(s) with " f"confidence {c.confidence * 100:.0f}%.", "Inspect the recorded parameters and provenance for details.", ], ) def rule_info(method: str) -> RuleInfo: return RULES.get(method, GENERIC_RULE) def explain_claim(claim: Claim, store=None) -> Dict[str, Any]: """Build a structured explanation for a claim. Returns a dict with: ``rule_title``, ``rule_description``, ``lines`` (the human explanation), ``status_lines`` (lifecycle notes), and ``input_summary`` (counts of claims/evidence inputs, resolved against the store when given). """ info = rule_info(claim.method) lines = info.render(claim) status_lines: List[str] = [] if claim.status == STATUS_REFUTED: status_lines.append( f"⛔ REFUTED by the user" + (f": {claim.refute_reason}" if claim.refute_reason else "") + f" (op {claim.refuted_by})." ) elif claim.status == STATUS_CORRECTED: status_lines.append( f"✏ CORRECTED — superseded by claim {claim.corrected_by}." ) elif claim.status == STATUS_INVALIDATED: via = f" via upstream claim {claim.invalidated_via}" if claim.invalidated_via else "" status_lines.append( f"⚠ INVALIDATED{via} (root cause op {claim.invalidated_by}). It " f"may be re-derived once its inputs are re-established." ) n_claims = n_evidence = n_missing = 0 if store is not None: for input_id in claim.inputs: if input_id in store.claims: n_claims += 1 elif input_id in store.evidence: n_evidence += 1 else: n_missing += 1 else: for input_id in claim.inputs: if input_id.startswith("cl_"): n_claims += 1 elif input_id.startswith("ev_"): n_evidence += 1 else: n_missing += 1 return { "rule_title": info.title, "rule_description": info.description, "lines": lines, "status_lines": status_lines, "input_summary": { "claims": n_claims, "evidence": n_evidence, "missing": n_missing, }, } def confidence_band(confidence: float) -> str: """Coarse verbal band for a confidence value, used in listings.""" if confidence >= 0.95: return "certain" if confidence >= 0.8: return "high" if confidence >= 0.6: return "moderate" if confidence >= 0.4: return "weak" return "speculative"