"""Provider-agnostic match data ingestion primitives. The MVP seed data is static, but a live World Cup product needs a safe path for official or commercial match data providers. This module defines the stable boundary: providers emit normalized match snapshots, repository adapters persist them, and the ingestion engine handles validation, idempotency, fingerprinting, and reportable errors. The code deliberately avoids binding to a specific API vendor or SQLAlchemy model. A production adapter can implement ``MatchSnapshotRepository`` using the existing persistence layer while tests and local scripts can use the in-memory repository. """ from __future__ import annotations from dataclasses import dataclass, field from datetime import datetime, timezone from enum import Enum import hashlib import json from pathlib import Path from typing import Any, Iterable, Mapping, Protocol class MatchIngestionError(ValueError): """Raised when provider snapshots cannot be normalized or ingested.""" class MatchStatus(str, Enum): SCHEDULED = "scheduled" LIVE = "live" HALF_TIME = "half_time" FULL_TIME = "full_time" EXTRA_TIME = "extra_time" PENALTIES = "penalties" POSTPONED = "postponed" CANCELLED = "cancelled" ABANDONED = "abandoned" _STATUS_ALIASES = { "scheduled": MatchStatus.SCHEDULED, "not_started": MatchStatus.SCHEDULED, "ns": MatchStatus.SCHEDULED, "fixture": MatchStatus.SCHEDULED, "live": MatchStatus.LIVE, "in_play": MatchStatus.LIVE, "1h": MatchStatus.LIVE, "2h": MatchStatus.LIVE, "half_time": MatchStatus.HALF_TIME, "ht": MatchStatus.HALF_TIME, "full_time": MatchStatus.FULL_TIME, "ft": MatchStatus.FULL_TIME, "finished": MatchStatus.FULL_TIME, "aet": MatchStatus.EXTRA_TIME, "extra_time": MatchStatus.EXTRA_TIME, "penalties": MatchStatus.PENALTIES, "pens": MatchStatus.PENALTIES, "postponed": MatchStatus.POSTPONED, "cancelled": MatchStatus.CANCELLED, "canceled": MatchStatus.CANCELLED, "abandoned": MatchStatus.ABANDONED, } @dataclass(frozen=True) class ExternalTeamRef: provider: 
str external_id: str name: str fifa_code: str | None = None country_code: str | None = None def __post_init__(self) -> None: if not self.provider.strip(): raise MatchIngestionError("team provider must be non-empty") if not self.external_id.strip(): raise MatchIngestionError("team external_id must be non-empty") if not self.name.strip(): raise MatchIngestionError("team name must be non-empty") def to_dict(self) -> dict[str, str | None]: return { "provider": self.provider, "external_id": self.external_id, "name": self.name, "fifa_code": self.fifa_code, "country_code": self.country_code, } @classmethod def from_mapping(cls, value: Mapping[str, Any], *, provider: str) -> "ExternalTeamRef": return cls( provider=str(value.get("provider", provider)), external_id=str(value.get("external_id", value.get("id", ""))), name=str(value.get("name", "")), fifa_code=_optional_str(value.get("fifa_code")), country_code=_optional_str(value.get("country_code")), ) @dataclass(frozen=True) class ExternalStadiumRef: provider: str external_id: str name: str city: str | None = None country: str | None = None def __post_init__(self) -> None: if not self.provider.strip(): raise MatchIngestionError("stadium provider must be non-empty") if not self.external_id.strip(): raise MatchIngestionError("stadium external_id must be non-empty") if not self.name.strip(): raise MatchIngestionError("stadium name must be non-empty") def to_dict(self) -> dict[str, str | None]: return { "provider": self.provider, "external_id": self.external_id, "name": self.name, "city": self.city, "country": self.country, } @classmethod def from_mapping(cls, value: Mapping[str, Any], *, provider: str) -> "ExternalStadiumRef": return cls( provider=str(value.get("provider", provider)), external_id=str(value.get("external_id", value.get("id", ""))), name=str(value.get("name", "")), city=_optional_str(value.get("city")), country=_optional_str(value.get("country")), ) @dataclass(frozen=True) class ExternalMatchSnapshot: provider: 
str provider_match_id: str competition_slug: str season_label: str kickoff_at: datetime home_team: ExternalTeamRef away_team: ExternalTeamRef status: MatchStatus stadium: ExternalStadiumRef | None = None home_score: int | None = None away_score: int | None = None group_name: str | None = None round_name: str | None = None updated_at: datetime = field(default_factory=lambda: datetime.now(timezone.utc)) confidence: float = 1.0 raw_payload: Mapping[str, Any] = field(default_factory=dict) def __post_init__(self) -> None: if not self.provider.strip(): raise MatchIngestionError("provider must be non-empty") if not self.provider_match_id.strip(): raise MatchIngestionError("provider_match_id must be non-empty") if not self.competition_slug.strip(): raise MatchIngestionError("competition_slug must be non-empty") if not self.season_label.strip(): raise MatchIngestionError("season_label must be non-empty") object.__setattr__(self, "status", coerce_match_status(self.status)) if self.kickoff_at.tzinfo is None: raise MatchIngestionError("kickoff_at must be timezone-aware") if self.updated_at.tzinfo is None: raise MatchIngestionError("updated_at must be timezone-aware") if not 0 <= self.confidence <= 1: raise MatchIngestionError("confidence must be between 0 and 1") object.__setattr__(self, "raw_payload", dict(self.raw_payload)) @property def match_key(self) -> str: return f"{self.provider}:{self.provider_match_id}" def canonical_payload(self) -> dict[str, Any]: """Return the provider-independent fields used for idempotency.""" return { "provider": self.provider, "provider_match_id": self.provider_match_id, "competition_slug": self.competition_slug, "season_label": self.season_label, "kickoff_at": self.kickoff_at.astimezone(timezone.utc).isoformat(), "home_team": self.home_team.to_dict(), "away_team": self.away_team.to_dict(), "stadium": self.stadium.to_dict() if self.stadium else None, "status": self.status.value, "home_score": self.home_score, "away_score": self.away_score, 
"group_name": self.group_name, "round_name": self.round_name, "updated_at": self.updated_at.astimezone(timezone.utc).isoformat(), "confidence": self.confidence, } def fingerprint(self) -> str: payload = json.dumps(self.canonical_payload(), sort_keys=True, separators=(",", ":"), ensure_ascii=False) return hashlib.sha256(payload.encode("utf-8")).hexdigest() @classmethod def from_mapping(cls, value: Mapping[str, Any], *, provider: str | None = None) -> "ExternalMatchSnapshot": provider_value = str(provider or value.get("provider", "")).strip() if not provider_value: raise MatchIngestionError("snapshot provider is required") score = value.get("score", {}) if score is None: score = {} if not isinstance(score, Mapping): raise MatchIngestionError("score must be an object when provided") stadium_value = value.get("stadium") stadium = None if stadium_value: if not isinstance(stadium_value, Mapping): raise MatchIngestionError("stadium must be an object when provided") stadium = ExternalStadiumRef.from_mapping(stadium_value, provider=provider_value) home_team_value = value.get("home_team") away_team_value = value.get("away_team") if not isinstance(home_team_value, Mapping) or not isinstance(away_team_value, Mapping): raise MatchIngestionError("home_team and away_team objects are required") return cls( provider=provider_value, provider_match_id=str(value.get("provider_match_id", value.get("match_id", value.get("id", "")))), competition_slug=str(value.get("competition_slug", "")), season_label=str(value.get("season_label", "")), kickoff_at=parse_datetime(value.get("kickoff_at")), home_team=ExternalTeamRef.from_mapping(home_team_value, provider=provider_value), away_team=ExternalTeamRef.from_mapping(away_team_value, provider=provider_value), stadium=stadium, status=coerce_match_status(value.get("status", MatchStatus.SCHEDULED.value)), home_score=_optional_int(value.get("home_score", score.get("home"))), away_score=_optional_int(value.get("away_score", score.get("away"))), 
group_name=_optional_str(value.get("group_name")), round_name=_optional_str(value.get("round_name")), updated_at=parse_datetime(value.get("updated_at", datetime.now(timezone.utc).isoformat())), confidence=float(value.get("confidence", 1.0)), raw_payload=dict(value.get("raw_payload", value)), ) @dataclass(frozen=True) class SnapshotValidation: valid: bool errors: tuple[str, ...] = () warnings: tuple[str, ...] = () @dataclass(frozen=True) class MatchChange: action: str match_key: str fingerprint: str snapshot: ExternalMatchSnapshot @dataclass(frozen=True) class RejectedSnapshot: match_key: str errors: tuple[str, ...] snapshot: ExternalMatchSnapshot | None = None @dataclass(frozen=True) class IngestionReport: provider: str total: int = 0 created: int = 0 updated: int = 0 unchanged: int = 0 rejected: int = 0 warnings: tuple[str, ...] = () rejected_snapshots: tuple[RejectedSnapshot, ...] = () changes: tuple[MatchChange, ...] = () @property def accepted(self) -> int: return self.created + self.updated + self.unchanged @property def successful(self) -> bool: return self.rejected == 0 def to_dict(self) -> dict[str, Any]: return { "provider": self.provider, "total": self.total, "created": self.created, "updated": self.updated, "unchanged": self.unchanged, "rejected": self.rejected, "accepted": self.accepted, "successful": self.successful, "warnings": list(self.warnings), "rejected_snapshots": [ { "match_key": item.match_key, "errors": list(item.errors), } for item in self.rejected_snapshots ], "changes": [ { "action": change.action, "match_key": change.match_key, "fingerprint": change.fingerprint, } for change in self.changes ], } class MatchDataProvider(Protocol): """Provider boundary for official/commercial match feeds.""" @property def provider_name(self) -> str: ... def fetch_snapshots(self, *, since: datetime | None = None) -> Iterable[ExternalMatchSnapshot]: ... 
class MatchSnapshotRepository(Protocol):
    """Persistence boundary used by the ingestion engine."""

    def get_fingerprint(self, provider: str, provider_match_id: str) -> str | None:
        """Return the stored fingerprint for a match, or ``None`` if none is stored."""
        ...

    def upsert_snapshot(self, snapshot: ExternalMatchSnapshot, fingerprint: str) -> str:
        """Persist a snapshot and return created, updated, or unchanged."""


class InMemoryMatchSnapshotRepository:
    """Repository implementation for tests, demos, and local dry-runs.

    Records are keyed by ``(provider, provider_match_id)``. Each record keeps a
    version that starts at 1 and increments whenever the fingerprint changes.
    """

    def __init__(self) -> None:
        self._records: dict[tuple[str, str], dict[str, Any]] = {}

    def get_fingerprint(self, provider: str, provider_match_id: str) -> str | None:
        record = self._records.get((provider, provider_match_id))
        return None if record is None else str(record["fingerprint"])

    def upsert_snapshot(self, snapshot: ExternalMatchSnapshot, fingerprint: str) -> str:
        """Store *snapshot* and report "created", "unchanged", or "updated"."""
        key = (snapshot.provider, snapshot.provider_match_id)
        existing = self._records.get(key)
        if existing is None:
            self._records[key] = {"snapshot": snapshot, "fingerprint": fingerprint, "version": 1}
            return "created"
        if existing["fingerprint"] == fingerprint:
            # Same content: keep the stored snapshot and version untouched.
            return "unchanged"
        self._records[key] = {
            "snapshot": snapshot,
            "fingerprint": fingerprint,
            "version": int(existing["version"]) + 1,
        }
        return "updated"

    def get_snapshot(self, provider: str, provider_match_id: str) -> ExternalMatchSnapshot | None:
        record = self._records.get((provider, provider_match_id))
        return None if record is None else record["snapshot"]

    def version(self, provider: str, provider_match_id: str) -> int | None:
        record = self._records.get((provider, provider_match_id))
        return None if record is None else int(record["version"])


class JSONFileMatchProvider:
    """Load normalized provider snapshots from a JSON file.

    The file may be either a list of snapshots or an object with a ``matches``
    list. It is useful for backfills, provider contract tests, and operations
    dry-runs before wiring a live feed.
    """

    def __init__(self, path: str | Path, *, provider_name: str) -> None:
        self.path = Path(path)
        self._provider_name = provider_name

    @property
    def provider_name(self) -> str:
        return self._provider_name

    def fetch_snapshots(self, *, since: datetime | None = None) -> Iterable[ExternalMatchSnapshot]:
        """Parse every record in the file, then keep those updated strictly after *since*.

        Every record's provider is forced to ``provider_name``.

        Raises:
            MatchIngestionError: if *since* is naive, the top-level shape is
                wrong, a record is not an object, or a record fails to parse.
        """
        # Validate arguments before touching the filesystem.
        if since is not None and since.utcoffset() is None:
            raise MatchIngestionError("since must be timezone-aware")

        with self.path.open("r", encoding="utf-8") as handle:
            payload = json.load(handle)

        records = payload.get("matches") if isinstance(payload, Mapping) else payload
        if not isinstance(records, list):
            raise MatchIngestionError("match JSON must be a list or an object with a 'matches' list")

        snapshots: list[ExternalMatchSnapshot] = []
        for index, item in enumerate(records):
            # Without this check a non-object record fails with AttributeError on .get().
            if not isinstance(item, Mapping):
                raise MatchIngestionError(
                    f"match record at index {index} must be an object, got {type(item).__name__}"
                )
            snapshots.append(ExternalMatchSnapshot.from_mapping(item, provider=self.provider_name))

        if since is None:
            return tuple(snapshots)
        return tuple(snapshot for snapshot in snapshots if snapshot.updated_at > since)


class MatchIngestionEngine:
    """Validate and persist provider snapshots idempotently."""

    def __init__(self, repository: MatchSnapshotRepository) -> None:
        self.repository = repository

    def ingest(
        self,
        provider: MatchDataProvider,
        *,
        since: datetime | None = None,
        strict: bool = False,
    ) -> IngestionReport:
        """Fetch snapshots from *provider*, validate them, and upsert the valid ones.

        Args:
            provider: Snapshot source; *since* is forwarded to ``fetch_snapshots``.
            since: Optional timezone-aware lower bound passed through to the provider.
            strict: When true, raise on the first invalid snapshot instead of
                recording it as rejected and continuing.

        Returns:
            An ``IngestionReport`` with per-action counts, validation warnings
            (prefixed with the match key), rejected snapshots, and one
            ``MatchChange`` per upserted snapshot.

        Raises:
            MatchIngestionError: if *since* is naive, if *strict* and a snapshot
                is invalid, or if the repository returns an unknown action.
        """
        if since is not None and since.utcoffset() is None:
            raise MatchIngestionError("since must be timezone-aware")

        snapshots = tuple(provider.fetch_snapshots(since=since))
        created = updated = unchanged = 0
        warnings: list[str] = []
        rejected_snapshots: list[RejectedSnapshot] = []
        changes: list[MatchChange] = []

        for snapshot in snapshots:
            validation = validate_snapshot(snapshot)
            warnings.extend(f"{snapshot.match_key}: {warning}" for warning in validation.warnings)

            if not validation.valid:
                rejected_snapshots.append(
                    RejectedSnapshot(
                        match_key=snapshot.match_key,
                        errors=validation.errors,
                        snapshot=snapshot,
                    )
                )
                if strict:
                    raise MatchIngestionError(
                        f"invalid snapshot {snapshot.match_key}: {'; '.join(validation.errors)}"
                    )
                continue

            fingerprint = snapshot.fingerprint()
            action = self.repository.upsert_snapshot(snapshot, fingerprint)
            # Compared with == (not a dict lookup) so str-Enum return values still count.
            if action == "created":
                created += 1
            elif action == "updated":
                updated += 1
            elif action == "unchanged":
                unchanged += 1
            else:
                raise MatchIngestionError(
                    "repository upsert_snapshot must return 'created', 'updated', or 'unchanged'"
                )
            changes.append(
                MatchChange(
                    action=action,
                    match_key=snapshot.match_key,
                    fingerprint=fingerprint,
                    snapshot=snapshot,
                )
            )

        return IngestionReport(
            provider=provider.provider_name,
            total=len(snapshots),
            created=created,
            updated=updated,
            unchanged=unchanged,
            rejected=len(rejected_snapshots),
            warnings=tuple(warnings),
            rejected_snapshots=tuple(rejected_snapshots),
            changes=tuple(changes),
        )


def validate_snapshot(snapshot: ExternalMatchSnapshot) -> SnapshotValidation:
    """Check cross-field consistency that snapshot construction does not enforce.

    Errors make the snapshot invalid (``MatchIngestionEngine.ingest`` rejects
    it); warnings are advisory and reported alongside an accepted snapshot.
    """
    errors: list[str] = []
    warnings: list[str] = []

    if snapshot.home_team.external_id == snapshot.away_team.external_id:
        errors.append("home_team and away_team must be different")
    if snapshot.home_score is not None and snapshot.home_score < 0:
        errors.append("home_score must not be negative")
    if snapshot.away_score is not None and snapshot.away_score < 0:
        errors.append("away_score must not be negative")

    has_complete_score = snapshot.home_score is not None and snapshot.away_score is not None
    has_partial_score = (snapshot.home_score is None) != (snapshot.away_score is None)
    if has_partial_score:
        errors.append("score must include both home_score and away_score or neither")

    in_progress_or_finished = {
        MatchStatus.LIVE,
        MatchStatus.HALF_TIME,
        MatchStatus.FULL_TIME,
        MatchStatus.EXTRA_TIME,
        MatchStatus.PENALTIES,
    }
    # A partial score already produced an error above; don't report it twice.
    if snapshot.status in in_progress_or_finished and not has_complete_score and not has_partial_score:
        errors.append(f"{snapshot.status.value} matches must include a complete score")

    if snapshot.status == MatchStatus.SCHEDULED and has_complete_score:
        warnings.append("scheduled match includes a score; provider may have stale status")
    not_played = {MatchStatus.CANCELLED, MatchStatus.POSTPONED, MatchStatus.ABANDONED}
    if snapshot.status in not_played and has_complete_score:
        warnings.append(f"{snapshot.status.value} match includes a score that may be ignored by scoring rules")
    if snapshot.updated_at < snapshot.kickoff_at and snapshot.status not in {
        MatchStatus.SCHEDULED,
        MatchStatus.POSTPONED,
    }:
        warnings.append("updated_at is before kickoff_at for a non-scheduled match")
    if snapshot.confidence < 0.75:
        warnings.append("provider confidence is below recommended automation threshold")

    return SnapshotValidation(valid=not errors, errors=tuple(errors), warnings=tuple(warnings))


def coerce_match_status(value: MatchStatus | str) -> MatchStatus:
    """Map a ``MatchStatus`` or provider status string onto ``MatchStatus``.

    Strings are stripped, lower-cased, and have "-" and " " folded to "_"
    before alias lookup.

    Raises:
        MatchIngestionError: if the normalized string is not a known alias.
    """
    if isinstance(value, MatchStatus):
        return value
    normalized = str(value).strip().lower().replace("-", "_").replace(" ", "_")
    try:
        return _STATUS_ALIASES[normalized]
    except KeyError as exc:
        allowed = ", ".join(status.value for status in MatchStatus)
        raise MatchIngestionError(f"unknown match status {value!r}; allowed values: {allowed}") from exc


def parse_datetime(value: Any) -> datetime:
    """Parse an aware ``datetime`` or ISO 8601 string and return it in UTC.

    A trailing ``Z`` or ``z`` (RFC 3339 UTC designator) is accepted on all
    Python versions by rewriting it to ``+00:00``.

    Raises:
        MatchIngestionError: if *value* is neither a datetime nor a string,
            cannot be parsed, or carries no UTC offset.
    """
    if isinstance(value, datetime):
        parsed = value
    elif isinstance(value, str):
        raw = value.strip()
        if raw.endswith(("Z", "z")):
            raw = f"{raw[:-1]}+00:00"
        try:
            parsed = datetime.fromisoformat(raw)
        except ValueError as exc:
            raise MatchIngestionError(f"invalid datetime {value!r}; expected ISO 8601") from exc
    else:
        raise MatchIngestionError(f"invalid datetime {value!r}; expected ISO 8601 string")
    # utcoffset() is None for naive values and for tzinfo objects without an offset.
    if parsed.utcoffset() is None:
        raise MatchIngestionError(f"datetime {value!r} must include timezone information")
    return parsed.astimezone(timezone.utc)


def _optional_str(value: Any) -> str | None:
    """Return ``str(value)`` stripped, or ``None`` for ``None``/blank input."""
    if value is None:
        return None
    text = str(value).strip()
    return text or None


def _optional_int(value: Any) -> int | None:
    """Convert an optional score to ``int``; ``None`` and ``""`` mean "no score".

    Booleans and floats with a fractional part are rejected rather than
    silently coerced (``int(True) == 1``, ``int(2.7) == 2``).

    Raises:
        MatchIngestionError: if *value* is not an integral score.
    """
    if value is None or value == "":
        return None
    if isinstance(value, bool) or (isinstance(value, float) and not value.is_integer()):
        raise MatchIngestionError(f"expected integer score, got {value!r}")
    try:
        return int(value)
    except (TypeError, ValueError, OverflowError) as exc:
        raise MatchIngestionError(f"expected integer score, got {value!r}") from exc


__all__ = [
    "ExternalMatchSnapshot",
    "ExternalStadiumRef",
    "ExternalTeamRef",
    "InMemoryMatchSnapshotRepository",
    "IngestionReport",
    "JSONFileMatchProvider",
    "MatchChange",
    "MatchDataProvider",
    "MatchIngestionEngine",
    "MatchIngestionError",
    "MatchSnapshotRepository",
    "MatchStatus",
    "RejectedSnapshot",
    "SnapshotValidation",
    "coerce_match_status",
    "parse_datetime",
    "validate_snapshot",
]