"""Tournament orchestration: matchups x seeds -> traces -> findings -> exploit records. A tournament runs each matchup (a population composition) across many seeds, scores every episode, runs the detector suite over every trace, deduplicates findings into numbered :class:`ExploitRecord` values, and attaches a deterministic *reproduction* (seed + perpetrator action script) to each record. The reproduction is what :mod:`fable_selfplay.exploit_to_test` later compiles into a permanent regression test. Environment touchpoints are isolated in the small adapter functions at the top of this module (``load_kernel``, ``build_environment``, ``_state_of``, ``_collect_events``) so any drift in the simulation API is fixed in exactly one place. """ from __future__ import annotations import json import random import statistics from dataclasses import asdict, dataclass, field from pathlib import Path from typing import Any, Mapping, Sequence from .agents import Agent, make_population from .detectors import DetectorSuite, ExploitFinding, default_suite from .environment import Environment from .kernel import Kernel from .metrics import EpisodeScore, EpisodeTrace, EventRecord, TurnSnapshot, score_episode __all__ = [ "MatchupSpec", "TournamentConfig", "EpisodeResult", "ExploitRecord", "TournamentResult", "Tournament", "run_episode", "load_kernel", "DEFAULT_MATCHUPS", ] # --------------------------------------------------------------------------- # Environment adapters (single point of contact with the sim API) # --------------------------------------------------------------------------- def load_kernel(path: str | Path) -> Any: """Load a kernel document, tolerating either classmethod name.""" if hasattr(Kernel, "load"): return Kernel.load(path) if hasattr(Kernel, "from_file"): return Kernel.from_file(path) # type: ignore[attr-defined] raise AttributeError("Kernel exposes neither load() nor from_file()") def build_environment(kernel: Any, citizen_ids: Sequence[str], initial_treasury: float, seed: int) -> Any: return Environment( kernel=kernel, citizen_ids=list(citizen_ids), initial_treasury=initial_treasury, seed=seed, ) def _state_of(step_result: Any, env: Any) -> Any: if step_result is None: return getattr(env, "state") return getattr(step_result, "state", step_result) def _collect_events(env: Any, state: Any) -> list[Any]: for candidate in (getattr(state, "log", None), getattr(env, "events", None), getattr(state, "events", None)): if candidate is None: continue events = getattr(candidate, "events", candidate) try: return list(events) except TypeError: continue return [] def _kernel_version(kernel: Any) -> str: return str(getattr(kernel, "version", getattr(kernel, "semver", "0.1.0"))) # --------------------------------------------------------------------------- # Configuration # --------------------------------------------------------------------------- @dataclass(frozen=True) class MatchupSpec: name: str honest: int = 4 drainers: int = 0 entrenchers: int = 0 suppressors: int = 0 def build_agents(self) -> list[Agent]: return make_population( honest=self.honest, drainers=self.drainers, entrenchers=self.entrenchers, suppressors=self.suppressors, ) DEFAULT_MATCHUPS: tuple[MatchupSpec, ...] = ( MatchupSpec("baseline-honest", honest=6), MatchupSpec("drain", honest=5, drainers=2), MatchupSpec("entrench", honest=5, entrenchers=2), MatchupSpec("suppress", honest=5, suppressors=2), MatchupSpec("mixed-capture", honest=5, drainers=1, entrenchers=1, suppressors=1), ) @dataclass class TournamentConfig: kernel_path: str | Path seeds: Sequence[int] = tuple(range(8)) n_turns: int = 60 initial_treasury: float = 1000.0 matchups: Sequence[MatchupSpec] = DEFAULT_MATCHUPS out_dir: str | Path | None = None exploit_id_start: int = 1 # --------------------------------------------------------------------------- # Results # --------------------------------------------------------------------------- @dataclass class EpisodeResult: matchup: str seed: int score: EpisodeScore findings: list[ExploitFinding] trace: EpisodeTrace def summary_dict(self) -> dict[str, Any]: return { "matchup": self.matchup, "seed": self.seed, "score": self.score.to_dict(), "findings": [f.to_dict() for f in self.findings], } @dataclass class ExploitRecord: """A deduplicated exploit with everything needed to reproduce and regress it.""" exploit_id: str name: str category: str severity: int summary: str kernel_clauses: list[str] proposed_guard: str occurrences: int first_seen: dict[str, Any] # matchup, seed, turn reproduction: dict[str, Any] # seed, matchup, n_turns, kernel_version, scripts fingerprint: str status: str = "open" # open | patched def to_dict(self) -> dict[str, Any]: return asdict(self) @classmethod def from_dict(cls, d: Mapping[str, Any]) -> "ExploitRecord": return cls(**{k: d[k] for k in ( "exploit_id", "name", "category", "severity", "summary", "kernel_clauses", "proposed_guard", "occurrences", "first_seen", "reproduction", "fingerprint", )}, status=str(d.get("status", "open"))) @dataclass class TournamentResult: kernel_version: str episodes: list[EpisodeResult] = field(default_factory=list) exploit_records: list[ExploitRecord] = field(default_factory=list) def leaderboard(self) -> list[dict[str, Any]]: by_matchup: dict[str, list[EpisodeResult]] = {} for ep in self.episodes: by_matchup.setdefault(ep.matchup, []).append(ep) rows = [] for matchup, eps in sorted(by_matchup.items()): rows.append( { "matchup": matchup, "episodes": len(eps), "mean_governance_score": round( statistics.fmean(e.score.governance_score for e in eps), 2 ), "mean_empathy_ratio": round(statistics.fmean(e.score.empathy_ratio for e in eps), 3), "worst_empathy_trough": round(min(e.score.empathy_floor_trough for e in eps), 2), "mean_treasury_final": round(statistics.fmean(e.score.treasury_final for e in eps), 1), "total_findings": sum(len(e.findings) for e in eps), } ) return rows # --------------------------------------------------------------------------- # Episode loop # --------------------------------------------------------------------------- def run_episode( kernel: Any, agents: Sequence[Agent], seed: int, n_turns: int = 60, initial_treasury: float = 1000.0, detector_suite: DetectorSuite | None = None, matchup_name: str = "ad-hoc", ) -> EpisodeResult: """Run one turn-based episode and grade it.""" rng = random.Random(seed) env = build_environment(kernel, [a.id for a in agents], initial_treasury, seed) state = _state_of(env.reset(), env) trace = EpisodeTrace( seed=seed, kernel_version=_kernel_version(kernel), agent_objectives={a.id: a.objective for a in agents}, ) trace.snapshots.append(TurnSnapshot.from_state(state)) seen_events = 0 for _ in range(n_turns): actions = {agent.id: agent.act(state, rng) for agent in agents} state = _state_of(env.step(actions), env) all_events = _collect_events(env, state) fresh = [ EventRecord.from_event(ev, default_turn=int(getattr(state, "turn", 0))) for ev in all_events[seen_events:] ] seen_events = len(all_events) for agent in agents: agent.observe(fresh) trace.append_turn(TurnSnapshot.from_state(state), fresh) suite = detector_suite or default_suite() findings = suite.run(trace) score = score_episode(trace) return EpisodeResult(matchup=matchup_name, seed=seed, score=score, findings=findings, trace=trace) # --------------------------------------------------------------------------- # Reproduction extraction # --------------------------------------------------------------------------- _PROPOSAL_KIND_TO_ACTION = { "spend": "propose_spend", "amendment": "propose_amendment", "amend": "propose_amendment", "disenfranchise": "propose_disenfranchise", "role_grant": "propose_role_grant", } def _perpetrator_scripts(trace: EpisodeTrace, actors: Sequence[str]) -> dict[str, list[dict[str, Any]]]: """Recover each perpetrator's action sequence from the event log. The recovered script uses ``proposal_ref`` for votes so it replays correctly even when proposal ids shift under a patched kernel. """ wanted = set(actors) proposer_of: dict[str, str] = {} scripts: dict[str, list[dict[str, Any]]] = {a: [] for a in wanted} for ev in trace.events: if ev.kind == "proposal_created": pid = str(ev.data.get("proposal_id", "")) proposer = str(ev.data.get("proposer", ev.actor or "")) proposer_of[pid] = proposer if proposer in wanted: kind = str(ev.data.get("kind", "")).lower() action_type = _PROPOSAL_KIND_TO_ACTION.get(kind) if action_type: payload = dict(ev.data.get("payload", {}) or {}) scripts[proposer].append({"turn": ev.turn, "type": action_type, **payload}) elif ev.kind == "vote_cast": voter = str(ev.data.get("voter", ev.actor or "")) if voter in wanted: pid = str(ev.data.get("proposal_id", "")) scripts[voter].append( { "turn": ev.turn, "type": "cast_vote", "choice": str(ev.data.get("choice", "abstain")), "proposal_id": pid, "proposal_ref": {"proposer": proposer_of.get(pid, "")}, } ) elif ev.kind == "transfer": sender = str(ev.data.get("from", ev.actor or "")) if sender in wanted: scripts[sender].append( { "turn": ev.turn, "type": "transfer", "to": str(ev.data.get("to", "")), "amount": float(ev.data.get("amount", 0.0)), "memo": str(ev.data.get("memo", "")), } ) return {a: s for a, s in scripts.items() if s} # --------------------------------------------------------------------------- # Tournament # --------------------------------------------------------------------------- class Tournament: def __init__(self, config: TournamentConfig, detector_suite: DetectorSuite | None = None) -> None: self.config = config self.suite = detector_suite or default_suite() def run(self) -> TournamentResult: kernel = load_kernel(self.config.kernel_path) result = TournamentResult(kernel_version=_kernel_version(kernel)) best_by_fp: dict[str, tuple[ExploitFinding, EpisodeResult]] = {} occurrences: dict[str, int] = {} for matchup in self.config.matchups: for seed in self.config.seeds: episode = run_episode( kernel=kernel, agents=matchup.build_agents(), seed=int(seed), n_turns=self.config.n_turns, initial_treasury=self.config.initial_treasury, detector_suite=self.suite, matchup_name=matchup.name, ) result.episodes.append(episode) for finding in episode.findings: fp = finding.fingerprint occurrences[fp] = occurrences.get(fp, 0) + 1 if fp not in best_by_fp or finding.severity > best_by_fp[fp][0].severity: best_by_fp[fp] = (finding, episode) ordered = sorted(best_by_fp.values(), key=lambda fe: (-fe[0].severity, fe[0].category, fe[0].name)) next_id = self.config.exploit_id_start for finding, episode in ordered: scripts = _perpetrator_scripts(episode.trace, finding.actors) record = ExploitRecord( exploit_id=f"EXP-{next_id:03d}", name=finding.name, category=finding.category, severity=finding.severity, summary=finding.summary, kernel_clauses=list(finding.kernel_clauses), proposed_guard=finding.proposed_guard, occurrences=occurrences[finding.fingerprint], first_seen={"matchup": episode.matchup, "seed": episode.seed, "turn": finding.turn}, reproduction={ "matchup": episode.matchup, "seed": episode.seed, "n_turns": self.config.n_turns, "initial_treasury": self.config.initial_treasury, "kernel_version": result.kernel_version, "perpetrators": list(finding.actors), "scripts": scripts, }, fingerprint=finding.fingerprint, ) result.exploit_records.append(record) next_id += 1 if self.config.out_dir is not None: self.write_artifacts(result, Path(self.config.out_dir)) return result # -- persistence -------------------------------------------------------- def write_artifacts(self, result: TournamentResult, out_dir: Path) -> None: exploits_dir = out_dir / "exploits" episodes_dir = out_dir / "episodes" exploits_dir.mkdir(parents=True, exist_ok=True) episodes_dir.mkdir(parents=True, exist_ok=True) for record in result.exploit_records: path = exploits_dir / f"{record.exploit_id}.json" path.write_text(json.dumps(record.to_dict(), indent=2, sort_keys=True), encoding="utf-8") for episode in result.episodes: path = episodes_dir / f"{episode.matchup}-seed{episode.seed}.json" path.write_text(json.dumps(episode.summary_dict(), indent=2, sort_keys=True), encoding="utf-8") (out_dir / "summary.md").write_text(self._render_summary(result), encoding="utf-8") @staticmethod def _render_summary(result: TournamentResult) -> str: lines = [ f"# Tournament summary (kernel v{result.kernel_version})", "", "| matchup | episodes | mean score | mean empathy ratio | worst floor trough | findings |", "|---|---:|---:|---:|---:|---:|", ] for row in result.leaderboard(): lines.append( f"| {row['matchup']} | {row['episodes']} | {row['mean_governance_score']} | " f"{row['mean_empathy_ratio']} | {row['worst_empathy_trough']} | {row['total_findings']} |" ) lines.append("") lines.append("## Exploit records") lines.append("") if not result.exploit_records: lines.append("No exploits found. Either the kernel held, or the red team needs a raise.") for record in result.exploit_records: lines.append( f"- **{record.exploit_id}** [{record.category}/sev{record.severity}] {record.name} " f"(x{record.occurrences}): {record.summary}" ) lines.append("") return "\n".join(lines)