"""Loaders for evidence record files (JSON Lines). The sample datasets under ``data/samples/`` use one JSON object per line with the shape:: { "evidence_id": "ev-cal-001", # optional; derived if absent "kind": "calendar.event", "source": "ical:personal", "observed_at": "2024-09-02T09:45:00Z", "attributes": { ... } } This mirrors the evidence payloads the milestone-3 import adapters emit onto the operation log, so the same records can be fed either through the adapters or directly through ``mnema-derive ingest``. """ from __future__ import annotations import hashlib import json from pathlib import Path from typing import Any, Iterable, List, Mapping from mnema.core.canonical import canonical_json from mnema.derive.model import Evidence def evidence_from_record(record: Mapping[str, Any]) -> Evidence: for required in ("kind", "source", "observed_at"): if required not in record: raise ValueError("evidence record missing required field %r" % required) evidence_id = record.get("evidence_id") if not evidence_id: evidence_id = "ev_" + hashlib.sha256( canonical_json( { "kind": record["kind"], "source": record["source"], "observed_at": record["observed_at"], "attributes": record.get("attributes", {}), } ) ).hexdigest()[:24] return Evidence( evidence_id=evidence_id, kind=record["kind"], source=record["source"], observed_at=record["observed_at"], attributes=dict(record.get("attributes", {})), ) def load_evidence_jsonl(path: Path) -> List[Evidence]: records: List[Evidence] = [] with open(path, "r", encoding="utf-8") as handle: for line_number, line in enumerate(handle, start=1): line = line.strip() if not line or line.startswith("#"): continue try: record = json.loads(line) except json.JSONDecodeError as exc: raise ValueError( "%s:%d: invalid JSON: %s" % (path, line_number, exc) ) from exc records.append(evidence_from_record(record)) return records def load_evidence_files(paths: Iterable[Path]) -> List[Evidence]: """Load several files and return all evidence sorted by observation time (stable on evidence id), so ingestion order is deterministic.""" all_records: List[Evidence] = [] for path in paths: all_records.extend(load_evidence_jsonl(Path(path))) all_records.sort(key=lambda ev: (ev.observed_at, ev.evidence_id)) return all_records