"""Import adapters for the FablePool reference node.

Each adapter turns an external data source (an ``.ics`` calendar export, a
directory of Markdown/plain-text notes, a JSON photo-metadata dump) into a
list of :class:`EvidenceRecord` objects. Adapters are *pure*: they read files
and return records; they never touch the operation log directly. Appending
evidence to a node is done by :func:`ingest`, which is the single point of
coupling between adapters and the node layer.

Evidence body shape (wire format v1, milestone 2)::

    {
      "type": "evidence",
      "source": "<...>:<...>",
      "kind": "calendar.event" | "note.document" | "photo.metadata",
      "external_id": "<...>",
      "observed_at": "<...>" | null,
      "payload": { ...adapter-specific, JSON-safe... }
    }

``<...>`` stands for an adapter-supplied string. ``observed_at`` is null when
the record carries no observation time (it defaults to ``None`` on
:class:`EvidenceRecord`).

Determinism contract: for identical input files, every adapter returns the
same records in the same order, with the same external ids. This is what
makes re-ingestion idempotent (see :func:`ingest` deduplication) and makes
sync between nodes converge on identical evidence content.
"""

from __future__ import annotations

import hashlib
import json
from dataclasses import dataclass, field
from typing import Any, Callable, Iterable, List, Mapping, Optional, Sequence, Tuple

__all__ = [
    "EvidenceRecord",
    "IngestResult",
    "ADAPTERS",
    "load_adapter",
    "ingest",
    "dedup_key_for_body",
]

#: Names of the built-in adapters, in the order they appear in docs and CLI.
ADAPTERS: Tuple[str, ...] = ("calendar", "notes", "photos")

#: Method names probed on a node-like object, most preferred first. Kept in
#: one place so the lookup and the error message can never disagree.
_APPEND_ATTRS: Tuple[str, ...] = ("ingest_evidence", "append_evidence", "add_evidence")


@dataclass(frozen=True)
class EvidenceRecord:
    """One unit of raw evidence produced by an adapter, not yet logged."""

    # Value emitted under the body's "source" key.
    source: str
    # Value emitted under the body's "kind" key (e.g. "calendar.event").
    kind: str
    # Adapter-assigned identifier; part of the deduplication key.
    external_id: str
    # Adapter-specific content; expected to be JSON-safe because it is
    # serialised with ``json.dumps`` when the deduplication key is computed.
    payload: Mapping[str, Any]
    # Optional observation time; NOT part of the deduplication key.
    observed_at: Optional[str] = None

    def body(self) -> dict:
        """Return the evidence operation body, ready to be wrapped in a signed op.

        The payload is shallow-copied into a plain ``dict`` so the returned
        body does not alias the record's top-level payload mapping.
        """
        return {
            "type": "evidence",
            "source": self.source,
            "kind": self.kind,
            "external_id": self.external_id,
            "observed_at": self.observed_at,
            "payload": dict(self.payload),
        }

    def dedup_key(self) -> str:
        """Return the deduplication key of this record's body."""
        return dedup_key_for_body(self.body())


def dedup_key_for_body(body: Mapping[str, Any]) -> str:
    """Content-addressed deduplication key for an evidence body.

    The key intentionally covers only ``(source, kind, external_id, payload)``
    so that re-running an adapter over an unchanged source file produces keys
    that match the evidence already in the log, regardless of any envelope
    fields the store added when the op was first recorded.

    Args:
        body: An evidence body. Keys that are absent are hashed as ``null``.

    Returns:
        The hex SHA-256 digest of the canonical JSON encoding of the four
        covered fields.

    Raises:
        TypeError: If the covered fields are not JSON-serialisable.
    """
    core = {
        "source": body.get("source"),
        "kind": body.get("kind"),
        "external_id": body.get("external_id"),
        "payload": body.get("payload"),
    }
    # Canonical form: sorted keys and fixed separators make the encoding
    # independent of dict insertion order. Do not change these arguments —
    # doing so changes every key.
    blob = json.dumps(core, sort_keys=True, separators=(",", ":"), ensure_ascii=False)
    return hashlib.sha256(blob.encode("utf-8")).hexdigest()


@dataclass
class IngestResult:
    """Outcome of one :func:`ingest` call."""

    appended: List[str] = field(default_factory=list)  # op ids of new evidence
    skipped: int = 0  # records deduplicated away

    def summary(self) -> str:
        """Return a one-line human-readable summary of the result."""
        return f"ingested {len(self.appended)} evidence op(s), skipped {self.skipped} duplicate(s)"


def load_adapter(name: str) -> Callable[[str], List[EvidenceRecord]]:
    """Return the ``load(path)`` callable for a built-in adapter by name.

    The name is matched case-insensitively after stripping surrounding
    whitespace. Accepted aliases are ``calendar``/``ics``/``ical``,
    ``notes``/``note``/``markdown``/``md`` and ``photos``/``photo``/``exif``.
    The adapter module is imported lazily, only when it is requested.

    Raises:
        KeyError: If ``name`` does not match any built-in adapter.
    """
    key = name.strip().lower()
    if key in ("calendar", "ics", "ical"):
        from . import calendar as mod
    elif key in ("notes", "note", "markdown", "md"):
        from . import notes as mod
    elif key in ("photos", "photo", "exif"):
        from . import photos as mod
    else:
        raise KeyError(
            f"unknown adapter {name!r}; built-in adapters are: {', '.join(ADAPTERS)}"
        )
    return mod.load


def _resolve_append(node: Any) -> Callable[[dict], str]:
    """Find the node's evidence-append entry point.

    Accepts either a Node-like object exposing ``ingest_evidence(body)``
    (preferred, milestone 3 API), ``append_evidence(body)`` or
    ``add_evidence(body)`` — probed in that order — or a bare callable
    ``append(body) -> op_id`` for tests.

    Raises:
        TypeError: If ``node`` has none of those methods and is not callable.
    """
    for attr in _APPEND_ATTRS:
        fn = getattr(node, attr, None)
        if callable(fn):
            return fn
    # Named methods win over ``__call__`` so a callable Node-like object is
    # still driven through its evidence API.
    if callable(node):
        return node
    raise TypeError(
        f"ingest() needs a node with {'/'.join(_APPEND_ATTRS)}, "
        f"or a callable; got {type(node).__name__}"
    )


def ingest(
    node: Any,
    records: Sequence[EvidenceRecord],
    existing_bodies: Optional[Iterable[Mapping[str, Any]]] = None,
) -> IngestResult:
    """Append new evidence records to a node, skipping exact duplicates.

    ``existing_bodies`` should be the evidence bodies already present in the
    node's log (the CLI passes them in); records whose deduplication key
    matches an existing body — or an earlier record in the same batch — are
    skipped, making repeated ingestion of the same source file idempotent.

    Args:
        node: A Node-like object or bare callable; see :func:`_resolve_append`.
        records: Records to append, processed in the given order.
        existing_bodies: Bodies already logged; ``None`` means "none known".

    Returns:
        An :class:`IngestResult` with the op ids returned by the node (as
        strings, in append order) and the number of skipped records.

    Raises:
        TypeError: If ``node`` offers no usable append entry point. This is
            checked before any record is processed.
    """
    append = _resolve_append(node)

    seen = set()
    if existing_bodies is not None:
        seen.update(dedup_key_for_body(b) for b in existing_bodies)

    result = IngestResult()
    for rec in records:
        key = rec.dedup_key()
        if key in seen:
            result.skipped += 1
            continue
        # Remember the key before appending so later in-batch duplicates of
        # this record are skipped as well.
        seen.add(key)
        op_id = append(rec.body())
        result.appended.append(str(op_id))
    return result