"""Adapter interface: how source material becomes evidence. An adapter is a pure parser. It reads a source (a file or a directory) and yields :class:`EvidenceItem` values -- normalized content plus the identifiers needed for provenance and deduplication. Adapters do **not** touch keys, signatures, or the log; the node does that, so every adapter automatically gets correct signing, dedup, and supersession behaviour. To write a new adapter: 1. subclass :class:`Adapter`, set ``name``, ``version``, ``content_types``; 2. implement ``parse(source) -> Iterator[EvidenceItem]``; 3. decorate the class with :func:`register`; 4. make sure the module is imported from ``pmp.adapters.__init__``. Contract details that matter: * ``content`` must be canonical-JSON-safe (str/int/float/bool/None/ list/dict, string keys, finite floats) -- the node hashes it. * ``external_id`` should be the *source's* stable identifier when one exists (ICS UID, note path, photo unique id). When the same external_id reappears with different content, the node emits a new evidence op that supersedes the old one. If there is no stable id, return ``None`` and the node dedups purely by content hash. * ``observed_at`` is when the underlying fact was recorded *at the source* (event last-modified, note mtime, photo timestamp) -- distinct from ``imported_at``, which the node stamps at import time. * Bump ``version`` whenever normalized output changes shape; the version is recorded in every evidence op's provenance. """ from __future__ import annotations import abc from dataclasses import dataclass from pathlib import Path from typing import ClassVar, Iterator, Optional, Sequence from pmp.errors import AdapterError ADAPTER_REGISTRY: dict[str, type["Adapter"]] = {} def register(cls: type["Adapter"]) -> type["Adapter"]: """Class decorator: add an adapter to the global registry.""" if not getattr(cls, "name", None): raise AdapterError(f"adapter {cls.__name__} has no name") if cls.name in ADAPTER_REGISTRY: raise AdapterError(f"duplicate adapter name {cls.name!r}") ADAPTER_REGISTRY[cls.name] = cls return cls def get_adapter(name: str) -> "Adapter": """Instantiate a registered adapter by name.""" try: return ADAPTER_REGISTRY[name]() except KeyError: known = ", ".join(sorted(ADAPTER_REGISTRY)) or "(none)" raise AdapterError(f"unknown adapter {name!r}; known adapters: {known}") from None def available_adapters() -> list[dict]: """Metadata for every registered adapter.""" return [ { "name": cls.name, "version": cls.version, "content_types": list(cls.content_types), "description": (cls.__doc__ or "").strip().splitlines()[0] if cls.__doc__ else "", } for cls in sorted(ADAPTER_REGISTRY.values(), key=lambda c: c.name) ] @dataclass(frozen=True) class EvidenceItem: """One normalized record extracted from a source.""" content_type: str content: dict external_id: Optional[str] = None observed_at: Optional[str] = None origin: Optional[str] = None # overrides the node's default origin URI class Adapter(abc.ABC): """Base class for import adapters.""" name: ClassVar[str] = "" version: ClassVar[str] = "0.0.0" content_types: ClassVar[tuple[str, ...]] = () @abc.abstractmethod def parse(self, source: Path) -> Iterator[EvidenceItem]: """Yield evidence items from *source* (a file or directory). Must raise :class:`AdapterError` for unreadable or structurally invalid sources, naming the offending file. """ # -- shared helpers ------------------------------------------------------ @staticmethod def iter_files(source: Path | str, suffixes: Sequence[str]) -> list[Path]: """Resolve *source* to a sorted list of files with the given suffixes.""" source = Path(source) if source.is_file(): return [source] if source.is_dir(): wanted = {s.lower() for s in suffixes} files = [ p for p in sorted(source.rglob("*")) if p.is_file() and p.suffix.lower() in wanted ] if not files: raise AdapterError( f"no {'/'.join(sorted(wanted))} files found under {source}" ) return files raise AdapterError(f"source not found: {source}") def prune(record: dict) -> dict: """Drop ``None`` values and empty lists/dicts so evidence content stays minimal and canonical hashes don't churn on absent-vs-null differences.""" out = {} for key, value in record.items(): if value is None: continue if isinstance(value, (list, dict)) and not value: continue out[key] = value return out