"""Gamification service for fan passports.

Defines the result dataclasses returned to callers and ``GamificationService``,
which awards points, tracks challenge progress and unlocks badges on top of a
SQLAlchemy session.
"""

from __future__ import annotations

from dataclasses import asdict, dataclass, field
from datetime import date, datetime, timedelta, timezone
from typing import Any

from sqlalchemy import and_, func, or_, select
from sqlalchemy.orm import Session

from fan_passport import models
from fan_passport.errors import ConflictError, NotFoundError, ValidationError
from fan_passport.scoring import (
    GIANT_KILLING_BONUS_POINTS,
    SUPPORTED_COLLECTION_TYPES,
    collection_points,
    outcome_from_score,
    quiz_points,
    score_prediction,
)


@dataclass
class BadgeUnlock:
    """A badge together with the user-badge row created when it was unlocked."""

    badge: models.Badge
    user_badge: models.UserBadge


@dataclass
class ChallengeCompletion:
    """A challenge together with the progress row that just reached completion."""

    challenge: models.Challenge
    progress: models.UserChallengeProgress


@dataclass
class EvaluationSummary:
    """Badges unlocked, challenges completed and points awarded by one evaluation."""

    new_badges: list[BadgeUnlock] = field(default_factory=list)
    completed_challenges: list[ChallengeCompletion] = field(default_factory=list)
    points_awarded: int = 0


@dataclass
class ProfileResult:
    """Result of creating or fetching a profile."""

    profile: models.UserProfile
    created: bool
    evaluation: EvaluationSummary


@dataclass
class CollectionResult:
    """Result of ``GamificationService.collect_item``."""

    item: models.UserCollectionItem
    duplicate: bool
    points_awarded: int
    profile: models.UserProfile
    evaluation: EvaluationSummary


@dataclass
class QuizAnswerResult:
    """Result of ``GamificationService.answer_quiz``."""

    answer: models.QuizAnswer
    question: models.QuizQuestion
    points_awarded: int
    profile: models.UserProfile
    evaluation: EvaluationSummary


@dataclass
class PredictionSubmitResult:
    """Result of ``GamificationService.submit_prediction``."""

    prediction: models.UserPrediction
    profile: models.UserProfile


@dataclass
class MatchEvaluationResult:
    """Result of ``GamificationService.record_match_result``."""

    match: models.Match
    evaluated_predictions: int
    points_awarded_total: int
    evaluation: EvaluationSummary


@dataclass
class PassportSnapshot:
    """Read model assembled by ``GamificationService.get_passport_snapshot``."""

    profile: models.UserProfile
    collection_counts: dict[str, int]
    content_totals: dict[str, int]
    collection_completion: dict[str, float]
    quiz: dict[str, int]
    predictions: dict[str, int]
    challenges: list[tuple[models.UserChallengeProgress, models.Challenge]]
    badges: list[tuple[models.UserBadge, models.Badge]]
    recent_events: list[models.AchievementEvent]


def _utcnow() -> datetime:
    """Return the current time as a timezone-aware UTC datetime."""
    return datetime.now(timezone.utc)


def _as_utc(value: datetime | None) -> datetime | None:
    """Return *value* in UTC; naive datetimes are tagged as UTC, ``None`` passes through."""
    if value is None:
        return None
    if value.tzinfo is None:
        return value.replace(tzinfo=timezone.utc)
    return value.astimezone(timezone.utc)


def _normalize_option(value: str) -> str:
    """Strip surrounding whitespace and lowercase *value* for option comparison."""
    return value.strip().lower()


class GamificationService:
    """Points, challenges and badges for fan profiles, backed by one session.

    The public mutating methods commit the session before returning;
    ``evaluate_user`` only flushes so callers can fold it into a larger
    transaction.
    """

    def __init__(self, session: Session):
        self.session = session

    def create_profile(
        self,
        *,
        display_name: str,
        external_subject: str | None = None,
        country_code: str | None = None,
        favorite_team_id: str | None = None,
        avatar_url: str | None = None,
        metadata_json: dict[str, Any] | None = None,
    ) -> ProfileResult:
        """Create a profile, record a ``profile_created`` event, evaluate and commit.

        Raises:
            ConflictError: a profile already exists for ``external_subject``.
            NotFoundError: ``favorite_team_id`` is given but missing or inactive.
        """
        if external_subject:
            existing = self.session.scalar(
                select(models.UserProfile).where(
                    models.UserProfile.external_subject == external_subject
                )
            )
            if existing:
                raise ConflictError(
                    "A profile already exists for this external subject.",
                    details={"external_subject": external_subject},
                )
        if favorite_team_id:
            self._require_team(favorite_team_id)

        profile = models.UserProfile(
            external_subject=external_subject,
            display_name=display_name,
            country_code=country_code,
            favorite_team_id=favorite_team_id,
            avatar_url=avatar_url,
            metadata_json=metadata_json or {},
        )
        self.session.add(profile)
        self.session.flush()
        self._record_event(
            profile.id,
            "profile_created",
            0,
            {"external_subject": external_subject, "display_name": display_name},
        )
        evaluation = self.evaluate_user(profile.id)
        self.session.commit()
        return ProfileResult(profile=profile, created=True, evaluation=evaluation)

    def get_or_create_profile_for_subject(
        self,
        *,
        external_subject: str,
        display_name: str,
        country_code: str | None = None,
        favorite_team_id: str | None = None,
        avatar_url: str | None = None,
        metadata_json: dict[str, Any] | None = None,
    ) -> ProfileResult:
        """Fetch the profile for ``external_subject`` or create it, then evaluate and commit.

        On an existing profile only truthy arguments overwrite stored values and
        ``metadata_json`` is merged over the stored metadata.

        Raises:
            NotFoundError: ``favorite_team_id`` is given but missing or inactive.
        """
        # Validate before any mutation so the create path gets the same check
        # as create_profile and the update path.
        if favorite_team_id:
            self._require_team(favorite_team_id)

        profile = self.session.scalar(
            select(models.UserProfile).where(
                models.UserProfile.external_subject == external_subject
            )
        )
        created = profile is None
        if created:
            profile = models.UserProfile(
                external_subject=external_subject,
                display_name=display_name,
                country_code=country_code,
                favorite_team_id=favorite_team_id,
                avatar_url=avatar_url,
                metadata_json=metadata_json or {},
            )
            self.session.add(profile)
            self.session.flush()
            self._record_event(
                profile.id, "profile_created", 0, {"external_subject": external_subject}
            )
        else:
            if display_name:
                profile.display_name = display_name
            if country_code:
                profile.country_code = country_code
            if favorite_team_id:
                profile.favorite_team_id = favorite_team_id
            if avatar_url:
                profile.avatar_url = avatar_url
            if metadata_json:
                profile.metadata_json = {**(profile.metadata_json or {}), **metadata_json}

        evaluation = self.evaluate_user(profile.id)
        self.session.commit()
        return ProfileResult(profile=profile, created=created, evaluation=evaluation)

    def get_profile(self, user_id: str) -> models.UserProfile:
        """Return the profile for ``user_id`` or raise ``NotFoundError``."""
        return self._require_user(user_id)

    def collect_item(
        self,
        *,
        user_id: str,
        item_type: str,
        item_id: str,
        source: str = "manual",
        metadata_json: dict[str, Any] | None = None,
    ) -> CollectionResult:
        """Add an item to the user's collection, award points, evaluate and commit.

        Collecting an already-owned item awards no collection points; the user
        is still re-evaluated and the existing row is returned with
        ``duplicate=True``.

        Raises:
            NotFoundError: the user or the referenced content does not exist.
            ValidationError: ``item_type`` is not a supported collection type.
        """
        profile = self._require_user(user_id)
        item_type = item_type.lower()
        if item_type not in SUPPORTED_COLLECTION_TYPES:
            raise ValidationError(
                "Unsupported collection item type.",
                details={
                    "item_type": item_type,
                    "supported": sorted(SUPPORTED_COLLECTION_TYPES),
                },
            )
        self._validate_collection_item_exists(item_type=item_type, item_id=item_id)

        existing = self.session.scalar(
            select(models.UserCollectionItem).where(
                and_(
                    models.UserCollectionItem.user_id == user_id,
                    models.UserCollectionItem.item_type == item_type,
                    models.UserCollectionItem.item_id == item_id,
                )
            )
        )
        if existing:
            evaluation = self.evaluate_user(user_id)
            self.session.commit()
            return CollectionResult(
                item=existing,
                duplicate=True,
                points_awarded=evaluation.points_awarded,
                profile=profile,
                evaluation=evaluation,
            )

        points = collection_points(item_type)
        item = models.UserCollectionItem(
            user_id=user_id,
            item_type=item_type,
            item_id=item_id,
            source=source,
            points_awarded=points,
            metadata_json=metadata_json or {},
        )
        self.session.add(item)
        self.session.flush()
        self._award_points(
            profile,
            category="collection",
            points=points,
            event_type="collection_item_acquired",
            payload={"item_type": item_type, "item_id": item_id, "source": source},
        )
        evaluation = self.evaluate_user(user_id)
        self.session.commit()
        return CollectionResult(
            item=item,
            duplicate=False,
            points_awarded=points + evaluation.points_awarded,
            profile=profile,
            evaluation=evaluation,
        )

    def answer_quiz(
        self,
        *,
        user_id: str,
        question_id: str,
        selected_option: str,
        metadata_json: dict[str, Any] | None = None,
    ) -> QuizAnswerResult:
        """Store the user's answer to a quiz question, award points, evaluate and commit.

        Options are compared after stripping whitespace and lowercasing.

        Raises:
            NotFoundError: the user is unknown, or the question is missing or inactive.
            ConflictError: the user has already answered this question.
        """
        profile = self._require_user(user_id)
        question = self.session.get(models.QuizQuestion, question_id)
        if question is None or not question.active:
            raise NotFoundError(
                "Quiz question not found.", details={"question_id": question_id}
            )

        existing = self.session.scalar(
            select(models.QuizAnswer).where(
                and_(
                    models.QuizAnswer.user_id == user_id,
                    models.QuizAnswer.question_id == question_id,
                )
            )
        )
        if existing:
            raise ConflictError(
                "This quiz question has already been answered by the user.",
                details={"question_id": question_id},
            )

        is_correct = _normalize_option(selected_option) == _normalize_option(
            question.correct_option
        )
        points = quiz_points(question.difficulty, is_correct)
        answer = models.QuizAnswer(
            user_id=user_id,
            question_id=question_id,
            selected_option=selected_option,
            is_correct=is_correct,
            points_awarded=points,
            metadata_json=metadata_json or {},
        )
        self.session.add(answer)
        self.session.flush()

        event_payload = {
            "question_id": question_id,
            "selected_option": selected_option,
            "is_correct": is_correct,
            "difficulty": question.difficulty,
        }
        self._award_points(
            profile,
            category="quiz",
            points=points,
            event_type="quiz_answered",
            payload=event_payload,
        )
        if points == 0:
            # _award_points records nothing for zero points, so log the answer
            # here with the same payload (and the real is_correct value).
            self._record_event(user_id, "quiz_answered", 0, event_payload)

        evaluation = self.evaluate_user(user_id)
        self.session.commit()
        return QuizAnswerResult(
            answer=answer,
            question=question,
            points_awarded=points + evaluation.points_awarded,
            profile=profile,
            evaluation=evaluation,
        )

    def submit_prediction(
        self,
        *,
        user_id: str,
        match_id: str,
        predicted_home_score: int,
        predicted_away_score: int,
        predicted_winner_team_id: str | None = None,
        giant_killing_pick: bool = False,
    ) -> PredictionSubmitResult:
        """Create or overwrite the user's prediction for a match and commit.

        The stored winner is always derived from the predicted scoreline.

        Raises:
            NotFoundError: the user is unknown, or the match is missing or inactive.
            ConflictError: the match is not ``scheduled``/``postponed``, has no
                kickoff time, has already kicked off, or the existing
                prediction was already evaluated.
            ValidationError: ``predicted_winner_team_id`` contradicts the scoreline.
        """
        profile = self._require_user(user_id)
        match = self.session.get(models.Match, match_id)
        if match is None or not match.active:
            raise NotFoundError("Match not found.", details={"match_id": match_id})
        if match.status not in {"scheduled", "postponed"}:
            raise ConflictError(
                "Predictions are closed for this match.", details={"status": match.status}
            )

        kickoff_at = _as_utc(match.kickoff_at)
        if kickoff_at is None:
            raise ConflictError("Predictions are closed until the match kickoff is scheduled.")
        if kickoff_at <= _utcnow():
            raise ConflictError(
                "Predictions are locked after kickoff.",
                details={"kickoff_at": str(kickoff_at)},
            )

        expected_winner = self._winner_for_prediction(
            match, predicted_home_score, predicted_away_score
        )
        if predicted_winner_team_id and predicted_winner_team_id != expected_winner:
            raise ValidationError(
                "predicted_winner_team_id does not match the predicted scoreline.",
                details={"expected_winner_team_id": expected_winner},
            )

        existing = self.session.scalar(
            select(models.UserPrediction).where(
                and_(
                    models.UserPrediction.user_id == user_id,
                    models.UserPrediction.match_id == match_id,
                )
            )
        )
        if existing and existing.evaluated_at is not None:
            raise ConflictError("An evaluated prediction cannot be changed.")

        if existing:
            prediction = existing
            prediction.predicted_home_score = predicted_home_score
            prediction.predicted_away_score = predicted_away_score
            prediction.predicted_winner_team_id = expected_winner
            prediction.giant_killing_pick = giant_killing_pick
            prediction.submitted_at = _utcnow()
            prediction.locked_at = kickoff_at
            prediction.score_breakdown_json = {}
        else:
            prediction = models.UserPrediction(
                user_id=user_id,
                match_id=match_id,
                predicted_home_score=predicted_home_score,
                predicted_away_score=predicted_away_score,
                predicted_winner_team_id=expected_winner,
                giant_killing_pick=giant_killing_pick,
                locked_at=kickoff_at,
            )
            self.session.add(prediction)

        self.session.flush()
        self._record_event(
            user_id,
            "prediction_submitted",
            0,
            {
                "match_id": match_id,
                "predicted_home_score": predicted_home_score,
                "predicted_away_score": predicted_away_score,
                "giant_killing_pick": giant_killing_pick,
            },
        )
        self.session.commit()
        return PredictionSubmitResult(prediction=prediction, profile=profile)

    def record_match_result(
        self,
        *,
        match_id: str,
        status: str,
        home_score: int,
        away_score: int,
        winner_team_id: str | None = None,
        is_giant_killing: bool = False,
        metadata_json: dict[str, Any] | None = None,
    ) -> MatchEvaluationResult:
        """Store a final result, score every unevaluated prediction and commit.

        Each affected user is re-evaluated once after all predictions are
        scored; ``points_awarded_total`` includes both prediction points and
        points from challenges/badges unlocked by that evaluation.

        Raises:
            NotFoundError: the match (or a predicting user) does not exist.
            ValidationError: ``status`` is not ``"final"`` or ``winner_team_id``
                contradicts the scoreline.
        """
        match = self.session.get(models.Match, match_id)
        if match is None:
            raise NotFoundError("Match not found.", details={"match_id": match_id})
        if status != "final":
            raise ValidationError("Only final result evaluation is supported by the MVP engine.")

        derived_winner = self._winner_for_prediction(match, home_score, away_score)
        if winner_team_id and winner_team_id != derived_winner:
            raise ValidationError(
                "winner_team_id does not match the final scoreline.",
                details={"expected_winner_team_id": derived_winner},
            )

        match.status = status
        match.home_score = home_score
        match.away_score = away_score
        match.winner_team_id = derived_winner
        match.is_giant_killing = is_giant_killing
        if metadata_json:
            match.metadata_json = {**(match.metadata_json or {}), **metadata_json}

        predictions = self.session.scalars(
            select(models.UserPrediction).where(
                and_(
                    models.UserPrediction.match_id == match_id,
                    models.UserPrediction.evaluated_at.is_(None),
                )
            )
        ).all()

        total_points = 0
        evaluation = EvaluationSummary()
        affected_user_ids: set[str] = set()
        for prediction in predictions:
            breakdown = score_prediction(
                predicted_home_score=prediction.predicted_home_score,
                predicted_away_score=prediction.predicted_away_score,
                giant_killing_pick=prediction.giant_killing_pick,
                actual_home_score=home_score,
                actual_away_score=away_score,
                actual_giant_killing=is_giant_killing,
            )
            prediction.points_awarded = breakdown.total
            prediction.score_breakdown_json = asdict(breakdown)
            prediction.evaluated_at = _utcnow()

            profile = self._require_user(prediction.user_id)
            self._award_points(
                profile,
                category="prediction",
                points=breakdown.total,
                event_type="prediction_scored",
                payload={"match_id": match_id, "breakdown": asdict(breakdown)},
            )
            total_points += breakdown.total
            affected_user_ids.add(prediction.user_id)

        for affected_user_id in affected_user_ids:
            user_eval = self.evaluate_user(affected_user_id)
            evaluation.points_awarded += user_eval.points_awarded
            evaluation.new_badges.extend(user_eval.new_badges)
            evaluation.completed_challenges.extend(user_eval.completed_challenges)

        self.session.commit()
        return MatchEvaluationResult(
            match=match,
            evaluated_predictions=len(predictions),
            points_awarded_total=total_points + evaluation.points_awarded,
            evaluation=evaluation,
        )

    def evaluate_user(self, user_id: str) -> EvaluationSummary:
        """Refresh challenge progress and unlock badges for a user (flush, no commit).

        Returns a summary of the challenges completed, badges unlocked and
        points awarded by this call.

        Raises:
            NotFoundError: the user does not exist.
        """
        profile = self._require_user(user_id)
        summary = EvaluationSummary()

        challenges = self.session.scalars(
            select(models.Challenge)
            .where(models.Challenge.active.is_(True))
            .order_by(models.Challenge.slug)
        ).all()

        # Load the user's progress rows once instead of one SELECT per challenge.
        progress_by_challenge: dict[Any, models.UserChallengeProgress] = {}
        for row in self.session.scalars(
            select(models.UserChallengeProgress).where(
                models.UserChallengeProgress.user_id == user_id
            )
        ):
            progress_by_challenge.setdefault(row.challenge_id, row)

        for challenge in challenges:
            progress_count, target_count, state = self._evaluate_rule_progress(
                user_id, challenge.rules_json
            )
            progress = progress_by_challenge.get(challenge.id)
            if progress is None:
                progress = models.UserChallengeProgress(
                    user_id=user_id,
                    challenge_id=challenge.id,
                    target_count=max(target_count, 1),
                )
                self.session.add(progress)
                self.session.flush()
                progress_by_challenge[challenge.id] = progress

            progress.progress_count = progress_count
            progress.target_count = max(target_count, 1)
            progress.state_json = state

            newly_completed = (
                not progress.completed and target_count > 0 and progress_count >= target_count
            )
            if newly_completed:
                progress.completed = True
                progress.completed_at = _utcnow()
                progress.points_awarded = challenge.points
                self._award_points(
                    profile,
                    category="challenge",
                    points=challenge.points,
                    event_type="challenge_completed",
                    payload={"challenge_id": challenge.id, "slug": challenge.slug},
                    summary=summary,
                )
                summary.completed_challenges.append(
                    ChallengeCompletion(challenge=challenge, progress=progress)
                )

        badges = self.session.scalars(
            select(models.Badge)
            .where(models.Badge.active.is_(True))
            .order_by(models.Badge.slug)
        ).all()
        owned_badge_ids = set(
            self.session.scalars(
                select(models.UserBadge.badge_id).where(models.UserBadge.user_id == user_id)
            )
        )

        # Badge criteria may depend on points and challenge completions; loop to allow
        # one unlock's points to trigger a points_total badge during the same action.
        for _ in range(3):
            unlocked_this_pass = False
            for badge in badges:
                if badge.id in owned_badge_ids:
                    continue
                progress_count, target_count, state = self._evaluate_rule_progress(
                    user_id, badge.criteria_json
                )
                if target_count > 0 and progress_count >= target_count:
                    user_badge = models.UserBadge(
                        user_id=user_id,
                        badge_id=badge.id,
                        reason_json={
                            "criteria": badge.criteria_json,
                            "progress_count": progress_count,
                            "target_count": target_count,
                            "state": state,
                        },
                    )
                    self.session.add(user_badge)
                    # Flush so count-based rules evaluated later in this pass see the row.
                    self.session.flush()
                    owned_badge_ids.add(badge.id)
                    self._award_points(
                        profile,
                        category="achievement",
                        points=badge.points,
                        event_type="badge_unlocked",
                        payload={"badge_id": badge.id, "slug": badge.slug},
                        summary=summary,
                    )
                    summary.new_badges.append(BadgeUnlock(badge=badge, user_badge=user_badge))
                    unlocked_this_pass = True
            if not unlocked_this_pass:
                break

        self.session.flush()
        return summary

    def evaluate_user_and_commit(self, user_id: str) -> EvaluationSummary:
        """Run ``evaluate_user`` and commit the session."""
        summary = self.evaluate_user(user_id)
        self.session.commit()
        return summary

    def get_passport_snapshot(self, user_id: str) -> PassportSnapshot:
        """Re-evaluate the user, commit, and assemble the passport read model.

        Raises:
            NotFoundError: the user does not exist.
        """
        # evaluate_user performs the existence check and raises NotFoundError itself.
        self.evaluate_user(user_id)
        self.session.commit()
        profile = self._require_user(user_id)

        collection_counts = self._collection_counts(user_id)
        content_totals = self._content_totals()
        completion = {
            item_type: round((collection_counts.get(item_type, 0) / total) * 100, 2)
            if total
            else 0.0
            for item_type, total in content_totals.items()
        }

        answered = (
            self.session.scalar(
                select(func.count(models.QuizAnswer.id)).where(
                    models.QuizAnswer.user_id == user_id
                )
            )
            or 0
        )
        correct = (
            self.session.scalar(
                select(func.count(models.QuizAnswer.id)).where(
                    and_(
                        models.QuizAnswer.user_id == user_id,
                        models.QuizAnswer.is_correct.is_(True),
                    )
                )
            )
            or 0
        )
        predictions_submitted = (
            self.session.scalar(
                select(func.count(models.UserPrediction.id)).where(
                    models.UserPrediction.user_id == user_id
                )
            )
            or 0
        )
        predictions_evaluated = (
            self.session.scalar(
                select(func.count(models.UserPrediction.id)).where(
                    and_(
                        models.UserPrediction.user_id == user_id,
                        models.UserPrediction.evaluated_at.is_not(None),
                    )
                )
            )
            or 0
        )
        prediction_points = (
            self.session.scalar(
                select(func.coalesce(func.sum(models.UserPrediction.points_awarded), 0)).where(
                    models.UserPrediction.user_id == user_id
                )
            )
            or 0
        )

        challenge_rows = self.session.execute(
            select(models.UserChallengeProgress, models.Challenge)
            .join(
                models.Challenge,
                models.Challenge.id == models.UserChallengeProgress.challenge_id,
            )
            .where(models.UserChallengeProgress.user_id == user_id)
            .order_by(models.UserChallengeProgress.completed.desc(), models.Challenge.slug)
        ).all()
        badge_rows = self.session.execute(
            select(models.UserBadge, models.Badge)
            .join(models.Badge, models.Badge.id == models.UserBadge.badge_id)
            .where(models.UserBadge.user_id == user_id)
            .order_by(models.UserBadge.unlocked_at.desc())
        ).all()
        recent_events = list(
            self.session.scalars(
                select(models.AchievementEvent)
                .where(models.AchievementEvent.user_id == user_id)
                .order_by(models.AchievementEvent.created_at.desc())
                .limit(20)
            ).all()
        )

        return PassportSnapshot(
            profile=profile,
            collection_counts=collection_counts,
            content_totals=content_totals,
            collection_completion=completion,
            quiz={"answered": int(answered), "correct": int(correct)},
            predictions={
                "submitted": int(predictions_submitted),
                "evaluated": int(predictions_evaluated),
                "points": int(prediction_points),
            },
            challenges=[(row[0], row[1]) for row in challenge_rows],
            badges=[(row[0], row[1]) for row in badge_rows],
            recent_events=recent_events,
        )

    def list_collection_items(
        self, *, user_id: str, item_type: str | None = None
    ) -> list[models.UserCollectionItem]:
        """Return the user's collection items, newest first, optionally filtered by type.

        Raises:
            NotFoundError: the user does not exist.
        """
        self._require_user(user_id)
        stmt = select(models.UserCollectionItem).where(
            models.UserCollectionItem.user_id == user_id
        )
        if item_type:
            stmt = stmt.where(models.UserCollectionItem.item_type == item_type.lower())
        stmt = stmt.order_by(models.UserCollectionItem.acquired_at.desc())
        return list(self.session.scalars(stmt).all())

    def list_predictions(self, *, user_id: str) -> list[models.UserPrediction]:
        """Return the user's predictions, most recently submitted first.

        Raises:
            NotFoundError: the user does not exist.
        """
        self._require_user(user_id)
        return list(
            self.session.scalars(
                select(models.UserPrediction)
                .where(models.UserPrediction.user_id == user_id)
                .order_by(models.UserPrediction.submitted_at.desc())
            ).all()
        )

    def list_achievements(
        self, *, user_id: str
    ) -> tuple[
        list[tuple[models.UserChallengeProgress, models.Challenge]],
        list[tuple[models.UserBadge, models.Badge]],
    ]:
        """Return ``(challenges, badges)`` from a fresh passport snapshot.

        Goes through ``get_passport_snapshot``, so the user is re-evaluated and
        the session is committed.
        """
        snapshot = self.get_passport_snapshot(user_id)
        return snapshot.challenges, snapshot.badges

    def _require_user(self, user_id: str) -> models.UserProfile:
        """Return the profile for ``user_id`` or raise ``NotFoundError``."""
        profile = self.session.get(models.UserProfile, user_id)
        if profile is None:
            raise NotFoundError("User profile not found.", details={"user_id": user_id})
        return profile

    def _require_team(self, team_id: str) -> models.Team:
        """Return the active team for ``team_id`` or raise ``NotFoundError``."""
        team = self.session.get(models.Team, team_id)
        if team is None or not team.active:
            raise NotFoundError("Team not found.", details={"team_id": team_id})
        return team

    def _validate_collection_item_exists(self, *, item_type: str, item_id: str) -> None:
        """Raise unless ``item_id`` refers to active content of ``item_type``.

        ``memory`` items are accepted without a lookup.

        Raises:
            NotFoundError: the referenced content is missing or inactive.
            ValidationError: ``item_type`` is not handled here.
        """
        if item_type == "team":
            self._require_team(item_id)
        elif item_type == "stadium":
            stadium = self.session.get(models.Stadium, item_id)
            if stadium is None or not stadium.active:
                raise NotFoundError("Stadium not found.", details={"stadium_id": item_id})
        elif item_type == "match":
            match = self.session.get(models.Match, item_id)
            if match is None or not match.active:
                raise NotFoundError("Match not found.", details={"match_id": item_id})
        elif item_type == "sticker":
            sticker = self.session.get(models.Sticker, item_id)
            if sticker is None or not sticker.active:
                raise NotFoundError("Sticker not found.", details={"sticker_id": item_id})
        elif item_type == "memory":
            return
        else:
            raise ValidationError(
                "Unsupported collection item type.", details={"item_type": item_type}
            )

    def _winner_for_prediction(
        self, match: models.Match, home_score: int, away_score: int
    ) -> str | None:
        """Map a scoreline to the winning team id of ``match``; ``None`` otherwise."""
        outcome = outcome_from_score(home_score, away_score)
        if outcome == "home":
            return match.home_team_id
        if outcome == "away":
            return match.away_team_id
        return None

    def _award_points(
        self,
        profile: models.UserProfile,
        *,
        category: str,
        points: int,
        event_type: str,
        payload: dict[str, Any],
        summary: EvaluationSummary | None = None,
    ) -> None:
        """Add positive ``points`` to the profile totals and record an event.

        Non-positive ``points`` are a no-op (no event is recorded). When
        ``summary`` is given its ``points_awarded`` is incremented as well.

        Raises:
            ValidationError: ``category`` is not a known point category.
        """
        if points <= 0:
            return
        profile.points_total += points
        if category == "collection":
            profile.collection_points += points
        elif category == "quiz":
            profile.quiz_points += points
        elif category == "prediction":
            profile.prediction_points += points
        elif category == "challenge":
            profile.challenge_points += points
        elif category == "achievement":
            profile.achievement_points += points
        else:
            raise ValidationError("Unknown point category.", details={"category": category})
        self._record_event(profile.id, event_type, points, payload)
        if summary is not None:
            summary.points_awarded += points

    def _record_event(
        self, user_id: str, event_type: str, points_delta: int, payload: dict[str, Any]
    ) -> None:
        """Add an ``AchievementEvent`` row to the session (no flush)."""
        self.session.add(
            models.AchievementEvent(
                user_id=user_id,
                event_type=event_type,
                points_delta=points_delta,
                payload_json=payload,
            )
        )

    def _collection_counts(self, user_id: str) -> dict[str, int]:
        """Return the number of collected items per item type for the user."""
        rows = self.session.execute(
            select(models.UserCollectionItem.item_type, func.count(models.UserCollectionItem.id))
            .where(models.UserCollectionItem.user_id == user_id)
            .group_by(models.UserCollectionItem.item_type)
        ).all()
        return {str(item_type): int(count) for item_type, count in rows}

    def _content_totals(self) -> dict[str, int]:
        """Return the number of active content rows per item type (``memory`` is 0)."""
        return {
            "team": int(
                self.session.scalar(
                    select(func.count(models.Team.id)).where(models.Team.active.is_(True))
                )
                or 0
            ),
            "stadium": int(
                self.session.scalar(
                    select(func.count(models.Stadium.id)).where(models.Stadium.active.is_(True))
                )
                or 0
            ),
            "match": int(
                self.session.scalar(
                    select(func.count(models.Match.id)).where(models.Match.active.is_(True))
                )
                or 0
            ),
            "sticker": int(
                self.session.scalar(
                    select(func.count(models.Sticker.id)).where(models.Sticker.active.is_(True))
                )
                or 0
            ),
            "memory": 0,
        }

    def _evaluate_rule_progress(
        self, user_id: str, rule: dict[str, Any] | None
    ) -> tuple[int, int, dict[str, Any]]:
        """Evaluate one challenge/badge rule for the user.

        The rule kind is read from ``rule["type"]`` (falling back to
        ``rule["kind"]``). Unknown kinds report zero progress.

        Returns:
            ``(progress_count, target_count, state)`` where ``state`` is a
            JSON-friendly dict describing the evaluation.
        """
        rule = rule or {}
        kind = rule.get("type") or rule.get("kind")

        if kind == "profile_created":
            return 1, int(rule.get("target", 1)), {"type": kind}

        if kind == "collect_count":
            item_type = str(rule.get("item_type", "")).lower()
            target = int(rule.get("target", 1))
            count = self._count_collected(user_id, item_type)
            return count, target, {"type": kind, "item_type": item_type}

        if kind == "collect_all":
            item_type = str(rule.get("item_type", "")).lower()
            target_ids = self._target_content_ids(item_type, rule)
            collected = self._collected_ids(user_id, item_type)
            return (
                len(target_ids & collected),
                len(target_ids),
                {
                    "type": kind,
                    "item_type": item_type,
                    "remaining_ids": sorted(target_ids - collected),
                },
            )

        if kind == "collect_group":
            group_name = str(rule.get("group_name", ""))
            competition_code = rule.get("competition_code")
            target_ids = self._team_ids_for_group(group_name, competition_code)
            collected = self._collected_ids(user_id, "team")
            return (
                len(target_ids & collected),
                len(target_ids),
                {
                    "type": kind,
                    "group_name": group_name,
                    "remaining_ids": sorted(target_ids - collected),
                },
            )

        if kind == "complete_groups":
            competition_code = rule.get("competition_code")
            group_names = rule.get("groups") or self._all_group_names(competition_code)
            collected = self._collected_ids(user_id, "team")
            complete_count = 0
            group_state: dict[str, Any] = {}
            for group_name in group_names:
                target_ids = self._team_ids_for_group(str(group_name), competition_code)
                # An empty group never counts as complete.
                is_complete = bool(target_ids) and target_ids.issubset(collected)
                if is_complete:
                    complete_count += 1
                group_state[str(group_name)] = {
                    "complete": is_complete,
                    "remaining_ids": sorted(target_ids - collected),
                }
            target = int(rule.get("target", len(group_names)))
            return complete_count, target, {"type": kind, "groups": group_state}

        if kind == "watched_team_matches":
            team_id = str(rule.get("team_id", ""))
            match_ids = self._match_ids_for_team(team_id)
            collected = self._collected_ids(user_id, "match")
            target = int(rule.get("target", len(match_ids)))
            return (
                len(match_ids & collected),
                target,
                {
                    "type": kind,
                    "team_id": team_id,
                    "remaining_match_ids": sorted(match_ids - collected),
                },
            )

        if kind == "quiz_correct_count":
            target = int(rule.get("target", 1))
            count = self.session.scalar(
                select(func.count(models.QuizAnswer.id)).where(
                    and_(
                        models.QuizAnswer.user_id == user_id,
                        models.QuizAnswer.is_correct.is_(True),
                    )
                )
            )
            return int(count or 0), target, {"type": kind}

        if kind == "quiz_daily_streak":
            target = int(rule.get("target_days", rule.get("target", 1)))
            streak = self._quiz_correct_streak(user_id)
            return streak, target, {"type": kind, "max_streak": streak}

        if kind == "prediction_count":
            target = int(rule.get("target", 1))
            stmt = select(func.count(models.UserPrediction.id)).where(
                models.UserPrediction.user_id == user_id
            )
            if rule.get("evaluated_only"):
                stmt = stmt.where(models.UserPrediction.evaluated_at.is_not(None))
            if rule.get("correct_only"):
                # NOTE(review): the literal 5 is presumably the score above which a
                # prediction counts as "correct" -- confirm against fan_passport.scoring.
                stmt = stmt.where(models.UserPrediction.points_awarded > 5)
            count = self.session.scalar(stmt)
            return int(count or 0), target, {"type": kind}

        if kind == "giant_killing_predictions":
            target = int(rule.get("target", 1))
            count = self.session.scalar(
                select(func.count(models.UserPrediction.id)).where(
                    and_(
                        models.UserPrediction.user_id == user_id,
                        models.UserPrediction.giant_killing_pick.is_(True),
                        models.UserPrediction.points_awarded >= GIANT_KILLING_BONUS_POINTS,
                        models.UserPrediction.evaluated_at.is_not(None),
                    )
                )
            )
            return int(count or 0), target, {"type": kind}

        if kind == "points_total":
            profile = self._require_user(user_id)
            return int(profile.points_total), int(rule.get("target", 1)), {"type": kind}

        if kind == "challenge_completions":
            target = int(rule.get("target", 1))
            count = self.session.scalar(
                select(func.count(models.UserChallengeProgress.id)).where(
                    and_(
                        models.UserChallengeProgress.user_id == user_id,
                        models.UserChallengeProgress.completed.is_(True),
                    )
                )
            )
            return int(count or 0), target, {"type": kind}

        if kind == "badge_count":
            target = int(rule.get("target", 1))
            count = self.session.scalar(
                select(func.count(models.UserBadge.id)).where(
                    models.UserBadge.user_id == user_id
                )
            )
            return int(count or 0), target, {"type": kind}

        return 0, int(rule.get("target", 1)), {"type": kind or "unknown", "rule": rule}

    def _count_collected(self, user_id: str, item_type: str) -> int:
        """Return how many items of ``item_type`` the user has collected."""
        count = self.session.scalar(
            select(func.count(models.UserCollectionItem.id)).where(
                and_(
                    models.UserCollectionItem.user_id == user_id,
                    models.UserCollectionItem.item_type == item_type,
                )
            )
        )
        return int(count or 0)

    def _collected_ids(self, user_id: str, item_type: str) -> set[str]:
        """Return the ids of the items of ``item_type`` the user has collected."""
        return set(
            self.session.scalars(
                select(models.UserCollectionItem.item_id).where(
                    and_(
                        models.UserCollectionItem.user_id == user_id,
                        models.UserCollectionItem.item_type == item_type,
                    )
                )
            ).all()
        )

    def _target_content_ids(self, item_type: str, rule: dict[str, Any]) -> set[str]:
        """Return the content ids a ``collect_all`` rule requires.

        Explicit ``rule["ids"]`` win; otherwise all active rows of ``item_type``
        are used, optionally narrowed by ``rule["competition_code"]``. Item
        types without a content table yield an empty set.
        """
        explicit = rule.get("ids")
        if explicit:
            return {str(value) for value in explicit}

        competition_code = rule.get("competition_code")
        if item_type == "team":
            stmt = select(models.Team.id).where(models.Team.active.is_(True))
            if competition_code:
                stmt = stmt.where(models.Team.competition_code == competition_code)
        elif item_type == "stadium":
            stmt = select(models.Stadium.id).where(models.Stadium.active.is_(True))
            if competition_code:
                stmt = stmt.where(models.Stadium.competition_code == competition_code)
        elif item_type == "match":
            stmt = select(models.Match.id).where(models.Match.active.is_(True))
            if competition_code:
                stmt = stmt.where(models.Match.competition_code == competition_code)
        elif item_type == "sticker":
            stmt = select(models.Sticker.id).where(models.Sticker.active.is_(True))
            if competition_code:
                stmt = stmt.where(models.Sticker.competition_code == competition_code)
        else:
            return set()
        return set(self.session.scalars(stmt).all())

    def _team_ids_for_group(
        self, group_name: str, competition_code: str | None = None
    ) -> set[str]:
        """Return ids of active teams in ``group_name``, optionally per competition."""
        stmt = select(models.Team.id).where(
            and_(models.Team.group_name == group_name, models.Team.active.is_(True))
        )
        if competition_code:
            stmt = stmt.where(models.Team.competition_code == competition_code)
        return set(self.session.scalars(stmt).all())

    def _all_group_names(self, competition_code: str | None = None) -> list[str]:
        """Return the sorted, distinct, non-empty group names of active teams."""
        stmt = select(models.Team.group_name).where(
            and_(models.Team.group_name.is_not(None), models.Team.active.is_(True))
        )
        if competition_code:
            stmt = stmt.where(models.Team.competition_code == competition_code)
        names = [name for name in self.session.scalars(stmt.distinct()).all() if name]
        return sorted(names)

    def _match_ids_for_team(self, team_id: str) -> set[str]:
        """Return ids of active matches where ``team_id`` plays home or away."""
        return set(
            self.session.scalars(
                select(models.Match.id).where(
                    and_(
                        models.Match.active.is_(True),
                        or_(
                            models.Match.home_team_id == team_id,
                            models.Match.away_team_id == team_id,
                        ),
                    )
                )
            ).all()
        )

    def _quiz_correct_streak(self, user_id: str) -> int:
        """Return the longest run of consecutive UTC days with a correct quiz answer."""
        answered_at_values = self.session.scalars(
            select(models.QuizAnswer.answered_at).where(
                and_(
                    models.QuizAnswer.user_id == user_id,
                    models.QuizAnswer.is_correct.is_(True),
                )
            )
        ).all()

        # Normalise to UTC calendar days *before* sorting: this deduplicates in
        # O(n) and avoids comparing naive with aware datetimes.
        answer_dates = sorted(
            {
                _as_utc(value).date() if isinstance(value, datetime) else value
                for value in answered_at_values
                if value is not None
            }
        )

        max_streak = 0
        current = 0
        previous: date | None = None
        for answer_date in answer_dates:
            if previous is not None and answer_date == previous + timedelta(days=1):
                current += 1
            else:
                current = 1
            max_streak = max(max_streak, current)
            previous = answer_date
        return max_streak