"""Admin content change-set validation and application services. Fan Passport content will change throughout a tournament: quiz questions need daily publishing, match metadata can be corrected, badges may be tuned, and future competitions will be added after the World Cup. This module provides a safe service-layer boundary for those updates with validation, optimistic revision checks, permission enforcement, dry-runs, and audit records. The service is persistence-agnostic. Production code can implement ``ContentRepository`` with SQLAlchemy models, while local scripts and tests use ``InMemoryContentRepository``. """ from __future__ import annotations from collections import Counter from dataclasses import dataclass, field from datetime import datetime, timezone from enum import Enum import hashlib import json from pathlib import Path import re from typing import Any, Iterable, Mapping, MutableMapping, Protocol from uuid import uuid4 _STABLE_ID_RE = re.compile(r"^[A-Za-z0-9][A-Za-z0-9._:-]{1,127}$") WRITE_PERMISSIONS = frozenset({"content:write", "content:admin"}) DELETE_PERMISSIONS = frozenset({"content:delete", "content:admin"}) class ContentValidationError(ValueError): """Raised when a content change set cannot be accepted or applied.""" class ContentEntityType(str, Enum): COMPETITION = "competition" TEAM = "team" STADIUM = "stadium" MATCH = "match" QUIZ_QUESTION = "quiz_question" CHALLENGE = "challenge" BADGE = "badge" STICKER = "sticker" TRIVIA_PACK = "trivia_pack" class ContentOperationType(str, Enum): UPSERT = "upsert" ARCHIVE = "archive" DELETE = "delete" class IssueSeverity(str, Enum): ERROR = "error" WARNING = "warning" @dataclass(frozen=True) class AdminActor: actor_id: str display_name: str role: str permissions: frozenset[str] = field(default_factory=frozenset) def __post_init__(self) -> None: if not self.actor_id.strip(): raise ContentValidationError("admin actor_id must be non-empty") if not self.display_name.strip(): raise 
ContentValidationError("admin display_name must be non-empty") if not self.role.strip(): raise ContentValidationError("admin role must be non-empty") object.__setattr__(self, "permissions", frozenset(str(permission) for permission in self.permissions)) def has_any_permission(self, permissions: Iterable[str]) -> bool: return bool(self.permissions.intersection(permissions)) def to_dict(self) -> dict[str, Any]: return { "actor_id": self.actor_id, "display_name": self.display_name, "role": self.role, "permissions": sorted(self.permissions), } @classmethod def from_mapping(cls, value: Mapping[str, Any]) -> "AdminActor": return cls( actor_id=str(value["actor_id"]), display_name=str(value["display_name"]), role=str(value.get("role", "admin")), permissions=frozenset(str(item) for item in value.get("permissions", ())), ) @dataclass(frozen=True) class ContentChange: entity_type: ContentEntityType | str operation: ContentOperationType | str stable_id: str payload: Mapping[str, Any] = field(default_factory=dict) expected_revision: int | None = None def __post_init__(self) -> None: object.__setattr__(self, "entity_type", coerce_entity_type(self.entity_type)) object.__setattr__(self, "operation", coerce_operation_type(self.operation)) object.__setattr__(self, "payload", dict(self.payload)) expected_revision = self.expected_revision if expected_revision is not None: try: expected_revision_int = int(expected_revision) except (TypeError, ValueError) as exc: raise ContentValidationError("expected_revision must be an integer") from exc if expected_revision_int < 0: raise ContentValidationError("expected_revision must not be negative") object.__setattr__(self, "expected_revision", expected_revision_int) @property def key(self) -> tuple[ContentEntityType, str]: return (self.entity_type, self.stable_id) def to_dict(self) -> dict[str, Any]: return { "entity_type": self.entity_type.value, "operation": self.operation.value, "stable_id": self.stable_id, "payload": dict(self.payload), 
"expected_revision": self.expected_revision, } @classmethod def from_mapping(cls, value: Mapping[str, Any]) -> "ContentChange": return cls( entity_type=value["entity_type"], operation=value["operation"], stable_id=str(value["stable_id"]), payload=dict(value.get("payload", {})), expected_revision=value.get("expected_revision"), ) @dataclass(frozen=True) class ContentChangeSet: change_set_id: str actor: AdminActor reason: str changes: tuple[ContentChange, ...] dry_run: bool = False created_at: datetime = field(default_factory=lambda: datetime.now(timezone.utc)) def __post_init__(self) -> None: if not self.change_set_id.strip(): raise ContentValidationError("change_set_id must be non-empty") if not self.reason.strip(): raise ContentValidationError("reason must be non-empty") object.__setattr__(self, "changes", tuple(self.changes)) if self.created_at.tzinfo is None: raise ContentValidationError("created_at must be timezone-aware") def to_dict(self) -> dict[str, Any]: return { "change_set_id": self.change_set_id, "actor": self.actor.to_dict(), "reason": self.reason, "dry_run": self.dry_run, "created_at": self.created_at.isoformat(), "changes": [change.to_dict() for change in self.changes], } @classmethod def from_mapping(cls, value: Mapping[str, Any]) -> "ContentChangeSet": created_at_value = value.get("created_at") created_at = ( _parse_datetime(created_at_value) if created_at_value else datetime.now(timezone.utc) ) return cls( change_set_id=str(value.get("change_set_id", f"changeset-{uuid4().hex}")), actor=AdminActor.from_mapping(value["actor"]), reason=str(value["reason"]), dry_run=bool(value.get("dry_run", False)), created_at=created_at, changes=tuple(ContentChange.from_mapping(item) for item in value.get("changes", ())), ) @dataclass(frozen=True) class ChangeIssue: severity: IssueSeverity code: str message: str index: int | None = None entity_type: ContentEntityType | None = None stable_id: str | None = None def to_dict(self) -> dict[str, Any]: return { "severity": 
self.severity.value, "code": self.code, "message": self.message, "index": self.index, "entity_type": self.entity_type.value if self.entity_type else None, "stable_id": self.stable_id, } @dataclass(frozen=True) class ChangePlan: accepted: bool fingerprint: str summary: Mapping[str, int] issues: tuple[ChangeIssue, ...] @property def errors(self) -> tuple[ChangeIssue, ...]: return tuple(issue for issue in self.issues if issue.severity == IssueSeverity.ERROR) @property def warnings(self) -> tuple[ChangeIssue, ...]: return tuple(issue for issue in self.issues if issue.severity == IssueSeverity.WARNING) def to_dict(self) -> dict[str, Any]: return { "accepted": self.accepted, "fingerprint": self.fingerprint, "summary": dict(self.summary), "issues": [issue.to_dict() for issue in self.issues], } @dataclass(frozen=True) class ApplyResult: applied: bool audit_id: str plan: ChangePlan revisions: Mapping[str, int] def to_dict(self) -> dict[str, Any]: return { "applied": self.applied, "audit_id": self.audit_id, "plan": self.plan.to_dict(), "revisions": dict(self.revisions), } class ContentRepository(Protocol): """Persistence boundary for admin content updates.""" def get_revision(self, entity_type: ContentEntityType, stable_id: str) -> int | None: ... def exists(self, entity_type: ContentEntityType, stable_id: str) -> bool: ... def apply_content_change(self, change: ContentChange) -> int: """Apply a single change and return the new entity revision.""" def record_audit_event( self, change_set: ContentChangeSet, plan: ChangePlan, *, applied: bool, revisions: Mapping[str, int], ) -> str: ... 
class InMemoryContentRepository:
    """Deterministic repository for tests, demos, and admin tooling dry-runs."""

    def __init__(self) -> None:
        # Records are keyed by (entity_type, stable_id); each holds
        # "payload", "revision" and "archived".
        self._records: dict[tuple[ContentEntityType, str], dict[str, Any]] = {}
        self._audit_events: dict[str, dict[str, Any]] = {}

    def seed(
        self,
        entity_type: ContentEntityType | str,
        stable_id: str,
        payload: Mapping[str, Any],
        *,
        revision: int = 1,
        archived: bool = False,
    ) -> None:
        """Insert or overwrite a record directly, bypassing the service.

        Raises:
            ContentValidationError: if ``entity_type`` is unknown or
                ``revision`` is below 1.
        """
        entity = coerce_entity_type(entity_type)
        if revision < 1:
            raise ContentValidationError("seed revision must be >= 1")
        self._records[(entity, stable_id)] = {
            "payload": dict(payload),
            "revision": revision,
            "archived": archived,
        }

    def get_revision(self, entity_type: ContentEntityType, stable_id: str) -> int | None:
        """Return the stored revision, or ``None`` if no record exists."""
        record = self._records.get((entity_type, stable_id))
        return None if record is None else int(record["revision"])

    def exists(self, entity_type: ContentEntityType, stable_id: str) -> bool:
        """Return True if a record exists (archived records count)."""
        return (entity_type, stable_id) in self._records

    def get_payload(self, entity_type: ContentEntityType | str, stable_id: str) -> Mapping[str, Any] | None:
        """Return a shallow copy of the stored payload, or ``None``."""
        entity = coerce_entity_type(entity_type)
        record = self._records.get((entity, stable_id))
        if record is None:
            return None
        return dict(record["payload"])

    def is_archived(self, entity_type: ContentEntityType | str, stable_id: str) -> bool:
        """Return True only if the record exists and is flagged archived."""
        entity = coerce_entity_type(entity_type)
        record = self._records.get((entity, stable_id))
        return bool(record and record["archived"])

    def apply_content_change(self, change: ContentChange) -> int:
        """Apply one change and return the resulting revision number.

        A missing record counts as revision 0, so the first write yields 1.
        DELETE removes the record, ARCHIVE keeps the existing payload and sets
        the archived flag, and UPSERT replaces the payload and clears it.
        """
        entity_type = coerce_entity_type(change.entity_type)
        operation = coerce_operation_type(change.operation)
        key = (entity_type, change.stable_id)
        existing = self._records.get(key)
        current_revision = 0 if existing is None else int(existing["revision"])
        next_revision = current_revision + 1

        if operation == ContentOperationType.DELETE:
            self._records.pop(key, None)
            return next_revision

        if operation == ContentOperationType.ARCHIVE:
            payload = dict(existing["payload"]) if existing else {}
            self._records[key] = {
                "payload": payload,
                "revision": next_revision,
                "archived": True,
            }
            return next_revision

        self._records[key] = {
            "payload": dict(change.payload),
            "revision": next_revision,
            "archived": False,
        }
        return next_revision

    def record_audit_event(
        self,
        change_set: ContentChangeSet,
        plan: ChangePlan,
        *,
        applied: bool,
        revisions: Mapping[str, int],
    ) -> str:
        """Store an audit record and return its generated ``audit-<hex>`` id."""
        audit_id = f"audit-{uuid4().hex}"
        self._audit_events[audit_id] = {
            "audit_id": audit_id,
            "change_set": change_set.to_dict(),
            "plan": plan.to_dict(),
            "applied": applied,
            "revisions": dict(revisions),
            "recorded_at": datetime.now(timezone.utc).isoformat(),
        }
        return audit_id

    def audit_event(self, audit_id: str) -> Mapping[str, Any] | None:
        """Return a shallow copy of a stored audit record, or ``None``."""
        event = self._audit_events.get(audit_id)
        return None if event is None else dict(event)


class AdminContentService:
    """Validate, plan, and apply admin content changes."""

    def __init__(self, repository: ContentRepository, *, max_changes: int = 500) -> None:
        """Bind the service to a repository.

        Raises:
            ContentValidationError: if ``max_changes`` is below 1.
        """
        if max_changes < 1:
            raise ContentValidationError("max_changes must be positive")
        self.repository = repository
        self.max_changes = max_changes

    @staticmethod
    def _issue_for(
        change: ContentChange,
        index: int,
        severity: IssueSeverity,
        code: str,
        message: str,
    ) -> ChangeIssue:
        """Build an issue tagged with the change's index, type and stable id."""
        return ChangeIssue(
            severity=severity,
            code=code,
            message=message,
            index=index,
            entity_type=coerce_entity_type(change.entity_type),
            stable_id=change.stable_id,
        )

    def plan(self, change_set: ContentChangeSet) -> ChangePlan:
        """Validate ``change_set`` against the repository without writing.

        Returns a ``ChangePlan`` whose ``accepted`` flag is True only when no
        ERROR-severity issue was found. ``summary`` counts changes per
        ``"<entity_type>.<operation>"``.
        """
        issues: list[ChangeIssue] = []
        counter: Counter[str] = Counter()
        seen_keys: set[tuple[ContentEntityType, str]] = set()
        # Entities created in this same set may be referenced by other changes.
        planned_upserts = {
            change.key
            for change in change_set.changes
            if coerce_operation_type(change.operation) == ContentOperationType.UPSERT
        }

        if len(change_set.reason.strip()) < 8:
            issues.append(
                ChangeIssue(
                    severity=IssueSeverity.ERROR,
                    code="reason_too_short",
                    message="change set reason must be at least 8 characters",
                )
            )

        if not change_set.actor.has_any_permission(WRITE_PERMISSIONS):
            issues.append(
                ChangeIssue(
                    severity=IssueSeverity.ERROR,
                    code="missing_write_permission",
                    message="actor must have content:write or content:admin permission",
                )
            )

        if not change_set.changes:
            issues.append(
                ChangeIssue(
                    severity=IssueSeverity.ERROR,
                    code="empty_change_set",
                    message="change set must contain at least one change",
                )
            )

        if len(change_set.changes) > self.max_changes:
            issues.append(
                ChangeIssue(
                    severity=IssueSeverity.ERROR,
                    code="too_many_changes",
                    message=f"change set contains more than the maximum {self.max_changes} changes",
                )
            )

        # The actor is the same for every change, so look this up once.
        can_delete = change_set.actor.has_any_permission(DELETE_PERMISSIONS)
        destructive = {ContentOperationType.DELETE, ContentOperationType.ARCHIVE}

        for index, change in enumerate(change_set.changes):
            entity_type = coerce_entity_type(change.entity_type)
            operation = coerce_operation_type(change.operation)
            counter[f"{entity_type.value}.{operation.value}"] += 1

            # fullmatch, not match: with match() the pattern's trailing "$"
            # also matches just before a final newline, letting "abc\n" pass.
            if not _STABLE_ID_RE.fullmatch(change.stable_id):
                issues.append(
                    self._issue_for(
                        change,
                        index,
                        IssueSeverity.ERROR,
                        "invalid_stable_id",
                        f"stable_id must match {_STABLE_ID_RE.pattern}",
                    )
                )

            if change.key in seen_keys:
                issues.append(
                    self._issue_for(
                        change,
                        index,
                        IssueSeverity.ERROR,
                        "duplicate_entity_change",
                        "a change set may only modify each entity once",
                    )
                )
            seen_keys.add(change.key)

            if operation in destructive:
                if not can_delete:
                    issues.append(
                        self._issue_for(
                            change,
                            index,
                            IssueSeverity.ERROR,
                            "missing_delete_permission",
                            "archive/delete operations require content:delete or content:admin permission",
                        )
                    )
                if not self.repository.exists(entity_type, change.stable_id):
                    issues.append(
                        self._issue_for(
                            change,
                            index,
                            IssueSeverity.ERROR,
                            "entity_not_found",
                            "archive/delete operations require an existing entity",
                        )
                    )

            current_revision = self.repository.get_revision(entity_type, change.stable_id)
            if change.expected_revision is not None:
                # A missing entity is treated as revision 0, matching how
                # InMemoryContentRepository numbers a first write, so
                # expected_revision=0 means "create only".
                baseline_revision = 0 if current_revision is None else current_revision
                if change.expected_revision != baseline_revision:
                    issues.append(
                        self._issue_for(
                            change,
                            index,
                            IssueSeverity.ERROR,
                            "revision_conflict",
                            (
                                f"expected revision {change.expected_revision}, "
                                f"but current revision is {current_revision}"
                            ),
                        )
                    )

            if operation == ContentOperationType.UPSERT:
                issues.extend(self._validate_upsert_payload(change, index, planned_upserts))

            if operation in destructive and change.payload:
                issues.append(
                    self._issue_for(
                        change,
                        index,
                        IssueSeverity.WARNING,
                        "payload_ignored",
                        "payload is ignored for archive/delete operations",
                    )
                )

        fingerprint = fingerprint_change_set(change_set)
        accepted = not any(issue.severity == IssueSeverity.ERROR for issue in issues)
        return ChangePlan(
            accepted=accepted,
            fingerprint=fingerprint,
            summary=dict(sorted(counter.items())),
            issues=tuple(issues),
        )

    def apply(self, change_set: ContentChangeSet) -> ApplyResult:
        """Plan the change set, apply it unless ``dry_run``, and audit it.

        An audit event is recorded for dry runs too, with ``applied=False``
        and no revisions. ``revisions`` maps ``"<entity_type>:<stable_id>"``
        to the revision returned by the repository.

        Raises:
            ContentValidationError: if the plan contains any error; nothing is
                written and no audit event is recorded in that case.
        """
        plan = self.plan(change_set)
        if not plan.accepted:
            raise ContentValidationError(
                "content change set rejected: "
                + "; ".join(f"{issue.code}: {issue.message}" for issue in plan.errors)
            )

        revisions: MutableMapping[str, int] = {}
        if not change_set.dry_run:
            # NOTE(review): changes are applied one by one with no rollback
            # here; if the repository raises part-way, earlier changes stay
            # applied and no audit event is written. Atomicity, if needed,
            # has to come from the repository implementation.
            for change in change_set.changes:
                revision = self.repository.apply_content_change(change)
                revisions[f"{coerce_entity_type(change.entity_type).value}:{change.stable_id}"] = revision

        audit_id = self.repository.record_audit_event(
            change_set,
            plan,
            applied=not change_set.dry_run,
            revisions=revisions,
        )
        return ApplyResult(
            applied=not change_set.dry_run,
            audit_id=audit_id,
            plan=plan,
            revisions=dict(revisions),
        )

    def _validate_upsert_payload(
        self,
        change: ContentChange,
        index: int,
        planned_upserts: set[tuple[ContentEntityType, str]],
    ) -> tuple[ChangeIssue, ...]:
        """Return payload issues for one UPSERT change.

        An empty payload short-circuits with a single ``empty_payload`` error.
        Otherwise required fields are checked first, then the per-entity rules
        for match, quiz_question, challenge and badge.
        """
        entity_type = coerce_entity_type(change.entity_type)
        payload = dict(change.payload)
        issues: list[ChangeIssue] = []

        def report(severity: IssueSeverity, code: str, message: str) -> None:
            issues.append(self._issue_for(change, index, severity, code, message))

        if not payload:
            report(IssueSeverity.ERROR, "empty_payload", "upsert operations require a payload")
            return tuple(issues)

        for field_name in _required_fields_for(entity_type):
            if _missing(payload.get(field_name)):
                report(
                    IssueSeverity.ERROR,
                    "missing_required_field",
                    f"{entity_type.value} payload requires '{field_name}'",
                )

        if entity_type == ContentEntityType.MATCH:
            home_team_id = _optional_text(payload.get("home_team_id"))
            away_team_id = _optional_text(payload.get("away_team_id"))
            stadium_id = _optional_text(payload.get("stadium_id"))
            if home_team_id and away_team_id and home_team_id == away_team_id:
                report(
                    IssueSeverity.ERROR,
                    "match_same_team",
                    "match home_team_id and away_team_id must be different",
                )
            issues.extend(
                self._reference_issues(
                    index=index,
                    source_entity=entity_type,
                    source_id=change.stable_id,
                    field_name="home_team_id",
                    target_entity=ContentEntityType.TEAM,
                    target_id=home_team_id,
                    planned_upserts=planned_upserts,
                )
            )
            issues.extend(
                self._reference_issues(
                    index=index,
                    source_entity=entity_type,
                    source_id=change.stable_id,
                    field_name="away_team_id",
                    target_entity=ContentEntityType.TEAM,
                    target_id=away_team_id,
                    planned_upserts=planned_upserts,
                )
            )
            # stadium_id is optional (not in the required fields for match).
            if stadium_id:
                issues.extend(
                    self._reference_issues(
                        index=index,
                        source_entity=entity_type,
                        source_id=change.stable_id,
                        field_name="stadium_id",
                        target_entity=ContentEntityType.STADIUM,
                        target_id=stadium_id,
                        planned_upserts=planned_upserts,
                    )
                )

        if entity_type == ContentEntityType.QUIZ_QUESTION:
            options = payload.get("options")
            if not isinstance(options, list) or len(options) < 2:
                report(
                    IssueSeverity.ERROR,
                    "quiz_options_invalid",
                    "quiz_question payload requires at least two options",
                )
            if _missing(payload.get("correct_option_id")) and _missing(payload.get("answer")):
                report(
                    IssueSeverity.ERROR,
                    "quiz_answer_missing",
                    "quiz_question requires correct_option_id or answer",
                )

        if entity_type == ContentEntityType.CHALLENGE:
            if _missing(payload.get("criteria")) and _missing(payload.get("rule")):
                report(
                    IssueSeverity.ERROR,
                    "challenge_rule_missing",
                    "challenge requires criteria or rule",
                )

        if entity_type == ContentEntityType.BADGE:
            if _missing(payload.get("description")) and _missing(payload.get("criteria")):
                report(
                    IssueSeverity.WARNING,
                    "badge_context_missing",
                    "badge should include description or criteria for admin clarity",
                )

        return tuple(issues)

    def _reference_issues(
        self,
        *,
        index: int,
        source_entity: ContentEntityType,
        source_id: str,
        field_name: str,
        target_entity: ContentEntityType,
        target_id: str | None,
        planned_upserts: set[tuple[ContentEntityType, str]],
    ) -> tuple[ChangeIssue, ...]:
        """Return a ``missing_reference`` error if the target is unknown.

        A target is known if it exists in the repository or is upserted by the
        same change set. A blank ``target_id`` produces no issue.
        """
        if not target_id:
            return ()
        if self.repository.exists(target_entity, target_id) or (target_entity, target_id) in planned_upserts:
            return ()
        return (
            ChangeIssue(
                severity=IssueSeverity.ERROR,
                code="missing_reference",
                message=f"{field_name} references unknown {target_entity.value}:{target_id}",
                index=index,
                entity_type=source_entity,
                stable_id=source_id,
            ),
        )


def coerce_entity_type(value: ContentEntityType | str) -> ContentEntityType:
    """Return ``value`` as a ``ContentEntityType``.

    Raises:
        ContentValidationError: if ``value`` is not a known entity type.
    """
    if isinstance(value, ContentEntityType):
        return value
    try:
        return ContentEntityType(str(value))
    except ValueError as exc:
        allowed = ", ".join(item.value for item in ContentEntityType)
        raise ContentValidationError(f"invalid content entity type {value!r}; allowed values: {allowed}") from exc


def coerce_operation_type(value: ContentOperationType | str) -> ContentOperationType:
    """Return ``value`` as a ``ContentOperationType``.

    Raises:
        ContentValidationError: if ``value`` is not a known operation.
    """
    if isinstance(value, ContentOperationType):
        return value
    try:
        return ContentOperationType(str(value))
    except ValueError as exc:
        allowed = ", ".join(item.value for item in ContentOperationType)
        raise ContentValidationError(f"invalid content operation type {value!r}; allowed values: {allowed}") from exc


def fingerprint_change_set(change_set: ContentChangeSet) -> str:
    """Return a SHA-256 hex digest of the change set's canonical JSON form.

    The digest covers the id, actor id, reason, dry-run flag and changes;
    ``created_at`` is not included.
    """
    canonical = {
        "change_set_id": change_set.change_set_id,
        "actor_id": change_set.actor.actor_id,
        "reason": change_set.reason,
        "dry_run": change_set.dry_run,
        "changes": [change.to_dict() for change in change_set.changes],
    }
    # Sorted keys and fixed separators keep the serialisation deterministic.
    payload = json.dumps(canonical, sort_keys=True, separators=(",", ":"), ensure_ascii=False)
    return hashlib.sha256(payload.encode("utf-8")).hexdigest()


def load_change_set(path: str | Path) -> ContentChangeSet:
    """Read a UTF-8 JSON file and build a ``ContentChangeSet`` from it."""
    with Path(path).open("r", encoding="utf-8") as handle:
        return ContentChangeSet.from_mapping(json.load(handle))


def dump_change_set(change_set: ContentChangeSet, path: str | Path) -> None:
    """Write the change set as indented, key-sorted JSON plus a final newline."""
    with Path(path).open("w", encoding="utf-8") as handle:
        json.dump(change_set.to_dict(), handle, indent=2, sort_keys=True)
        handle.write("\n")


def _required_fields_for(entity_type: ContentEntityType) -> tuple[str, ...]:
    """Return the payload fields an upsert of ``entity_type`` must supply."""
    return {
        ContentEntityType.COMPETITION: ("name", "season_label"),
        ContentEntityType.TEAM: ("name",),
        ContentEntityType.STADIUM: ("name", "city"),
        ContentEntityType.MATCH: ("home_team_id", "away_team_id", "kickoff_at"),
        ContentEntityType.QUIZ_QUESTION: ("prompt", "options"),
        ContentEntityType.CHALLENGE: ("title",),
        ContentEntityType.BADGE: ("name",),
        ContentEntityType.STICKER: ("name",),
        ContentEntityType.TRIVIA_PACK: ("title", "question_ids"),
    }[entity_type]


def _missing(value: Any) -> bool:
    """Return True for ``None``, blank strings and empty containers.

    Other values, including ``0`` and ``False``, count as present.
    """
    if value is None:
        return True
    if isinstance(value, str):
        return not value.strip()
    if isinstance(value, (list, tuple, dict, set)):
        return len(value) == 0
    return False


def _optional_text(value: Any) -> str | None:
    """Return ``str(value)`` stripped, or ``None`` if it is None or blank."""
    if value is None:
        return None
    text = str(value).strip()
    return text or None


def _parse_datetime(value: Any) -> datetime:
    """Return ``value`` as a timezone-aware datetime converted to UTC.

    Accepts a ``datetime`` or ISO-8601 text; a trailing ``Z`` is read as
    ``+00:00``.

    Raises:
        ContentValidationError: if the text is not valid ISO-8601 or the
            result has no timezone.
    """
    if isinstance(value, datetime):
        parsed = value
    else:
        raw = str(value)
        if raw.endswith("Z"):
            raw = f"{raw[:-1]}+00:00"
        try:
            parsed = datetime.fromisoformat(raw)
        except ValueError as exc:
            # Surface malformed input as the module's own error type; it is
            # still a ValueError subclass for existing callers.
            raise ContentValidationError(f"invalid ISO-8601 datetime value {value!r}") from exc
    if parsed.tzinfo is None:
        raise ContentValidationError("datetime values must be timezone-aware")
    return parsed.astimezone(timezone.utc)


__all__ = [
    "AdminActor",
    "AdminContentService",
    "ApplyResult",
    "ChangeIssue",
    "ChangePlan",
    "ContentChange",
    "ContentChangeSet",
    "ContentEntityType",
    "ContentOperationType",
    "ContentRepository",
    "ContentValidationError",
    "DELETE_PERMISSIONS",
    "InMemoryContentRepository",
    "IssueSeverity",
    "WRITE_PERMISSIONS",
    "coerce_entity_type",
    "coerce_operation_type",
    "dump_change_set",
    "fingerprint_change_set",
    "load_change_set",
]