"""Derivation engine: evidence in, claims-with-provenance out. This module is deliberately *pure*: it never touches the node, the store, or the network. It consumes evidence records (as produced by the adapters and recorded in the op log), the keys of claims that already exist, and the keys of claims the user has refuted; it returns a :class:`DerivationReport` of proposed claims. The CLI/node layer is responsible for appending accepted proposals as signed claim ops whose ``derived_from`` provenance points at the evidence op ids carried on each proposal — milestone 4's cascade-invalidation machinery then works unchanged. Design rules: * **Deterministic.** Same evidence set ⇒ same proposals, same order, same confidences. Two nodes that have synced to the same log state derive the same claims independently. * **Stable claim identity.** A claim's identity is the hash of ``(subject, predicate, object)``; counts and other volatile detail live in the explanation and confidence, not the object, so growing evidence *strengthens* an existing claim instead of spawning near-duplicates. * **Refutations are final.** A proposal whose claim key matches a refuted claim is suppressed, never re-proposed. The user's correction outranks the rules — this is the core ownership property of the protocol. 
""" from __future__ import annotations import hashlib import json import math import statistics from collections import Counter, defaultdict from dataclasses import dataclass, field, replace from datetime import datetime from typing import ( Any, Dict, Iterable, List, Mapping, Optional, Sequence, Set, Tuple, ) __all__ = [ "Evidence", "ProposedClaim", "DerivationReport", "DerivationEngine", "default_rules", "claim_key", "claim_key_for_claim", "evidence_confidence", ] WEEKDAYS = ("Monday", "Tuesday", "Wednesday", "Thursday", "Friday", "Saturday", "Sunday") _STOPWORDS = frozenset( """a an and are as at be but by for from has have i in is it its me my of on or our so that the their then there this to was we were will with you your""".split() ) # --------------------------------------------------------------------------- # Core data types # --------------------------------------------------------------------------- @dataclass(frozen=True) class Evidence: """A read-only view over one evidence op for rule evaluation.""" op_id: str source: str kind: str external_id: str payload: Mapping[str, Any] observed_at: Optional[str] = None @classmethod def coerce(cls, obj: Any) -> "Evidence": """Accept an Evidence, an op dict (``{"op_id", "body"}``), or a flat evidence body dict carrying its own ``op_id``.""" if isinstance(obj, Evidence): return obj if not isinstance(obj, Mapping): raise TypeError(f"cannot coerce {type(obj).__name__} to Evidence") body = obj.get("body") if isinstance(obj.get("body"), Mapping) else obj op_id = str(obj.get("op_id") or obj.get("id") or body.get("op_id") or "") kind_field = body.get("type") if kind_field not in (None, "evidence"): raise ValueError(f"not an evidence body (type={kind_field!r})") return cls( op_id=op_id, source=str(body.get("source", "")), kind=str(body.get("kind", "")), external_id=str(body.get("external_id", "")), payload=body.get("payload") or {}, observed_at=body.get("observed_at"), ) @dataclass(frozen=True) class ProposedClaim: """A 
claim a rule wants to assert, with provenance and a human reason.""" subject: str predicate: str object: Any confidence: float rule: str evidence: Tuple[str, ...] # op ids of supporting evidence explanation: str def claim_key(self) -> str: return claim_key(self.subject, self.predicate, self.object) def body(self) -> dict: """Claim op body (wire format v1). The node layer wraps and signs it.""" return { "type": "claim", "subject": self.subject, "predicate": self.predicate, "object": self.object, "confidence": round(float(self.confidence), 3), "rule": self.rule, "derived_from": list(self.evidence), "explanation": self.explanation, } @dataclass class DerivationReport: """What one engine run wants to do, split by disposition.""" proposed: List[ProposedClaim] = field(default_factory=list) unchanged: List[ProposedClaim] = field(default_factory=list) # already asserted suppressed: List[ProposedClaim] = field(default_factory=list) # user refuted def summary(self) -> str: return ( f"{len(self.proposed)} new claim(s), " f"{len(self.unchanged)} already asserted, " f"{len(self.suppressed)} suppressed by refutation" ) # --------------------------------------------------------------------------- # Identity, confidence, and parsing helpers # --------------------------------------------------------------------------- def claim_key(subject: str, predicate: str, obj: Any) -> str: """Stable identity for a claim: hash of its semantic triple.""" blob = json.dumps( [subject, predicate, obj], sort_keys=True, separators=(",", ":"), ensure_ascii=False ) return hashlib.sha256(blob.encode("utf-8")).hexdigest() def claim_key_for_claim(claim: Mapping[str, Any]) -> Optional[str]: """Compute the claim key of an existing claim op (dict or op envelope).""" body = claim.get("body") if isinstance(claim.get("body"), Mapping) else claim if "predicate" not in body: return None return claim_key( str(body.get("subject", "user")), str(body["predicate"]), body.get("object") ) def evidence_confidence(n: 
int, base: float = 0.4, gain: float = 0.15, cap: float = 0.95) -> float: """Confidence monotone in evidence count, asymptotic to ``cap``. ``n=1`` yields ``base``; each additional independent observation closes a fraction of the remaining gap. Bounded away from 1.0 on purpose: derived claims are never certain. """ n = max(1, int(n)) value = cap - (cap - base) * math.exp(-gain * (n - 1)) return round(min(cap, max(0.0, value)), 3) def _parse_iso(value: Optional[str]) -> Optional[datetime]: if not value or not isinstance(value, str): return None text = value.strip().rstrip("Z") for candidate in (text, text[:19], text[:10]): try: return datetime.fromisoformat(candidate) except ValueError: continue return None def _norm_title(title: str) -> str: words = [ word for word in "".join(ch.lower() if ch.isalnum() else " " for ch in title).split() if word ] return " ".join(words) def _event_start(ev: Evidence) -> Optional[datetime]: start = ev.payload.get("start") if isinstance(start, Mapping): return _parse_iso(start.get("datetime") or start.get("date")) return _parse_iso(ev.observed_at) def _haversine_m(lat1: float, lon1: float, lat2: float, lon2: float) -> float: radius = 6371000.0 phi1, phi2 = math.radians(lat1), math.radians(lat2) d_phi = phi2 - phi1 d_lambda = math.radians(lon2 - lon1) a = math.sin(d_phi / 2) ** 2 + math.cos(phi1) * math.cos(phi2) * math.sin(d_lambda / 2) ** 2 return 2 * radius * math.asin(math.sqrt(a)) def _evidence_ids(items: Iterable[Evidence]) -> Tuple[str, ...]: return tuple(sorted({ev.op_id for ev in items if ev.op_id})) # --------------------------------------------------------------------------- # Rules — calendar # --------------------------------------------------------------------------- class RecurringActivityRule: """Repeated or RRULE'd calendar events ⇒ ``has_recurring_activity``.""" rule_id = "cal.recurring_activity.v1" MIN_REPEATS = 3 def derive(self, evidence: Sequence[Evidence]) -> List[ProposedClaim]: groups: Dict[str, List[Evidence]] = 
defaultdict(list) for ev in evidence: if ev.kind != "calendar.event": continue title = _norm_title(str(ev.payload.get("summary", ""))) if title: groups[title].append(ev) proposals: List[ProposedClaim] = [] for title in sorted(groups): events = groups[title] rrule = next( (e.payload.get("rrule") for e in events if isinstance(e.payload.get("rrule"), Mapping)), None, ) if rrule and rrule.get("FREQ"): cadence = str(rrule["FREQ"]).lower() confidence = 0.9 reason = f"declared RRULE FREQ={rrule['FREQ']}" elif len(events) >= self.MIN_REPEATS: cadence = "repeated" confidence = evidence_confidence(len(events), base=0.45, gain=0.2) reason = f"{len(events)} separate occurrences" else: continue day_counts: Counter = Counter() location_counts: Counter = Counter() for ev in events: start = _event_start(ev) if start: day_counts[WEEKDAYS[start.weekday()]] += 1 location = ev.payload.get("location") if location: location_counts[str(location)] += 1 obj: Dict[str, Any] = {"activity": title, "cadence": cadence} if day_counts: obj["typical_day"] = day_counts.most_common(1)[0][0] if location_counts: obj["usual_location"] = location_counts.most_common(1)[0][0] explanation = ( f"Calendar shows {len(events)} event(s) titled '{title}' ({reason})" + (f", usually on {obj['typical_day']}" if "typical_day" in obj else "") + (f" at {obj['usual_location']}" if "usual_location" in obj else "") + "." 
) proposals.append( ProposedClaim( subject="user", predicate="has_recurring_activity", object=obj, confidence=confidence, rule=self.rule_id, evidence=_evidence_ids(events), explanation=explanation, ) ) return proposals class FrequentContactRule: """Recurring meeting attendees ⇒ ``frequently_meets``.""" rule_id = "cal.frequent_contact.v1" MIN_MEETINGS = 3 def derive(self, evidence: Sequence[Evidence]) -> List[ProposedClaim]: meetings: Dict[str, List[Evidence]] = defaultdict(list) names: Dict[str, Counter] = defaultdict(Counter) for ev in evidence: if ev.kind != "calendar.event": continue attendees = ev.payload.get("attendees") if not isinstance(attendees, list): continue for person in attendees: if not isinstance(person, Mapping): continue email = str(person.get("email", "")).strip().lower() if not email: continue meetings[email].append(ev) if person.get("name"): names[email][str(person["name"])] += 1 proposals: List[ProposedClaim] = [] for email in sorted(meetings): events = meetings[email] if len(events) < self.MIN_MEETINGS: continue display = names[email].most_common(1)[0][0] if names[email] else email obj = {"person": display, "email": email} proposals.append( ProposedClaim( subject="user", predicate="frequently_meets", object=obj, confidence=evidence_confidence(len(events), base=0.5, gain=0.18), rule=self.rule_id, evidence=_evidence_ids(events), explanation=( f"{display} <{email}> appears as an attendee in " f"{len(events)} calendar event(s)." 
), ) ) return proposals class TypicalHoursRule: """Weekday timed events ⇒ ``typical_active_hours`` (median start/end).""" rule_id = "cal.typical_hours.v1" MIN_SAMPLES = 5 def derive(self, evidence: Sequence[Evidence]) -> List[ProposedClaim]: starts: List[int] = [] ends: List[int] = [] used: List[Evidence] = [] for ev in evidence: if ev.kind != "calendar.event": continue start_field = ev.payload.get("start") if not isinstance(start_field, Mapping) or start_field.get("all_day"): continue start = _parse_iso(start_field.get("datetime")) if start is None or start.weekday() >= 5: continue starts.append(start.hour) end_field = ev.payload.get("end") end = _parse_iso(end_field.get("datetime")) if isinstance(end_field, Mapping) else None ends.append(end.hour if end else min(23, start.hour + 1)) used.append(ev) if len(used) < self.MIN_SAMPLES: return [] start_hour = int(round(statistics.median(starts))) end_hour = int(round(statistics.median(ends))) obj = {"days": "weekdays", "start_hour": start_hour, "end_hour": end_hour} return [ ProposedClaim( subject="user", predicate="typical_active_hours", object=obj, confidence=evidence_confidence(len(used), base=0.4, gain=0.1, cap=0.85), rule=self.rule_id, evidence=_evidence_ids(used), explanation=( f"Median start/end across {len(used)} timed weekday calendar " f"event(s) is {start_hour:02d}:00–{end_hour:02d}:00." 
), ) ] class CalendarPlaceRule: """Repeated calendar locations ⇒ ``frequents_place`` (named place).""" rule_id = "cal.frequent_place.v1" MIN_VISITS = 3 def derive(self, evidence: Sequence[Evidence]) -> List[ProposedClaim]: places: Dict[str, List[Evidence]] = defaultdict(list) for ev in evidence: if ev.kind != "calendar.event": continue location = str(ev.payload.get("location", "")).strip() if location: places[location].append(ev) proposals: List[ProposedClaim] = [] for place in sorted(places): events = places[place] if len(events) < self.MIN_VISITS: continue proposals.append( ProposedClaim( subject="user", predicate="frequents_place", object={"place": place, "via": "calendar"}, confidence=evidence_confidence(len(events), base=0.45, gain=0.18), rule=self.rule_id, evidence=_evidence_ids(events), explanation=( f"'{place}' is the location of {len(events)} calendar event(s)." ), ) ) return proposals # --------------------------------------------------------------------------- # Rules — notes # --------------------------------------------------------------------------- class NoteTagInterestRule: """A tag used across multiple notes ⇒ ``has_interest``.""" rule_id = "notes.tag_interest.v1" MIN_NOTES = 2 def derive(self, evidence: Sequence[Evidence]) -> List[ProposedClaim]: by_tag: Dict[str, List[Evidence]] = defaultdict(list) for ev in evidence: if ev.kind != "note.document": continue tags = ev.payload.get("tags") if not isinstance(tags, list): continue for tag in {str(t).lower() for t in tags}: if tag and tag not in _STOPWORDS: by_tag[tag].append(ev) proposals: List[ProposedClaim] = [] for tag in sorted(by_tag): notes = by_tag[tag] if len(notes) < self.MIN_NOTES: continue proposals.append( ProposedClaim( subject="user", predicate="has_interest", object={"topic": tag}, confidence=evidence_confidence(len(notes), base=0.45, gain=0.2), rule=self.rule_id, evidence=_evidence_ids(notes), explanation=f"Tag '#{tag}' appears in {len(notes)} separate note(s).", ) ) return proposals 
class JournalingHabitRule:
    """Many dated notes over a sustained span ⇒ ``has_habit`` journaling."""

    rule_id = "notes.journaling_habit.v1"
    MIN_NOTES = 5
    MIN_SPAN_DAYS = 14

    def derive(self, evidence: Sequence[Evidence]) -> List[ProposedClaim]:
        """Propose a journaling habit when at least ``MIN_NOTES`` dated notes
        span at least ``MIN_SPAN_DAYS`` days.

        A note is dated by its payload ``created`` field, falling back to the
        evidence's ``observed_at``.
        """
        dated: List[Tuple[datetime, Evidence]] = []
        for ev in evidence:
            if ev.kind != "note.document":
                continue
            when = _parse_iso(ev.payload.get("created") or ev.observed_at)
            if when:
                # Keep wall-clock time but drop any UTC offset: sorting or
                # subtracting a mix of offset-aware and naive datetimes raises
                # TypeError and would abort the whole derivation run.
                dated.append((when.replace(tzinfo=None), ev))
        if len(dated) < self.MIN_NOTES:
            return []
        dated.sort(key=lambda pair: pair[0])
        span_days = (dated[-1][0] - dated[0][0]).days
        if span_days < self.MIN_SPAN_DAYS:
            return []
        notes = [ev for _, ev in dated]
        return [
            ProposedClaim(
                subject="user",
                predicate="has_habit",
                object={"habit": "journaling"},
                confidence=evidence_confidence(len(notes), base=0.4, gain=0.12, cap=0.9),
                rule=self.rule_id,
                evidence=_evidence_ids(notes),
                explanation=(
                    f"{len(notes)} dated note(s) span {span_days} days, "
                    "consistent with a regular journaling habit."
                ),
            )
        ]


# ---------------------------------------------------------------------------
# Rules — photos
# ---------------------------------------------------------------------------


class PhotoPlaceClusterRule:
    """GPS clusters in photo metadata ⇒ ``frequents_place`` (coordinates)."""

    rule_id = "photos.place_cluster.v1"
    MIN_PHOTOS = 3
    RADIUS_M = 300.0

    @staticmethod
    def _gps_point(gps: Any) -> Optional[Tuple[float, float]]:
        """Return ``(lat, lon)`` from a ``gps`` mapping, or ``None`` if unusable.

        Null or non-numeric values would make ``float()`` raise and abort the
        whole run; NaN/inf would make the determinism sort below depend on
        input order; values outside [-90, 90] / [-180, 180] cannot be a real
        location. All of these are skipped.
        """
        if not isinstance(gps, Mapping) or "lat" not in gps or "lon" not in gps:
            return None
        try:
            lat, lon = float(gps["lat"]), float(gps["lon"])
        except (TypeError, ValueError):
            return None
        if not (math.isfinite(lat) and math.isfinite(lon)):
            return None
        if not (-90.0 <= lat <= 90.0 and -180.0 <= lon <= 180.0):
            return None
        return lat, lon

    def derive(self, evidence: Sequence[Evidence]) -> List[ProposedClaim]:
        """Greedily cluster photo GPS points within ``RADIUS_M`` of a running
        centroid; propose each cluster of at least ``MIN_PHOTOS`` photos."""
        points: List[Tuple[float, float, Evidence]] = []
        for ev in evidence:
            if ev.kind != "photo.metadata":
                continue
            point = self._gps_point(ev.payload.get("gps"))
            if point is not None:
                points.append((point[0], point[1], ev))

        # Sort for determinism before greedy clustering.
        points.sort(key=lambda p: (p[0], p[1], p[2].op_id))
        clusters: List[Dict[str, Any]] = []
        for lat, lon, ev in points:
            placed = False
            for cluster in clusters:
                if _haversine_m(lat, lon, cluster["lat"], cluster["lon"]) <= self.RADIUS_M:
                    members = cluster["members"]
                    members.append(ev)
                    n = len(members)
                    # Incremental mean: move the centroid toward the new point.
                    cluster["lat"] += (lat - cluster["lat"]) / n
                    cluster["lon"] += (lon - cluster["lon"]) / n
                    placed = True
                    break
            if not placed:
                clusters.append({"lat": lat, "lon": lon, "members": [ev]})

        proposals: List[ProposedClaim] = []
        for cluster in clusters:
            members = cluster["members"]
            if len(members) < self.MIN_PHOTOS:
                continue
            # ~110 m resolution keeps the claim object stable as photos accrue.
            obj = {
                "lat": round(cluster["lat"], 3),
                "lon": round(cluster["lon"], 3),
                "via": "photos",
            }
            proposals.append(
                ProposedClaim(
                    subject="user",
                    predicate="frequents_place",
                    object=obj,
                    confidence=evidence_confidence(len(members), base=0.5, gain=0.18),
                    rule=self.rule_id,
                    evidence=_evidence_ids(members),
                    explanation=(
                        f"{len(members)} photo(s) were taken within "
                        f"{int(self.RADIUS_M)} m of ({obj['lat']}, {obj['lon']})."
                    ),
                )
            )
        proposals.sort(key=lambda pc: (pc.object["lat"], pc.object["lon"]))
        return proposals


class CameraDeviceRule:
    """Dominant camera make/model ⇒ ``uses_device``."""

    rule_id = "photos.camera_device.v1"
    MIN_PHOTOS = 3

    def derive(self, evidence: Sequence[Evidence]) -> List[ProposedClaim]:
        """Propose each (make, model) pair seen in at least ``MIN_PHOTOS``
        photos' camera metadata."""
        by_device: Dict[Tuple[str, str], List[Evidence]] = defaultdict(list)
        for ev in evidence:
            if ev.kind != "photo.metadata":
                continue
            camera = ev.payload.get("camera")
            if not isinstance(camera, Mapping) or not camera.get("model"):
                continue
            # An explicit null make must not become the string "None".
            raw_make = camera.get("make")
            make = "" if raw_make is None else str(raw_make).strip()
            model = str(camera["model"]).strip()
            if model:
                by_device[(make, model)].append(ev)

        proposals: List[ProposedClaim] = []
        for (make, model) in sorted(by_device):
            photos = by_device[(make, model)]
            if len(photos) < self.MIN_PHOTOS:
                continue
            obj: Dict[str, Any] = {"model": model}
            if make:
                obj["make"] = make
            proposals.append(
                ProposedClaim(
                    subject="user",
                    predicate="uses_device",
                    object=obj,
                    confidence=evidence_confidence(len(photos), base=0.55, gain=0.2),
                    rule=self.rule_id,
                    evidence=_evidence_ids(photos),
                    explanation=(
                        f"{len(photos)} photo(s) carry EXIF camera "
                        f"'{(make + ' ') if make else ''}{model}'."
                    ),
                )
            )
        return proposals


class PhotoTagInterestRule:
    """A tag recurring across photos ⇒ ``has_interest`` (corroborates notes)."""

    rule_id = "photos.tag_interest.v1"
    MIN_PHOTOS = 3

    def derive(self, evidence: Sequence[Evidence]) -> List[ProposedClaim]:
        """Propose an interest for each lowercased, non-stopword tag on at
        least ``MIN_PHOTOS`` photos (each photo counted once per tag)."""
        by_tag: Dict[str, List[Evidence]] = defaultdict(list)
        for ev in evidence:
            if ev.kind != "photo.metadata":
                continue
            tags = ev.payload.get("tags")
            if not isinstance(tags, list):
                continue
            # A null tag is skipped rather than becoming the topic "none".
            for tag in {("" if t is None else str(t)).lower() for t in tags}:
                if tag and tag not in _STOPWORDS:
                    by_tag[tag].append(ev)

        proposals: List[ProposedClaim] = []
        for tag in sorted(by_tag):
            photos = by_tag[tag]
            if len(photos) < self.MIN_PHOTOS:
                continue
            # Same object shape as NoteTagInterestRule on purpose: identical
            # (subject, predicate, object) triples merge into one claim whose
            # evidence then spans both sources.
            proposals.append(
                ProposedClaim(
                    subject="user",
                    predicate="has_interest",
                    object={"topic": tag},
                    confidence=evidence_confidence(len(photos), base=0.4, gain=0.15),
                    rule=self.rule_id,
                    evidence=_evidence_ids(photos),
                    explanation=f"Tag '{tag}' appears on {len(photos)} photo(s).",
                )
            )
        return proposals


# ---------------------------------------------------------------------------
# Engine
# ---------------------------------------------------------------------------


def default_rules() -> List[Any]:
    """The reference rule set, in deterministic evaluation order."""
    return [
        RecurringActivityRule(),
        FrequentContactRule(),
        TypicalHoursRule(),
        CalendarPlaceRule(),
        NoteTagInterestRule(),
        JournalingHabitRule(),
        PhotoPlaceClusterRule(),
        CameraDeviceRule(),
        PhotoTagInterestRule(),
    ]


class DerivationEngine:
    """Runs a rule set over evidence and reconciles against existing state."""

    def __init__(self, rules: Optional[Sequence[Any]] = None) -> None:
        self.rules = list(rules) if rules is not None else default_rules()

    def derive(
        self,
        evidence: Iterable[Any],
        existing_claim_keys: Iterable[str] = (),
        refuted_claim_keys: Iterable[str] = (),
    ) -> DerivationReport:
        """Evaluate all rules and classify each resulting claim.

        ``existing_claim_keys`` / ``refuted_claim_keys`` are sets of
        :func:`claim_key` values for claims already asserted in the graph and
        claims the user has refuted, respectively (use
        :func:`claim_key_for_claim` to compute them from claim ops).

        A refuted key wins over an existing one: such a proposal is reported
        as suppressed. Items of *evidence* are passed through
        ``Evidence.coerce``, whose errors propagate.
        """
        records = self._canonical_records(Evidence.coerce(item) for item in evidence)
        existing: Set[str] = set(existing_claim_keys)
        refuted: Set[str] = set(refuted_claim_keys)

        raw: List[ProposedClaim] = []
        for rule in self.rules:
            raw.extend(rule.derive(records))
        merged = self._merge(raw)

        report = DerivationReport()
        for proposal in merged:
            key = proposal.claim_key()
            if key in refuted:
                report.suppressed.append(proposal)
            elif key in existing:
                report.unchanged.append(proposal)
            else:
                report.proposed.append(proposal)
        return report

    @staticmethod
    def _canonical_records(records: Iterable[Evidence]) -> List[Evidence]:
        """Drop repeated op ids (first occurrence wins) and order by op id.

        Rules threshold on evidence counts, so a duplicated op would inflate
        them; ordering by op id makes the run depend on the evidence *set*
        rather than on the order a caller iterated its op log in. Records
        without an op id cannot be de-duplicated and are all kept.
        """
        seen: Set[str] = set()
        unique: List[Evidence] = []
        for record in records:
            if record.op_id:
                if record.op_id in seen:
                    continue
                seen.add(record.op_id)
            unique.append(record)
        unique.sort(key=lambda record: record.op_id)
        return unique

    @staticmethod
    def _merge(proposals: Sequence[ProposedClaim]) -> List[ProposedClaim]:
        """Merge proposals with identical claim keys (cross-source agreement).

        The higher-confidence proposal wins the rule attribution and base
        explanation (the earlier one on a tie); evidence sets are unioned and a
        small corroboration bonus is applied, capped at 0.97. Output keeps the
        order in which each key was first proposed.
        """
        merged: Dict[str, ProposedClaim] = {}
        order: List[str] = []
        for proposal in proposals:
            key = proposal.claim_key()
            if key not in merged:
                merged[key] = proposal
                order.append(key)
                continue
            current = merged[key]
            winner, other = (
                (current, proposal)
                if current.confidence >= proposal.confidence
                else (proposal, current)
            )
            combined_evidence = tuple(sorted(set(winner.evidence) | set(other.evidence)))
            boosted = min(0.97, round(winner.confidence + 0.1 * other.confidence, 3))
            merged[key] = replace(
                winner,
                evidence=combined_evidence,
                confidence=boosted,
                explanation=(
                    f"{winner.explanation} Corroborated independently by rule "
                    f"{other.rule}: {other.explanation}"
                ),
            )
        return [merged[key] for key in order]