"""Moderation primitives for public Fan Passport surfaces. The MVP currently focuses on football passport progress, but production launch will also have user display names, profile bios, prediction notes, quiz comments, country/team affinity labels, and leaderboard-visible fields. This module provides a deterministic local policy engine that can be used immediately and also acts as the adapter boundary for human moderation queues or third-party trust-and-safety services later. No network calls are made here. The policy is intentionally conservative and auditable: each decision includes structured findings and a normalized safe text variant suitable for storing alongside the original user input. """ from __future__ import annotations from dataclasses import dataclass, field from datetime import datetime, timezone from enum import Enum import re import unicodedata from typing import Iterable, Mapping, Protocol from uuid import uuid4 _ZERO_WIDTH_TRANSLATION = { ord("\u200b"): None, ord("\u200c"): None, ord("\u200d"): None, ord("\ufeff"): None, } _CONTROL_CHAR_RE = re.compile(r"[\x00-\x08\x0b\x0c\x0e-\x1f\x7f]") _WHITESPACE_RE = re.compile(r"\s+") _URL_RE = re.compile(r"(?i)\b(?:https?://|www\.)[^\s]+") _EMAIL_RE = re.compile(r"(?i)\b[A-Z0-9._%+-]+@[A-Z0-9.-]+\.[A-Z]{2,}\b") _PHONE_RE = re.compile(r"(? dict[str, object]: return { "code": self.code, "message": self.message, "severity": self.severity.value, "start": self.start, "end": self.end, "matched_text": self.matched_text, } @dataclass(frozen=True) class ModerationResult: status: ModerationStatus original_text: str safe_text: str context: str score: int findings: tuple[ModerationFinding, ...] 
evaluated_at: datetime = field(default_factory=lambda: datetime.now(timezone.utc)) @property def approved(self) -> bool: return self.status == ModerationStatus.APPROVED def to_dict(self) -> dict[str, object]: return { "status": self.status.value, "original_text": self.original_text, "safe_text": self.safe_text, "context": self.context, "score": self.score, "findings": [finding.to_dict() for finding in self.findings], "evaluated_at": self.evaluated_at.isoformat(), } @dataclass(frozen=True) class ModerationPolicy: """Configurable rules for the deterministic local moderator.""" blocked_terms: tuple[str, ...] = ( "free crypto", "spamcoin", "match fixing service", "stolen tickets", "fake passport", ) review_terms: tuple[str, ...] = ( "official fifa", "cash prize", "ticket resale", "refund help", "giveaway", "betting tips", ) protected_role_terms: tuple[str, ...] = ( "admin", "moderator", "support", "fifa", "world cup official", ) trusted_roles: frozenset[str] = frozenset({"admin", "moderator", "support"}) min_display_name_length: int = 2 max_display_name_length: int = 32 max_public_text_length: int = 280 review_score_threshold: int = 4 reject_score_threshold: int = 10 reject_contact_info: bool = True reject_urls_in_display_names: bool = True def __post_init__(self) -> None: if self.min_display_name_length < 1: raise ValueError("min_display_name_length must be positive") if self.max_display_name_length < self.min_display_name_length: raise ValueError("max_display_name_length must be >= min_display_name_length") if self.max_public_text_length < self.max_display_name_length: raise ValueError("max_public_text_length must be >= max_display_name_length") if self.reject_score_threshold < self.review_score_threshold: raise ValueError("reject_score_threshold must be >= review_score_threshold") object.__setattr__(self, "blocked_terms", tuple(_normalise_for_matching(term) for term in self.blocked_terms)) object.__setattr__(self, "review_terms", tuple(_normalise_for_matching(term) 
for term in self.review_terms)) object.__setattr__( self, "protected_role_terms", tuple(_normalise_for_matching(term) for term in self.protected_role_terms), ) object.__setattr__(self, "trusted_roles", frozenset(role.lower() for role in self.trusted_roles)) @dataclass(frozen=True) class ModerationCase: case_id: str user_id: str context: str original_text: str safe_text: str status: ModerationStatus score: int findings: tuple[ModerationFinding, ...] created_at: datetime = field(default_factory=lambda: datetime.now(timezone.utc)) decided_at: datetime | None = None moderator_id: str | None = None decision_note: str | None = None @classmethod def from_result(cls, user_id: str, result: ModerationResult) -> "ModerationCase": return cls( case_id=f"mod-{uuid4().hex}", user_id=user_id, context=result.context, original_text=result.original_text, safe_text=result.safe_text, status=result.status, score=result.score, findings=result.findings, ) @dataclass(frozen=True) class Sanction: sanction_id: str user_id: str action: SanctionAction reason: str created_at: datetime = field(default_factory=lambda: datetime.now(timezone.utc)) expires_at: datetime | None = None moderator_id: str | None = None class ModerationRepository(Protocol): """Persistence boundary for moderation queues and sanctions.""" def create_case(self, case: ModerationCase) -> ModerationCase: ... def get_case(self, case_id: str) -> ModerationCase | None: ... def list_open_cases(self) -> tuple[ModerationCase, ...]: ... def record_decision( self, case_id: str, status: ModerationStatus, moderator_id: str, note: str, ) -> ModerationCase: ... def apply_sanction(self, sanction: Sanction) -> Sanction: ... 
class InMemoryModerationRepository:
    """Small repository implementation useful for tests and local development.

    Cases and sanctions are kept in plain dictionaries keyed by their ids, so
    nothing outlives the instance.
    """

    def __init__(self) -> None:
        self._cases: dict[str, ModerationCase] = {}
        self._sanctions: dict[str, Sanction] = {}

    def create_case(self, case: ModerationCase) -> ModerationCase:
        """Store ``case`` and return it.

        Raises:
            ValueError: if a case with the same ``case_id`` is already stored.
        """
        if case.case_id in self._cases:
            raise ValueError(f"moderation case already exists: {case.case_id}")
        self._cases[case.case_id] = case
        return case

    def get_case(self, case_id: str) -> ModerationCase | None:
        """Return the stored case for ``case_id``, or ``None`` if unknown."""
        return self._cases.get(case_id)

    def list_open_cases(self) -> tuple[ModerationCase, ...]:
        """Return every case that still needs review and has no decision timestamp."""
        return tuple(
            case
            for case in self._cases.values()
            if case.status == ModerationStatus.NEEDS_REVIEW and case.decided_at is None
        )

    def record_decision(
        self,
        case_id: str,
        status: ModerationStatus,
        moderator_id: str,
        note: str,
    ) -> ModerationCase:
        """Replace the stored case with a decided copy and return that copy.

        The copy keeps the evaluation data of the stored case and sets the new
        ``status``, the deciding moderator, the note, and ``decided_at`` (current
        UTC time).

        Raises:
            ValueError: if ``case_id`` is not stored.
        """
        try:
            case = self._cases[case_id]
        except KeyError as exc:
            raise ValueError(f"unknown moderation case: {case_id}") from exc
        updated = ModerationCase(
            case_id=case.case_id,
            user_id=case.user_id,
            context=case.context,
            original_text=case.original_text,
            safe_text=case.safe_text,
            status=status,
            score=case.score,
            findings=case.findings,
            created_at=case.created_at,
            decided_at=datetime.now(timezone.utc),
            moderator_id=moderator_id,
            decision_note=note,
        )
        self._cases[case_id] = updated
        return updated

    def apply_sanction(self, sanction: Sanction) -> Sanction:
        """Store ``sanction`` and return it.

        Raises:
            ValueError: if a sanction with the same ``sanction_id`` is already stored.
        """
        if sanction.sanction_id in self._sanctions:
            raise ValueError(f"sanction already exists: {sanction.sanction_id}")
        self._sanctions[sanction.sanction_id] = sanction
        return sanction

    def sanctions_for_user(self, user_id: str) -> tuple[Sanction, ...]:
        """Return all stored sanctions whose ``user_id`` matches."""
        return tuple(sanction for sanction in self._sanctions.values() if sanction.user_id == user_id)


class TextModerator:
    """Deterministic policy evaluator for user-submitted text."""

    def __init__(self, policy: ModerationPolicy | None = None) -> None:
        self.policy = policy or ModerationPolicy()

    def screen_display_name(self, display_name: str, *, user_role: str = "user") -> ModerationResult:
        """Screen ``display_name`` using the ``"display_name"`` context rules."""
        return self.screen_text(display_name, context="display_name", user_role=user_role)

    def screen_text(
        self,
        text: str,
        *,
        context: str = "public_text",
        user_role: str = "user",
        metadata: Mapping[str, object] | None = None,
    ) -> ModerationResult:
        """Evaluate a text value and return a structured moderation result.

        Args:
            text: The submitted value. ``None`` is treated as an empty string and
                any other value is converted with ``str()``.
            context: ``"display_name"`` applies the display-name length limits and
                the stricter URL / impersonation handling; any other value applies
                the public-text length limit.
            user_role: Role of the submitting user. Roles listed in the policy's
                ``trusted_roles`` skip the protected-role impersonation check. A
                missing or empty role is treated as untrusted.
            metadata: Optional flags; see ``_metadata_required_flags`` for the keys
                that are read.

        Returns:
            A ``ModerationResult`` carrying the status, the normalised safe text,
            the accumulated score, and the findings in the order the checks ran.
        """
        original = "" if text is None else str(text)
        safe_text = normalise_public_text(original)
        matching_text = _normalise_for_matching(safe_text)
        findings: list[ModerationFinding] = []
        score = 0
        metadata = metadata or {}

        # Length limits depend on the surface the text is shown on.
        if context == "display_name":
            length = len(safe_text)
            if length < self.policy.min_display_name_length:
                findings.append(
                    ModerationFinding(
                        code="display_name_too_short",
                        message=f"display name must be at least {self.policy.min_display_name_length} characters",
                        severity=FindingSeverity.ERROR,
                    )
                )
                score += 10
            if length > self.policy.max_display_name_length:
                findings.append(
                    ModerationFinding(
                        code="display_name_too_long",
                        message=f"display name must be at most {self.policy.max_display_name_length} characters",
                        severity=FindingSeverity.ERROR,
                    )
                )
                score += 10
        else:
            if len(safe_text) > self.policy.max_public_text_length:
                findings.append(
                    ModerationFinding(
                        code="text_too_long",
                        message=f"text must be at most {self.policy.max_public_text_length} characters",
                        severity=FindingSeverity.ERROR,
                    )
                )
                score += 10

        if not safe_text:
            findings.append(
                ModerationFinding(
                    code="empty_text",
                    message="text must not be empty",
                    severity=FindingSeverity.ERROR,
                )
            )
            score += 10

        # Checked against the raw input: normalisation has already stripped these
        # characters from safe_text, so they can only be detected here.
        if _CONTROL_CHAR_RE.search(original) or original.translate(_ZERO_WIDTH_TRANSLATION) != original:
            findings.append(
                ModerationFinding(
                    code="hidden_characters_removed",
                    message="hidden or control characters were removed",
                    severity=FindingSeverity.WARNING,
                )
            )
            score += 1

        score += self._add_term_findings(
            matching_text=matching_text,
            original_text=safe_text,
            terms=self.policy.blocked_terms,
            severity=FindingSeverity.ERROR,
            code="blocked_term",
            message="text contains a blocked phrase",
            score_weight=10,
            findings=findings,
        )
        score += self._add_term_findings(
            matching_text=matching_text,
            original_text=safe_text,
            terms=self.policy.review_terms,
            severity=FindingSeverity.REVIEW,
            code="review_term",
            message="text contains a phrase that requires review",
            score_weight=4,
            findings=findings,
        )

        # The text argument already tolerates None; do the same for the role and
        # treat a missing role as untrusted instead of raising AttributeError.
        role = user_role.lower() if user_role else ""
        if role not in self.policy.trusted_roles:
            is_display_name = context == "display_name"
            score += self._add_term_findings(
                matching_text=matching_text,
                original_text=safe_text,
                terms=self.policy.protected_role_terms,
                severity=FindingSeverity.ERROR if is_display_name else FindingSeverity.REVIEW,
                code="protected_role_impersonation",
                message="text appears to impersonate an official or moderator role",
                score_weight=10 if is_display_name else 4,
                findings=findings,
            )

        url_match = _URL_RE.search(safe_text)
        if url_match:
            if context == "display_name" and self.policy.reject_urls_in_display_names:
                severity = FindingSeverity.ERROR
                score += 10
            else:
                severity = FindingSeverity.REVIEW
                score += 4
            findings.append(
                ModerationFinding(
                    code="url_detected",
                    message="public text contains a URL",
                    severity=severity,
                    start=url_match.start(),
                    end=url_match.end(),
                    matched_text=url_match.group(0),
                )
            )

        # Only the first contact match is reported; e-mail takes precedence.
        contact_match = _EMAIL_RE.search(safe_text) or _PHONE_RE.search(safe_text)
        if contact_match:
            severity = FindingSeverity.ERROR if self.policy.reject_contact_info else FindingSeverity.REVIEW
            score += 10 if self.policy.reject_contact_info else 4
            findings.append(
                ModerationFinding(
                    code="contact_information",
                    message="public text contains contact information",
                    severity=severity,
                    start=contact_match.start(),
                    end=contact_match.end(),
                    matched_text=contact_match.group(0),
                )
            )

        repeated_match = _REPEATED_CHAR_RE.search(safe_text)
        if repeated_match:
            findings.append(
                ModerationFinding(
                    code="excessive_repetition",
                    message="text contains excessive repeated characters",
                    severity=FindingSeverity.WARNING,
                    start=repeated_match.start(),
                    end=repeated_match.end(),
                    matched_text=repeated_match.group(0),
                )
            )
            score += 1

        if _has_excessive_caps(safe_text):
            findings.append(
                ModerationFinding(
                    code="excessive_caps",
                    message="text contains excessive capital letters",
                    severity=FindingSeverity.WARNING,
                )
            )
            score += 1

        for required_code in _metadata_required_flags(metadata):
            findings.append(
                ModerationFinding(
                    code=required_code,
                    message="metadata flag requires moderation review",
                    severity=FindingSeverity.REVIEW,
                )
            )
            score += 4

        status = self._status_from_score_and_findings(score, findings)
        return ModerationResult(
            status=status,
            original_text=original,
            safe_text=safe_text,
            context=context,
            score=score,
            findings=tuple(findings),
        )

    def open_case_if_needed(
        self,
        repository: ModerationRepository,
        *,
        user_id: str,
        result: ModerationResult,
    ) -> ModerationCase | None:
        """Create a moderation queue case for review/rejected results.

        Returns ``None`` without touching the repository when the result is
        approved; otherwise returns whatever ``repository.create_case`` returns.
        """
        if result.status == ModerationStatus.APPROVED:
            return None
        return repository.create_case(ModerationCase.from_result(user_id=user_id, result=result))

    def _add_term_findings(
        self,
        *,
        matching_text: str,
        original_text: str,
        terms: Iterable[str],
        severity: FindingSeverity,
        code: str,
        message: str,
        score_weight: int,
        findings: list[ModerationFinding],
    ) -> int:
        """Append one finding per term found in ``matching_text``.

        Args:
            matching_text: Case-folded text the term patterns are searched in.
            original_text: The user-visible text ``matching_text`` was derived
                from; reported offsets and ``matched_text`` refer to it.
            terms: Phrases to look for; empty terms are skipped.
            severity, code, message: Copied onto every finding created here.
            score_weight: Score added for each term that matches.
            findings: List that is extended in place.

        Returns:
            The total score contributed by the matching terms. Only the first
            occurrence of each term is reported.
        """
        score = 0
        for term in terms:
            if not term:
                continue
            pattern = re.compile(_NON_WORD_BOUNDARY.format(term=re.escape(term)))
            match = pattern.search(matching_text)
            if not match:
                continue
            # Case folding can change string length (e.g. "ß" -> "ss"), so offsets
            # found in matching_text must be translated before slicing the
            # original text; otherwise the reported span is shifted.
            span = _original_span(original_text, matching_text, match.start(), match.end())
            if span is None:
                # The texts could not be aligned; report the folded phrase that
                # actually matched rather than an unrelated slice.
                start, end = match.start(), match.end()
                matched_text = matching_text[start:end]
            else:
                start, end = span
                matched_text = original_text[start:end]
            findings.append(
                ModerationFinding(
                    code=code,
                    message=message,
                    severity=severity,
                    start=start,
                    end=end,
                    matched_text=matched_text,
                )
            )
            score += score_weight
        return score

    def _status_from_score_and_findings(
        self,
        score: int,
        findings: Iterable[ModerationFinding],
    ) -> ModerationStatus:
        """Derive the final status from the score and the finding severities.

        Any ERROR finding, or a score at or above ``reject_score_threshold``,
        rejects. Otherwise any REVIEW finding, or a score at or above
        ``review_score_threshold``, sends the text to review. Everything else is
        approved.
        """
        severities = tuple(finding.severity for finding in findings)
        if FindingSeverity.ERROR in severities or score >= self.policy.reject_score_threshold:
            return ModerationStatus.REJECTED
        if FindingSeverity.REVIEW in severities or score >= self.policy.review_score_threshold:
            return ModerationStatus.NEEDS_REVIEW
        return ModerationStatus.APPROVED


def normalise_public_text(text: str) -> str:
    """Normalize user-visible text while preserving ordinary Unicode letters.

    Applies NFKC normalisation, drops the zero-width and control characters
    covered by the module-level tables, collapses whitespace runs to a single
    space, and strips leading/trailing whitespace.
    """
    normalized = unicodedata.normalize("NFKC", text).translate(_ZERO_WIDTH_TRANSLATION)
    normalized = _CONTROL_CHAR_RE.sub("", normalized)
    return _WHITESPACE_RE.sub(" ", normalized).strip()


def recommended_sanction_for_case(case: ModerationCase) -> SanctionAction:
    """Map a moderation case to the default product action.

    Human moderators can override this, but deterministic recommendations keep
    admin tooling consistent and make automated tests predictable.
    """
    if case.status == ModerationStatus.APPROVED:
        return SanctionAction.NONE
    codes = {finding.code for finding in case.findings}
    # Impersonation is checked first, so it wins over content hiding.
    if "protected_role_impersonation" in codes:
        return SanctionAction.REQUIRE_PROFILE_EDIT
    if "blocked_term" in codes or "contact_information" in codes:
        return SanctionAction.HIDE_CONTENT
    if case.status == ModerationStatus.REJECTED:
        return SanctionAction.HIDE_CONTENT
    return SanctionAction.NONE


def _normalise_for_matching(text: str) -> str:
    """Return the normalised, case-folded form used for phrase matching."""
    return normalise_public_text(text).casefold()


def _original_span(
    original_text: str,
    matching_text: str,
    start: int,
    end: int,
) -> tuple[int, int] | None:
    """Translate a ``matching_text`` span into ``original_text`` offsets.

    Returns ``None`` when ``matching_text`` is not the character-by-character
    case folding of ``original_text``, because no reliable mapping exists then.
    """
    if len(original_text) == len(matching_text):
        # Same length: offsets are used unchanged, as they always were.
        return start, end
    folded_parts: list[str] = []
    offsets: list[int] = []  # offsets[i] = index in original_text producing folded char i
    for index, char in enumerate(original_text):
        folded = char.casefold()
        folded_parts.append(folded)
        offsets.extend([index] * len(folded))
    if "".join(folded_parts) != matching_text:
        return None
    if end <= start:
        position = offsets[start] if start < len(offsets) else len(original_text)
        return position, position
    # The end is exclusive: cover the whole original character that produced the
    # last folded character of the match.
    return offsets[start], offsets[end - 1] + 1


def _has_excessive_caps(text: str) -> bool:
    """Return True when at least 80% of 10 or more letters are uppercase."""
    letters = [char for char in text if char.isalpha()]
    if len(letters) < 10:
        # Too few letters to judge; short acronyms should not be flagged.
        return False
    uppercase_count = sum(1 for char in letters if char.isupper())
    return uppercase_count / len(letters) >= 0.8


def _metadata_required_flags(metadata: Mapping[str, object]) -> tuple[str, ...]:
    """Return the review finding codes implied by truthy metadata flags."""
    flags: list[str] = []
    if metadata.get("is_reported_by_user"):
        flags.append("user_reported")
    if metadata.get("contains_image_attachment"):
        flags.append("attachment_requires_review")
    return tuple(flags)


__all__ = [
    "FindingSeverity",
    "InMemoryModerationRepository",
    "ModerationCase",
    "ModerationFinding",
    "ModerationPolicy",
    "ModerationRepository",
    "ModerationResult",
    "ModerationStatus",
    "Sanction",
    "SanctionAction",
    "TextModerator",
    "normalise_public_text",
    "recommended_sanction_for_case",
]