"""Routine detection from calendar evidence.

Heuristic: a *weekly routine* is a calendar activity (normalised title) that
recurs on the same weekday around the same time of day, observed in at least
``MIN_OCCURRENCES`` events across at least ``MIN_WEEKS`` distinct ISO weeks.

Confidence = ``support_curve(distinct_weeks, midpoint=2.0, ceiling=0.95)``
decayed with half-life :data:`HALF_LIFE_DAYS` after a 14-day grace period
since the last observed occurrence. All parameters are pinned by the
confidence regression tests.
"""

from __future__ import annotations

import statistics
from collections import defaultdict
from typing import Any, Dict, List, Optional, Tuple

from mnema.derive.confidence import clamp, support_curve, temporal_decay
from mnema.derive.derivers.base import (
    WEEKDAYS,
    CandidateClaim,
    DerivationContext,
    Deriver,
    DeriverInfo,
    iso,
    most_common,
    normalize_text,
    parse_ts,
)
from mnema.derive.model import EvidenceRecord

MIN_OCCURRENCES = 3
MIN_WEEKS = 3
CLUSTER_TOLERANCE_MIN = 90  # minutes-of-day tolerance within a routine
GRACE_DAYS = 14.0
HALF_LIFE_DAYS = 120.0
# Parameters passed to ``support_curve``. They are named once here so the
# computed confidence and the ``confidence_account`` audit record cannot
# drift apart.
SUPPORT_MIDPOINT = 2.0
SUPPORT_CEILING = 0.95

# An evidence record paired with its parsed start. The start is whatever
# ``parse_ts`` returns; this module only relies on it being datetime-like
# (``weekday()``, ``hour``, ``minute``, ``isocalendar()`` and ordering).
_Member = Tuple[EvidenceRecord, Any]


def _event_start(payload: dict) -> Optional[Any]:
    """Return the parsed start of a calendar payload, or ``None``.

    The keys ``start``, ``start_time``, ``dtstart`` and ``begin`` are tried in
    that order. Falsy values are ignored, and a value for which ``parse_ts``
    raises ``ValueError`` is skipped so the next key gets a chance.
    """
    for key in ("start", "start_time", "dtstart", "begin"):
        raw = payload.get(key)
        if raw:
            try:
                return parse_ts(raw)
            except ValueError:
                continue
    return None


def _event_title(payload: dict) -> str:
    """Return the payload's ``title`` (falling back to ``summary``), stripped.

    Yields an empty string when neither key holds a truthy value.
    """
    return str(payload.get("title") or payload.get("summary") or "").strip()


def _minutes_of_day(start: Any) -> int:
    """Return the time of day of *start* as minutes since midnight."""
    return start.hour * 60 + start.minute


class RoutinesDeriver(Deriver):
    """Derive ``routine.weekly`` claims from ``calendar.event`` evidence."""

    info = DeriverInfo(
        deriver_id="mnema.routines",
        version="1.0.0",
        consumes_evidence=("calendar.event",),
        consumes_predicates=(),
        produces_predicates=("routine.weekly",),
    )

    def derive(self, ctx: DerivationContext) -> List[CandidateClaim]:
        """Return one candidate claim per qualifying weekly routine.

        Events are grouped by (normalised title, weekday), each group is split
        into time-of-day clusters, and every cluster that passes the
        thresholds in :meth:`_candidate` yields a claim. Events without a
        title or a parseable start are ignored.
        """
        groups: Dict[Tuple[str, int], List[_Member]] = defaultdict(list)
        for ev in ctx.evidence("calendar.event"):
            title = normalize_text(_event_title(ev.payload))
            start = _event_start(ev.payload)
            if not title or start is None:
                continue
            groups[(title, start.weekday())].append((ev, start))

        out: List[CandidateClaim] = []
        # Sorted keys and sorted members keep the output order independent of
        # the order in which evidence was supplied.
        for title, weekday in sorted(groups):
            members = sorted(
                groups[(title, weekday)],
                key=lambda m: (m[1], m[0].evidence_id),
            )
            for cluster in self._cluster_by_time_of_day(members):
                cand = self._candidate(ctx, title, weekday, cluster)
                if cand is not None:
                    out.append(cand)
        return out

    @staticmethod
    def _cluster_by_time_of_day(members: List[_Member]) -> List[List[_Member]]:
        """Greedy clustering on minutes-of-day with a fixed tolerance.

        Members are visited in ascending (minutes-of-day, evidence id) order.
        Each one joins the first cluster whose running mean lies within
        ``CLUSTER_TOLERANCE_MIN`` minutes, otherwise it starts a new cluster.
        """
        clusters: List[List[_Member]] = []
        means: List[float] = []
        ordered = sorted(
            members,
            key=lambda m: (_minutes_of_day(m[1]), m[0].evidence_id),
        )
        for ev, start in ordered:
            minutes = _minutes_of_day(start)
            for idx, mean in enumerate(means):
                if abs(minutes - mean) <= CLUSTER_TOLERANCE_MIN:
                    clusters[idx].append((ev, start))
                    # Incremental update of the cluster's running mean.
                    means[idx] = mean + (minutes - mean) / len(clusters[idx])
                    break
            else:
                # No existing cluster was close enough.
                clusters.append([(ev, start)])
                means.append(float(minutes))
        return clusters

    def _candidate(
        self,
        ctx: DerivationContext,
        title: str,
        weekday: int,
        cluster: List[_Member],
    ) -> Optional[CandidateClaim]:
        """Build the ``routine.weekly`` claim for one cluster, if it qualifies.

        Returns ``None`` when the cluster has fewer than ``MIN_OCCURRENCES``
        events or covers fewer than ``MIN_WEEKS`` distinct ISO (year, week)
        pairs. Otherwise the confidence is
        ``support_curve(distinct_weeks)`` decayed by the age of the most
        recent occurrence beyond ``GRACE_DAYS``, then clamped.
        """
        n = len(cluster)
        weeks = {start.isocalendar()[:2] for _, start in cluster}
        distinct_weeks = len(weeks)
        if n < MIN_OCCURRENCES or distinct_weeks < MIN_WEEKS:
            return None

        starts = [start for _, start in cluster]
        # int() truncates the half minute a median of an even-sized cluster
        # can produce.
        med = int(statistics.median(_minutes_of_day(s) for s in starts))
        start_local = f"{med // 60:02d}:{med % 60:02d}"
        first_seen, last_seen = min(starts), max(starts)
        last_seen_iso = iso(last_seen)
        location = most_common(
            str(ev.payload.get("location") or "").strip() for ev, _ in cluster
        )

        base = support_curve(
            distinct_weeks, midpoint=SUPPORT_MIDPOINT, ceiling=SUPPORT_CEILING
        )
        # Only staleness beyond the grace period counts towards decay.
        age = max(0.0, ctx.age_days(last_seen_iso) - GRACE_DAYS)
        conf = clamp(temporal_decay(base, age, HALF_LIFE_DAYS))

        weekday_name = WEEKDAYS[weekday]
        identity = {"activity": title, "weekday": weekday_name}
        value = dict(identity)
        value.update(
            {
                "start_time_local": start_local,
                "occurrences": n,
                "distinct_weeks": distinct_weeks,
                "location": location,
                "first_seen": iso(first_seen),
                "last_seen": last_seen_iso,
            }
        )

        reasoning = [
            f"Found {n} calendar events titled '{title}' on {weekday_name}s.",
            f"They span {distinct_weeks} distinct ISO weeks "
            f"(threshold: {MIN_WEEKS}), median start {start_local}.",
            f"Most recent occurrence {last_seen_iso}; "
            f"staleness beyond a {GRACE_DAYS:.0f}-day grace decays confidence "
            f"with a {HALF_LIFE_DAYS:.0f}-day half-life.",
        ]
        if location:
            reasoning.append(f"Most common location: '{location}'.")

        return CandidateClaim(
            subject="user",
            predicate="routine.weekly",
            identity=identity,
            value=value,
            confidence=conf,
            inputs=sorted(ev.evidence_id for ev, _ in cluster),
            summary=(
                f"'{title}' recurs on {weekday_name}s around {start_local} "
                f"({n} occurrences over {distinct_weeks} weeks)."
            ),
            reasoning=reasoning,
            confidence_account={
                "method": "support_curve(distinct_weeks) * temporal_decay",
                "distinct_weeks": distinct_weeks,
                "support_midpoint": SUPPORT_MIDPOINT,
                "support_ceiling": SUPPORT_CEILING,
                "base": round(base, 6),
                "age_days_past_grace": round(age, 3),
                "half_life_days": HALF_LIFE_DAYS,
                "result": round(conf, 6),
            },
        )