"""Photos adapter: mock EXIF metadata (JSON) -> ``photo.metadata`` evidence.

The reference node deliberately does not parse image binaries: the point of
this adapter is to exercise the evidence pipeline with the *shape* of photo
metadata (timestamps, GPS, camera) without an imaging dependency. A production
adapter would extract real EXIF and emit the same content type.

Accepted input: a ``.json`` file (or directory of them) containing either a
single EXIF-style object, a list of them, or ``{"photos": [...]}``. Field
names follow EXIF conventions (``DateTimeOriginal``, ``Make``,
``GPSLatitude``...); snake_case equivalents are also accepted.

Normalization performed:

* ``DateTimeOriginal`` ``YYYY:MM:DD HH:MM:SS`` -> ISO 8601, honouring
  ``OffsetTimeOriginal`` when present;
* GPS coordinates: decimal degrees or ``[deg]`` / ``[deg, min]`` /
  ``[deg, min, sec]`` arrays, with ``GPSLatitudeRef``/``GPSLongitudeRef``
  signs, rounded to 6 d.p.;
* the full original record is preserved under ``raw_exif`` so no source
  information is lost in normalization.

``external_id`` is ``ImageUniqueID`` when present, else the record's file
name, else ``<json file name>#<record index>``.
"""

from __future__ import annotations

import json
import re
from pathlib import Path
from typing import Any, Iterator, Optional

from pmp.adapters.base import Adapter, EvidenceItem, prune, register
from pmp.errors import AdapterError

# EXIF "YYYY:MM:DD HH:MM:SS"; also tolerates "-" date separators and a "T"
# between date and time so already-ISO second-precision values round-trip.
_EXIF_DT_RE = re.compile(
    r"^(\d{4})[:\-](\d{2})[:\-](\d{2})[ T](\d{2}):(\d{2}):(\d{2})$"
)

# EXIF OffsetTime* format: sign, two-digit hours, colon, two-digit minutes.
_UTC_OFFSET_RE = re.compile(r"^[+\-]\d{2}:\d{2}$")


def _is_number(value: Any) -> bool:
    """Return True for ints and floats, excluding bools."""
    # bool is a subclass of int; JSON true/false must not count as numbers.
    return isinstance(value, (int, float)) and not isinstance(value, bool)


def _get(record: dict, *names: str) -> Any:
    """Return the first non-``None`` value among ``names`` in ``record``.

    ``names`` are tried in order, so callers list the EXIF spelling first and
    the snake_case aliases after it. Returns ``None`` when no key is present
    with a non-``None`` value.
    """
    for name in names:
        if name in record and record[name] is not None:
            return record[name]
    return None


def exif_datetime_to_iso(value: Any, offset: Any = None) -> Optional[str]:
    """``2024:06:01 14:23:11`` (+ optional ``+02:00`` offset) -> ISO 8601.

    Args:
        value: The EXIF timestamp. Non-string values yield ``None``.
        offset: Optional UTC offset; appended only when it is a string of the
            form ``+HH:MM`` / ``-HH:MM``, otherwise ignored.

    Returns:
        ``YYYY-MM-DDTHH:MM:SS`` (with the offset appended when valid) for
        values matching the EXIF pattern; the stripped input unchanged for any
        other non-empty string; ``None`` for empty or non-string input.
    """
    if not isinstance(value, str):
        return None
    text = value.strip()
    match = _EXIF_DT_RE.match(text)
    if not match:
        # Not EXIF-shaped (e.g. already ISO with fractions/zone): keep the
        # source text faithfully rather than guessing at its meaning.
        return text or None
    year, month, day, hour, minute, second = match.groups()
    iso = f"{year}-{month}-{day}T{hour}:{minute}:{second}"
    if isinstance(offset, str):
        offset_text = offset.strip()
        if _UTC_OFFSET_RE.match(offset_text):
            iso += offset_text
    return iso


def gps_to_decimal(coord: Any, ref: Any = None) -> Optional[float]:
    """Decimal degrees from a number or a [deg, min, sec] / [deg, min] array.

    Args:
        coord: Either a number (already decimal degrees) or a list of one to
            three numbers read as degrees, minutes, seconds. Missing trailing
            components count as zero. Bools are not treated as numbers.
        ref: Optional hemisphere reference. ``"S"`` or ``"W"`` (any case,
            surrounding whitespace ignored) forces a negative result; any
            other value leaves the sign as computed.

    Returns:
        The coordinate rounded to 6 decimal places, or ``None`` when ``coord``
        is not in one of the accepted shapes.
    """
    if _is_number(coord):
        decimal = float(coord)
    elif (
        isinstance(coord, list)
        and 1 <= len(coord) <= 3
        and all(_is_number(part) for part in coord)
    ):
        degrees, minutes, seconds = (float(p) for p in [*coord, 0.0, 0.0][:3])
        # Sum magnitudes and apply the sign afterwards. Adding signed
        # components directly would turn [-33, 51, 54] into -32.135 instead
        # of -33.865, because the positive minutes/seconds would pull a
        # negative degree value back toward zero.
        magnitude = abs(degrees) + abs(minutes) / 60.0 + abs(seconds) / 3600.0
        is_negative = any(p < 0 for p in (degrees, minutes, seconds))
        decimal = -magnitude if is_negative else magnitude
    else:
        return None
    if isinstance(ref, str) and ref.strip().upper() in ("S", "W"):
        decimal = -abs(decimal)
    return round(decimal, 6)


def _records_from_document(doc: Any, path: Path) -> list[dict]:
    """Extract the list of photo records from a parsed JSON document.

    Accepts ``{"photos": [...]}``, a bare list, or a single object (wrapped
    into a one-element list). ``path`` is used only in error messages.

    Raises:
        AdapterError: If ``doc`` has none of the accepted shapes, or any
            record is not a JSON object.
    """
    if isinstance(doc, dict) and isinstance(doc.get("photos"), list):
        records = doc["photos"]
    elif isinstance(doc, list):
        records = doc
    elif isinstance(doc, dict):
        # A dict without a list-valued "photos" key is itself one record.
        records = [doc]
    else:
        raise AdapterError(f"{path}: expected an EXIF object, list, or {{'photos': []}}")
    for i, rec in enumerate(records):
        if not isinstance(rec, dict):
            raise AdapterError(f"{path}: photo record {i} is not an object")
    return records


def _normalize(record: dict) -> dict:
    """Build the ``photo.metadata`` content dict for one EXIF-style record.

    Each field is looked up under its EXIF name first, then its snake_case
    aliases. The untouched input record is attached under ``raw_exif``.

    NOTE(review): ``prune`` comes from ``pmp.adapters.base`` and is not
    visible here; it presumably drops ``None``/empty entries -- confirm, in
    particular that it leaves ``raw_exif`` intact.
    """
    taken_at = exif_datetime_to_iso(
        _get(record, "DateTimeOriginal", "date_time_original", "taken_at"),
        _get(record, "OffsetTimeOriginal", "offset_time_original"),
    )
    lat = gps_to_decimal(
        _get(record, "GPSLatitude", "gps_latitude"),
        _get(record, "GPSLatitudeRef", "gps_latitude_ref"),
    )
    lon = gps_to_decimal(
        _get(record, "GPSLongitude", "gps_longitude"),
        _get(record, "GPSLongitudeRef", "gps_longitude_ref"),
    )

    altitude = _get(record, "GPSAltitude", "gps_altitude")
    altitude = round(float(altitude), 1) if _is_number(altitude) else None

    # A position needs both coordinates; altitude alone is not emitted.
    if lat is not None and lon is not None:
        gps = prune({"lat": lat, "lon": lon, "altitude_m": altitude})
    else:
        gps = None

    camera = prune(
        {
            "make": _get(record, "Make", "make"),
            "model": _get(record, "Model", "model"),
            "lens": _get(record, "LensModel", "lens_model", "lens"),
        }
    )
    exposure = prune(
        {
            "iso": _get(record, "ISOSpeedRatings", "ISO", "iso"),
            "f_number": _get(record, "FNumber", "f_number"),
            "exposure_time": _get(record, "ExposureTime", "exposure_time"),
            "focal_length_mm": _get(record, "FocalLength", "focal_length_mm"),
        }
    )
    dimensions = prune(
        {
            "width": _get(record, "PixelXDimension", "ImageWidth", "width"),
            "height": _get(record, "PixelYDimension", "ImageHeight", "height"),
        }
    )
    return prune(
        {
            "file_name": _get(record, "FileName", "file_name", "filename"),
            "image_unique_id": _get(record, "ImageUniqueID", "image_unique_id"),
            "taken_at": taken_at,
            "gps": gps,
            # Falsy (empty) groups collapse to None before the final prune.
            "camera": camera or None,
            "exposure": exposure or None,
            "dimensions": dimensions or None,
            "orientation": _get(record, "Orientation", "orientation"),
            "software": _get(record, "Software", "software"),
            "raw_exif": record,
        }
    )


@register
class PhotosAdapter(Adapter):
    """Imports mock photo EXIF metadata (JSON) as ``photo.metadata`` evidence."""

    name = "photos"
    version = "1.0.0"
    content_types = ("photo.metadata",)

    def parse(self, source: Path) -> Iterator[EvidenceItem]:
        """Yield one ``EvidenceItem`` per photo record found under ``source``.

        Files are supplied by ``self.iter_files(source, (".json",))``
        (inherited; presumably walks a file or directory -- confirm in
        ``Adapter``).

        Raises:
            AdapterError: If a file cannot be read, decoded or parsed as
                JSON, or its document/records have an unsupported shape.
        """
        for path in self.iter_files(source, (".json",)):
            try:
                # utf-8-sig reads plain UTF-8 and also strips a leading BOM,
                # which json.loads would otherwise reject.
                doc = json.loads(path.read_text(encoding="utf-8-sig"))
            except (OSError, UnicodeDecodeError, json.JSONDecodeError) as exc:
                raise AdapterError(f"cannot parse {path}: {exc}") from exc
            for i, record in enumerate(_records_from_document(doc, path)):
                content = _normalize(record)
                # Prefer the camera-assigned id, then the record's file name;
                # fall back to a position within the JSON file.
                external_id = (
                    content.get("image_unique_id")
                    or content.get("file_name")
                    or f"{path.name}#{i}"
                )
                yield EvidenceItem(
                    content_type="photo.metadata",
                    content=content,
                    external_id=str(external_id),
                    observed_at=content.get("taken_at"),
                    origin=path.resolve().as_uri(),
                )