"""Significant-place detection from photo geo-metadata and calendar locations.

Two heuristics:

1. ``place.significant`` -- greedy geographic clustering of photo metadata
   (haversine radius :data:`CLUSTER_RADIUS_M`). A cluster is significant if
   it holds >= :data:`MIN_PHOTOS` photos across >= :data:`MIN_DAYS` distinct
   days. A coarse role is guessed from the time-of-day distribution
   (night-heavy => "home", workday-heavy => "work", else "frequent").
2. ``place.frequent_venue`` -- a calendar location string that recurs in
   >= :data:`MIN_VENUE_EVENTS` events.

Confidence parameters are pinned by the confidence regression tests.
"""

from __future__ import annotations

import math
from collections import defaultdict
from typing import Dict, List, Optional

from mnema.derive.confidence import clamp, discount, support_curve, temporal_decay
from mnema.derive.derivers.base import (
    CandidateClaim,
    DerivationContext,
    Deriver,
    DeriverInfo,
    iso,
    most_common,
    normalize_text,
    parse_ts,
)
from mnema.derive.model import EvidenceRecord

# --- photo clustering ------------------------------------------------------
# Maximum haversine distance (metres) from a cluster centroid for a photo to
# join that cluster.
CLUSTER_RADIUS_M = 250.0
# A cluster needs at least this many photos ...
MIN_PHOTOS = 3
# ... spread over at least this many distinct calendar days.
MIN_DAYS = 2
# Share of photos taken at night (hour >= 21 or hour < 8) needed for "home".
HOME_NIGHT_FRACTION = 0.6
# Share of photos taken Mon-Fri between 09:00 and 18:00 needed for "work".
WORKDAY_FRACTION = 0.6
# Temporal decay of photo-based confidence; decay only starts after the grace
# period has elapsed since the most recent photo in the cluster.
PHOTO_HALF_LIFE_DAYS = 240.0
PHOTO_GRACE_DAYS = 30.0
# Discount factor passed to ``discount`` when no photo carries a place name.
UNNAMED_DISCOUNT = 0.85

# --- calendar venues -------------------------------------------------------
# A location string must appear in at least this many events.
MIN_VENUE_EVENTS = 3
VENUE_HALF_LIFE_DAYS = 180.0
VENUE_GRACE_DAYS = 21.0

_EARTH_RADIUS_M = 6_371_000.0


def haversine_m(lat1: float, lon1: float, lat2: float, lon2: float) -> float:
    """Return the great-circle distance in metres between two points.

    Args:
        lat1, lon1: First point, in decimal degrees.
        lat2, lon2: Second point, in decimal degrees.

    Returns:
        The haversine distance on a sphere of radius ``_EARTH_RADIUS_M``.
        NaN inputs propagate to a NaN result.
    """
    p1, p2 = math.radians(lat1), math.radians(lat2)
    dphi = math.radians(lat2 - lat1)
    dlmb = math.radians(lon2 - lon1)
    a = math.sin(dphi / 2) ** 2 + math.cos(p1) * math.cos(p2) * math.sin(dlmb / 2) ** 2
    # Floating-point rounding can push ``a`` marginally outside [0, 1] (e.g.
    # for near-antipodal points), which would make sqrt/asin raise a
    # "math domain error".  Explicit comparisons are used instead of min/max
    # so that a NaN stays NaN rather than being turned into a valid distance.
    if a > 1.0:
        a = 1.0
    elif a < 0.0:
        a = 0.0
    return 2 * _EARTH_RADIUS_M * math.asin(math.sqrt(a))


def _coord(payload: dict, *keys) -> Optional[float]:
    """Return the first usable finite float found under any of ``keys``.

    Keys are tried in the order given.  A value is skipped when it is
    ``None``, cannot be converted with ``float()``, or converts to NaN or
    +/-infinity (non-finite values would break the distance computation).

    Returns:
        The coordinate as a float, or ``None`` when no key yields one.
    """
    for key in keys:
        raw = payload.get(key)
        if raw is None:
            continue
        try:
            value = float(raw)
        except (TypeError, ValueError, OverflowError):
            continue
        if math.isfinite(value):
            return value
    return None


def _taken_at(ev: EvidenceRecord):
    """Return the capture time of a photo evidence record, or ``None``.

    The payload keys ``taken_at``, ``timestamp`` and ``datetime`` are tried
    in that order; empty values and values for which ``parse_ts`` raises
    ``ValueError`` are skipped.  If none of them works, ``ev.observed_at`` is
    parsed instead, and ``None`` is returned if that raises ``ValueError``
    as well.
    """
    for key in ("taken_at", "timestamp", "datetime"):
        raw = ev.payload.get(key)
        if raw:
            try:
                return parse_ts(raw)
            except ValueError:
                continue
    try:
        return parse_ts(ev.observed_at)
    except ValueError:
        return None


def _place_name(payload: dict) -> str:
    """Return the stripped place name from ``payload``, or ``""`` if absent.

    The first truthy value among ``place_name``, ``place`` and
    ``location_name`` wins.
    """
    return str(
        payload.get("place_name")
        or payload.get("place")
        or payload.get("location_name")
        or ""
    ).strip()


class _Cluster:
    """A group of photos around a running-mean centroid."""

    __slots__ = ("lat", "lon", "members")

    def __init__(self, lat: float, lon: float) -> None:
        """Create an empty cluster whose centroid starts at ``(lat, lon)``."""
        self.lat = lat
        self.lon = lon
        self.members: List[tuple] = []  # (EvidenceRecord, datetime, lat, lon)

    def add(self, ev: EvidenceRecord, dt, lat: float, lon: float) -> None:
        """Append a member and move the centroid towards it.

        The centroid is updated as an incremental mean over ``members``, so
        the result is the true mean only if the photo that seeded the
        centroid is itself present in ``members`` before the first call.
        """
        self.members.append((ev, dt, lat, lon))
        n = len(self.members)
        self.lat += (lat - self.lat) / n
        self.lon += (lon - self.lon) / n


class PlacesDeriver(Deriver):
    """Derive ``place.significant`` and ``place.frequent_venue`` claims."""

    info = DeriverInfo(
        deriver_id="mnema.places",
        version="1.0.0",
        consumes_evidence=("photo.meta", "calendar.event"),
        consumes_predicates=(),
        produces_predicates=("place.significant", "place.frequent_venue"),
    )

    def derive(self, ctx: DerivationContext) -> List[CandidateClaim]:
        """Return photo-cluster claims followed by calendar-venue claims."""
        return self._photo_clusters(ctx) + self._calendar_venues(ctx)

    # ------------------------------------------------------------------ #

    def _photo_clusters(self, ctx: DerivationContext) -> List[CandidateClaim]:
        """Cluster ``photo.meta`` evidence and emit ``place.significant`` claims.

        Photos without a usable latitude, longitude or timestamp are ignored.
        Clustering is greedy: each photo joins the first existing cluster
        whose centroid lies within ``CLUSTER_RADIUS_M``, otherwise it seeds a
        new cluster, so the result depends on the order in which
        ``ctx.evidence`` yields records.

        Returns:
            One claim per cluster that meets ``MIN_PHOTOS`` and ``MIN_DAYS``,
            sorted by the rounded centroid ``(lat, lon)``.
        """
        clusters: List[_Cluster] = []
        for ev in ctx.evidence("photo.meta"):
            lat = _coord(ev.payload, "lat", "latitude")
            lon = _coord(ev.payload, "lon", "lng", "longitude")
            dt = _taken_at(ev)
            if lat is None or lon is None or dt is None:
                continue
            for cluster in clusters:
                if haversine_m(lat, lon, cluster.lat, cluster.lon) <= CLUSTER_RADIUS_M:
                    cluster.add(ev, dt, lat, lon)
                    break
            else:
                # No cluster close enough: seed a new one.  The seed is
                # appended directly (not via add()) because the centroid
                # already equals this photo's position.
                fresh = _Cluster(lat, lon)
                fresh.members.append((ev, dt, lat, lon))
                clusters.append(fresh)

        out: List[CandidateClaim] = []
        for cluster in clusters:
            n = len(cluster.members)
            days = {dt.date() for _, dt, _, _ in cluster.members}
            if n < MIN_PHOTOS or len(days) < MIN_DAYS:
                continue

            # NOTE(review): photos without a place name contribute "" here;
            # whether most_common ignores empty strings is defined in
            # derivers.base -- confirm.
            label = most_common(_place_name(ev.payload) for ev, _, _, _ in cluster.members)

            # Role guess from the hour/weekday of each photo's timestamp.
            night = sum(1 for _, dt, _, _ in cluster.members if dt.hour >= 21 or dt.hour < 8)
            workday = sum(
                1
                for _, dt, _, _ in cluster.members
                if dt.weekday() < 5 and 9 <= dt.hour < 18
            )
            if night / n >= HOME_NIGHT_FRACTION and len(days) >= 3:
                role = "home"
            elif workday / n >= WORKDAY_FRACTION:
                role = "work"
            else:
                role = "frequent"

            # Confidence: support from distinct days, discounted when the
            # place is unnamed, then decayed by age past the grace period.
            base = support_curve(len(days), midpoint=2.0, ceiling=0.9)
            if not label:
                base = discount(base, UNNAMED_DISCOUNT)
            last_seen = max(dt for _, dt, _, _ in cluster.members)
            age = max(0.0, ctx.age_days(iso(last_seen)) - PHOTO_GRACE_DAYS)
            conf = clamp(temporal_decay(base, age, PHOTO_HALF_LIFE_DAYS))

            # The centroid rounded to 3 decimals is the claim identity.
            c_lat, c_lon = round(cluster.lat, 3), round(cluster.lon, 3)
            display = label or f"unnamed place near {c_lat:.3f},{c_lon:.3f}"
            identity = {"lat": c_lat, "lon": c_lon}
            value = dict(identity)
            value.update(
                {
                    "label": display,
                    "role": role,
                    "photo_count": n,
                    "distinct_days": len(days),
                    "last_seen": iso(last_seen),
                }
            )
            reasoning = [
                f"{n} photos cluster within {CLUSTER_RADIUS_M:.0f} m of "
                f"({c_lat:.3f}, {c_lon:.3f}) across {len(days)} distinct days.",
                f"Time-of-day mix: {night}/{n} at night, {workday}/{n} during "
                f"weekday work hours => role guess '{role}'.",
            ]
            if not label:
                reasoning.append(
                    f"No place name in metadata; confidence discounted x{UNNAMED_DISCOUNT}."
                )
            out.append(
                CandidateClaim(
                    subject="user",
                    predicate="place.significant",
                    identity=identity,
                    value=value,
                    confidence=conf,
                    inputs=sorted(ev.evidence_id for ev, _, _, _ in cluster.members),
                    summary=(
                        f"'{display}' is a significant place ({role}): "
                        f"{n} photos over {len(days)} days."
                    ),
                    reasoning=reasoning,
                    confidence_account={
                        "method": "support_curve(distinct_days)"
                        + (" * unnamed_discount" if not label else "")
                        + " * temporal_decay",
                        "distinct_days": len(days),
                        "support_midpoint": 2.0,
                        "support_ceiling": 0.9,
                        "unnamed_discount": None if label else UNNAMED_DISCOUNT,
                        "base": round(base, 6),
                        "age_days_past_grace": round(age, 3),
                        "half_life_days": PHOTO_HALF_LIFE_DAYS,
                        "result": round(conf, 6),
                    },
                )
            )
        out.sort(key=lambda c: (c.identity["lat"], c.identity["lon"]))
        return out

    # ------------------------------------------------------------------ #

    def _calendar_venues(self, ctx: DerivationContext) -> List[CandidateClaim]:
        """Emit ``place.frequent_venue`` claims for recurring calendar locations.

        Events are grouped by the normalised ``location`` payload string;
        events with an empty location are ignored.  The start time is read
        from the first non-empty key among ``start``, ``start_time``,
        ``dtstart`` and ``begin``; if ``parse_ts`` raises ``ValueError`` for
        that value the event is dropped (later keys are not tried).

        Returns:
            One claim per location with at least ``MIN_VENUE_EVENTS`` events,
            in sorted order of the normalised location string.
        """
        venues: Dict[str, List[tuple]] = defaultdict(list)
        for ev in ctx.evidence("calendar.event"):
            loc = normalize_text(str(ev.payload.get("location") or ""))
            if not loc:
                continue
            for key in ("start", "start_time", "dtstart", "begin"):
                raw = ev.payload.get(key)
                if raw:
                    try:
                        venues[loc].append((ev, parse_ts(raw)))
                    except ValueError:
                        pass
                    # The first non-empty start key decides, parsable or not.
                    break

        out: List[CandidateClaim] = []
        for loc in sorted(venues):
            members = venues[loc]
            n = len(members)
            if n < MIN_VENUE_EVENTS:
                continue
            last_seen = max(dt for _, dt in members)
            base = support_curve(n, midpoint=2.5, ceiling=0.85)
            age = max(0.0, ctx.age_days(iso(last_seen)) - VENUE_GRACE_DAYS)
            conf = clamp(temporal_decay(base, age, VENUE_HALF_LIFE_DAYS))
            # Distinct, non-empty normalised event titles, alphabetically.
            titles = sorted(
                {
                    normalize_text(
                        str(ev.payload.get("title") or ev.payload.get("summary") or "")
                    )
                    for ev, _ in members
                }
                - {""}
            )
            out.append(
                CandidateClaim(
                    subject="user",
                    predicate="place.frequent_venue",
                    identity={"name": loc},
                    value={
                        "name": loc,
                        "visit_count": n,
                        "example_activities": titles[:5],
                        "last_seen": iso(last_seen),
                    },
                    confidence=conf,
                    inputs=sorted(ev.evidence_id for ev, _ in members),
                    summary=f"Calendar places the user at '{loc}' {n} times.",
                    reasoning=[
                        f"{n} calendar events list location '{loc}' "
                        f"(threshold: {MIN_VENUE_EVENTS}).",
                        f"Most recent visit {iso(last_seen)}.",
                    ],
                    confidence_account={
                        "method": "support_curve(visit_count) * temporal_decay",
                        "visit_count": n,
                        "support_midpoint": 2.5,
                        "support_ceiling": 0.85,
                        "base": round(base, 6),
                        "age_days_past_grace": round(age, 3),
                        "half_life_days": VENUE_HALF_LIFE_DAYS,
                        "result": round(conf, 6),
                    },
                )
            )
        return out