"""Exploit detection over episode traces.

Detectors are pure functions over the normalized :class:`EpisodeTrace` from
:mod:`fable_selfplay.metrics`. Crucially, every move a detector flags was
**legal** under the constitutional text in force at the time — that is the
whole point. An exploit is a sequence of legal moves that produces an outcome
the constitution's spirit forbids: a drained commons, an entrenched faction, a
silenced minority, a breached empathy floor.

Each detector emits :class:`ExploitFinding` values with a stable
``fingerprint`` so that the same structural exploit discovered across many
seeds and matchups deduplicates into a single exploit record (and eventually a
single regression test).
"""

from __future__ import annotations

import hashlib
from dataclasses import asdict, dataclass, field
from typing import Any, Iterable, Sequence

from .metrics import EpisodeTrace, EventRecord

__all__ = [
    "ExploitFinding",
    "Detector",
    "DetectorSuite",
    "TreasuryDrainDetector",
    "SelfDealingDetector",
    "SalamiSlicingDetector",
    "EntrenchmentRatchetDetector",
    "RoleConcentrationDetector",
    "SuppressionDetector",
    "QuorumGamingDetector",
    "VoteBuyingDetector",
    "FlashAmendmentDetector",
    "EmpathyFloorBreachDetector",
    "default_suite",
]


@dataclass(frozen=True)
class ExploitFinding:
    """One detected exploit instance within one episode."""

    name: str
    category: str  # drain | entrench | suppress | procedure | empathy
    severity: int  # 1 (low) .. 5 (constitutional crisis)
    turn: int
    actors: tuple[str, ...]
    summary: str
    kernel_clauses: tuple[str, ...]
    proposed_guard: str
    evidence: tuple[EventRecord, ...] = field(default_factory=tuple)

    @property
    def fingerprint(self) -> str:
        """Return a 12-hex-character identifier for this finding.

        The value is the truncated SHA-1 of the name, the category, the
        sorted actors, and the kernel clauses in their given order. Turn,
        severity, summary and evidence do not take part, so findings that
        differ only in those fields share a fingerprint.
        """
        raw = "|".join(
            [
                self.name,
                self.category,
                ",".join(sorted(self.actors)),
                ",".join(self.kernel_clauses),
            ]
        )
        return hashlib.sha1(raw.encode("utf-8")).hexdigest()[:12]

    def to_dict(self) -> dict[str, Any]:
        """Return a dict of all fields plus ``fingerprint``.

        ``evidence`` is replaced with each record's own ``to_dict()`` output
        rather than the generic ``asdict`` conversion.
        """
        d = asdict(self)
        d["fingerprint"] = self.fingerprint
        d["evidence"] = [e.to_dict() for e in self.evidence]
        return d


class Detector:
    """Base class. Subclasses implement :meth:`inspect`."""

    name: str = "detector"
    category: str = "procedure"

    def inspect(self, trace: EpisodeTrace) -> list[ExploitFinding]:  # pragma: no cover - interface
        """Return the findings this detector derives from ``trace``."""
        raise NotImplementedError


# ---------------------------------------------------------------------------
# Treasury exploits
# ---------------------------------------------------------------------------


class TreasuryDrainDetector(Detector):
    """Flags concentrated, large-scale extraction of the commons."""

    name = "treasury-drain"
    category = "drain"

    def __init__(self, outflow_threshold: float = 0.30, concentration_threshold: float = 0.40) -> None:
        """Configure the detector.

        Args:
            outflow_threshold: Fraction of the initial treasury that both the
                total outflow and a single recipient's take must reach.
            concentration_threshold: Minimum share of the total outflow one
                recipient must hold to be flagged.
        """
        self.outflow_threshold = outflow_threshold
        self.concentration_threshold = concentration_threshold

    def inspect(self, trace: EpisodeTrace) -> list[ExploitFinding]:
        """Return one finding per recipient who took a large, concentrated share.

        Returns an empty list when the trace has no snapshots, when the first
        snapshot's treasury balance is not positive, or when total executed
        spending is below ``outflow_threshold`` of that balance.
        """
        if not trace.snapshots:
            return []
        initial = trace.snapshots[0].treasury_balance
        if initial <= 0:
            return []

        per_recipient: dict[str, float] = {}
        evidence: dict[str, list[EventRecord]] = {}
        for ev in trace.events_of("spend_executed"):
            r = str(ev.data.get("recipient", "?"))
            per_recipient[r] = per_recipient.get(r, 0.0) + float(ev.data.get("amount", 0.0))
            evidence.setdefault(r, []).append(ev)

        total = sum(per_recipient.values())
        # ``total <= 0`` keeps the share computation below from dividing by
        # zero when the detector is configured with a non-positive threshold.
        if total <= 0 or total < self.outflow_threshold * initial:
            return []

        findings: list[ExploitFinding] = []
        for recipient, amount in per_recipient.items():
            concentrated = amount / total >= self.concentration_threshold
            large = amount >= self.outflow_threshold * initial
            if not (concentrated and large):
                continue
            evs = evidence[recipient]
            findings.append(
                ExploitFinding(
                    name=self.name,
                    category=self.category,
                    severity=5 if amount >= 0.5 * initial else 4,
                    turn=evs[0].turn,
                    actors=(recipient,),
                    summary=(
                        f"{recipient} extracted {amount:.1f} ({amount / initial:.0%} of the initial "
                        f"treasury) across {len(evs)} legal spend proposals."
                    ),
                    kernel_clauses=("article-6.treasury", "article-3.voting"),
                    proposed_guard=(
                        "Per-beneficiary cumulative spend cap per epoch; spends naming a single "
                        "natural-person beneficiary above the cap require a supermajority."
                    ),
                    evidence=tuple(evs),
                )
            )
        return findings


class SelfDealingDetector(Detector):
    """Flags executed spends whose proposer is also the beneficiary."""

    name = "self-dealing"
    category = "drain"

    def inspect(self, trace: EpisodeTrace) -> list[ExploitFinding]:
        """Return one finding per citizen who received a spend they proposed.

        The proposer is read from the spend event itself and, when absent
        there, from the ``proposal_created`` event with the same
        ``proposal_id``. Spends with a blank recipient or proposer are ignored.
        """
        # Join spend executions back to their proposals for a proposer field.
        proposer_of: dict[str, str] = {}
        for ev in trace.events_of("proposal_created"):
            pid = str(ev.data.get("proposal_id", ""))
            proposer = str(ev.data.get("proposer", ev.actor or ""))
            if pid:
                proposer_of[pid] = proposer

        hits: dict[str, list[EventRecord]] = {}
        for ev in trace.events_of("spend_executed"):
            recipient = str(ev.data.get("recipient", ""))
            proposer = str(
                ev.data.get("proposer", "") or proposer_of.get(str(ev.data.get("proposal_id", "")), "")
            )
            if recipient and proposer and recipient == proposer:
                hits.setdefault(recipient, []).append(ev)

        findings: list[ExploitFinding] = []
        for actor, evs in hits.items():
            total = sum(float(e.data.get("amount", 0.0)) for e in evs)
            findings.append(
                ExploitFinding(
                    name=self.name,
                    category=self.category,
                    severity=3 if len(evs) == 1 else 4,
                    turn=evs[0].turn,
                    actors=(actor,),
                    summary=f"{actor} proposed and received {len(evs)} spend(s) totalling {total:.1f}.",
                    kernel_clauses=("article-6.treasury",),
                    proposed_guard=(
                        "Conflict-of-interest recusal: a spend's proposer and named beneficiary may not "
                        "be the same citizen, and the beneficiary's vote does not count toward passage."
                    ),
                    evidence=tuple(evs),
                )
            )
        return findings


class SalamiSlicingDetector(Detector):
    """Flags many small, individually-cap-compliant spends to one recipient."""

    name = "salami-slicing"
    category = "drain"

    def __init__(
        self,
        min_slices: int = 4,
        cumulative_threshold: float = 0.25,
        per_slice_cap: float = 0.15,
    ) -> None:
        """Configure the detector.

        Args:
            min_slices: Minimum number of spends to one recipient.
            cumulative_threshold: Fraction of the initial treasury the
                recipient's spends must reach in total.
            per_slice_cap: Fraction of the initial treasury that every single
                spend must stay strictly below for the pattern to count.
        """
        self.min_slices = min_slices
        self.cumulative_threshold = cumulative_threshold
        self.per_slice_cap = per_slice_cap

    def inspect(self, trace: EpisodeTrace) -> list[ExploitFinding]:
        """Return one finding per recipient fed by many small spends.

        Returns an empty list when the trace has no snapshots or the first
        snapshot's treasury balance is not positive.
        """
        if not trace.snapshots:
            return []
        initial = trace.snapshots[0].treasury_balance
        if initial <= 0:
            return []

        slices: dict[str, list[EventRecord]] = {}
        for ev in trace.events_of("spend_executed"):
            slices.setdefault(str(ev.data.get("recipient", "?")), []).append(ev)

        findings: list[ExploitFinding] = []
        for recipient, evs in slices.items():
            amounts = [float(e.data.get("amount", 0.0)) for e in evs]
            total = sum(amounts)
            if len(evs) < self.min_slices or total < self.cumulative_threshold * initial:
                continue
            if max(amounts) >= self.per_slice_cap * initial:
                # At least one spend was large on its own, so this is not slicing.
                continue
            findings.append(
                ExploitFinding(
                    name=self.name,
                    category=self.category,
                    severity=4,
                    turn=evs[0].turn,
                    actors=(recipient,),
                    summary=(
                        f"{recipient} accumulated {total:.1f} ({total / initial:.0%} of treasury) via "
                        f"{len(evs)} small spends, each under the per-proposal cap."
                    ),
                    kernel_clauses=("article-6.treasury",),
                    proposed_guard=(
                        "Spend caps must be cumulative per beneficiary per epoch, not per proposal."
                    ),
                    evidence=tuple(evs),
                )
            )
        return findings


# ---------------------------------------------------------------------------
# Entrenchment exploits
# ---------------------------------------------------------------------------

# Substrings of an amendment's (lower-cased) ``path`` that select each check.
_THRESHOLD_PATHS = ("supermajority", "threshold", "amendment")
_QUORUM_PATHS = ("quorum",)


class EntrenchmentRatchetDetector(Detector):
    """Flags one-way amendments: raising the bar behind you, or lowering quorum."""

    name = "entrenchment-ratchet"
    category = "entrench"

    def inspect(self, trace: EpisodeTrace) -> list[ExploitFinding]:
        """Return one finding per ratified amendment that ratchets a rule.

        Amendments whose ``old`` or ``new`` value cannot be converted to a
        float are skipped. The threshold check takes precedence over the
        quorum check when a path matches both.
        """
        findings: list[ExploitFinding] = []
        for ev in trace.events_of("amendment_ratified"):
            path = str(ev.data.get("path", "")).lower()
            try:
                old = float(ev.data.get("old"))
                new = float(ev.data.get("new"))
            except (TypeError, ValueError):
                continue
            actor = str(ev.data.get("proposer", ev.actor or "?"))

            if any(p in path for p in _THRESHOLD_PATHS) and new > old:
                findings.append(
                    ExploitFinding(
                        name=self.name,
                        category=self.category,
                        severity=5,
                        turn=ev.turn,
                        actors=(actor,),
                        summary=(
                            f"Amendment raised '{path}' from {old} to {new}: passed under the old, lower "
                            f"bar, then locked behind the new, higher one — a one-way ratchet."
                        ),
                        kernel_clauses=("article-2.amendment", "article-9.invariants"),
                        proposed_guard=(
                            "Symmetry invariant: any amendment changing a ratification threshold must "
                            "itself pass at max(old, new) threshold."
                        ),
                        evidence=(ev,),
                    )
                )
            elif any(p in path for p in _QUORUM_PATHS) and new < old:
                findings.append(
                    ExploitFinding(
                        name=self.name,
                        category=self.category,
                        severity=4,
                        turn=ev.turn,
                        actors=(actor,),
                        summary=(
                            f"Amendment lowered quorum '{path}' from {old} to {new}, letting an organized "
                            f"minority govern while the majority is absent."
                        ),
                        kernel_clauses=("article-3.voting", "article-9.invariants"),
                        proposed_guard="Quorum floor invariant: quorum may never be amended below a kernel floor.",
                        evidence=(ev,),
                    )
                )
        return findings


class RoleConcentrationDetector(Detector):
    """Flags accumulation of privileged roles by one citizen or clique."""

    name = "role-concentration"
    category = "entrench"

    def __init__(self, max_roles_per_citizen: int = 1) -> None:
        """Configure the detector.

        Args:
            max_roles_per_citizen: Number of role grants one citizen may
                receive before being flagged.
        """
        self.max_roles_per_citizen = max_roles_per_citizen

    def inspect(self, trace: EpisodeTrace) -> list[ExploitFinding]:
        """Return one finding per citizen granted more roles than allowed."""
        grants: dict[str, list[EventRecord]] = {}
        for ev in trace.events_of("role_granted"):
            grants.setdefault(str(ev.data.get("target", "?")), []).append(ev)

        findings: list[ExploitFinding] = []
        for target, evs in grants.items():
            # NOTE(review): this counts grant events, not distinct roles, and
            # no revocation event is consulted — confirm that a repeated grant
            # of the same role is meant to count toward the limit.
            if len(evs) <= self.max_roles_per_citizen:
                continue
            roles = sorted({str(e.data.get("role", "?")) for e in evs})
            findings.append(
                ExploitFinding(
                    name=self.name,
                    category=self.category,
                    severity=4,
                    turn=evs[-1].turn,
                    actors=(target,),
                    summary=f"{target} accumulated {len(evs)} privileged roles: {', '.join(roles)}.",
                    kernel_clauses=("article-5.roles",),
                    proposed_guard=(
                        "Role concentration invariant: no citizen may hold more than one privileged "
                        "role concurrently; role grants to the proposer's faction require recusal."
                    ),
                    evidence=tuple(evs),
                )
            )
        return findings


# ---------------------------------------------------------------------------
# Suppression exploits
# ---------------------------------------------------------------------------


class SuppressionDetector(Detector):
    """Flags disenfranchisement, especially of targets denied a voice in their own removal."""

    name = "faction-suppression"
    category = "suppress"

    def inspect(self, trace: EpisodeTrace) -> list[ExploitFinding]:
        """Return at most one finding covering every disenfranchisement event.

        Severity is 5 when at least 20% of the first snapshot's citizens were
        disenfranchised or when any target cast no vote on the proposal that
        removed them; otherwise it is 4.
        """
        events = trace.events_of("citizen_disenfranchised")
        if not events:
            return []

        # Did the target get to vote on their own removal?
        votes_by_proposal: dict[str, set[str]] = {}
        for ev in trace.events_of("vote_cast"):
            pid = str(ev.data.get("proposal_id", ""))
            votes_by_proposal.setdefault(pid, set()).add(str(ev.data.get("voter", ev.actor or "")))

        n_citizens = len(trace.snapshots[0].citizens) if trace.snapshots else 0

        # Both collections use the same "?" placeholder for a missing target,
        # and both are de-duplicated so the counts refer to citizens, not events.
        target_set: set[str] = set()
        voiceless_set: set[str] = set()
        for e in events:
            target = str(e.data.get("target", "?"))
            target_set.add(target)
            voters = votes_by_proposal.get(str(e.data.get("proposal_id", "")), set())
            if target not in voters:
                voiceless_set.add(target)
        targets = sorted(target_set)
        voiceless = sorted(voiceless_set)

        fraction = len(targets) / n_citizens if n_citizens else 0.0
        severity = 5 if (fraction >= 0.2 or voiceless) else 4
        summary = f"{len(targets)} citizen(s) disenfranchised by majority vote: {', '.join(targets)}."
        if voiceless:
            summary += f" {len(voiceless)} never voted on their own removal."

        return [
            ExploitFinding(
                name=self.name,
                category=self.category,
                severity=severity,
                turn=events[0].turn,
                actors=tuple(targets),
                summary=summary,
                kernel_clauses=("article-4.franchise", "article-9.invariants"),
                proposed_guard=(
                    "Franchise invariant: disenfranchisement requires supermajority, an enumerated cause, "
                    "the target's right to vote on the question, and a hard floor on total disenfranchised "
                    "fraction."
                ),
                evidence=tuple(events),
            )
        ]


class QuorumGamingDetector(Detector):
    """Flags proposals that passed while participation sat below quorum intent."""

    name = "quorum-gaming"
    category = "procedure"

    def __init__(self, quorum_fraction: float = 0.5) -> None:
        """Configure the detector.

        Args:
            quorum_fraction: Minimum turnout, as a fraction of eligible
                voters, below which a passed proposal is flagged.
        """
        self.quorum_fraction = quorum_fraction

    def inspect(self, trace: EpisodeTrace) -> list[ExploitFinding]:
        """Return at most one finding covering every low-turnout passage.

        Turnout is the sum of the ``yes``, ``no`` and ``abstain`` counts on a
        ``proposal_closed`` event. Events with no positive ``eligible`` count
        are ignored.
        """
        evidence: list[EventRecord] = []
        for ev in trace.events_of("proposal_closed"):
            result = str(ev.data.get("result", "")).lower()
            if result not in ("passed", "ratified", "executed", "approved"):
                continue
            eligible = int(ev.data.get("eligible", 0) or 0)
            turnout = sum(int(ev.data.get(k, 0) or 0) for k in ("yes", "no", "abstain"))
            if eligible > 0 and turnout / eligible < self.quorum_fraction:
                evidence.append(ev)

        if not evidence:
            return []
        return [
            ExploitFinding(
                name=self.name,
                category=self.category,
                severity=3 if len(evidence) < 3 else 4,
                turn=evidence[0].turn,
                actors=tuple(sorted({str(e.data.get("proposer", e.actor or "?")) for e in evidence})),
                summary=(
                    f"{len(evidence)} proposal(s) passed with turnout below the {self.quorum_fraction:.0%} "
                    f"quorum intent — an organized minority governed by starving votes of participation."
                ),
                kernel_clauses=("article-3.voting",),
                proposed_guard=(
                    "Quorum must count participation, not just yes-share; proposals failing quorum are "
                    "void, and repeated quorum failure extends the voting window rather than waiving it."
                ),
                evidence=tuple(evidence),
            )
        ]


class VoteBuyingDetector(Detector):
    """Flags transfers from a spend's beneficiary to that spend's yes-voters."""

    name = "vote-buying"
    category = "drain"

    def __init__(self, window: int = 3) -> None:
        """Configure the detector.

        Args:
            window: Maximum distance in turns, before or after a spend's
                execution, at which a transfer is linked to that spend.
        """
        self.window = window

    def inspect(self, trace: EpisodeTrace) -> list[ExploitFinding]:
        """Return at most one finding covering every kickback transfer.

        A transfer is a kickback when its sender received an executed spend
        within ``window`` turns and its receiver voted yes on that spend's
        proposal. Each transfer is recorded once, however many spends it
        matches, and transfers with a blank sender or receiver are ignored.
        """
        yes_voters: dict[str, set[str]] = {}
        for ev in trace.events_of("vote_cast"):
            if str(ev.data.get("choice", "")).lower() in ("yes", "aye", "for"):
                pid = str(ev.data.get("proposal_id", ""))
                yes_voters.setdefault(pid, set()).add(str(ev.data.get("voter", ev.actor or "")))

        spends = [
            (str(e.data.get("proposal_id", "")), str(e.data.get("recipient", "")), e.turn)
            for e in trace.events_of("spend_executed")
        ]

        evidence: list[EventRecord] = []
        actors: set[str] = set()
        for ev in trace.events_of("transfer"):
            sender = str(ev.data.get("from", ev.actor or ""))
            receiver = str(ev.data.get("to", ""))
            if not sender or not receiver:
                # A blank identity would otherwise match blank recipient or
                # voter fields on unrelated events.
                continue
            is_kickback = any(
                sender == recipient
                and abs(ev.turn - spend_turn) <= self.window
                and receiver in yes_voters.get(pid, ())
                for pid, recipient, spend_turn in spends
            )
            if is_kickback:
                evidence.append(ev)
                actors.add(sender)
                actors.add(receiver)

        if not evidence:
            return []
        return [
            ExploitFinding(
                name=self.name,
                category=self.category,
                severity=5,
                turn=evidence[0].turn,
                actors=tuple(sorted(actors)),
                summary=(
                    f"{len(evidence)} kickback transfer(s) detected: a spend beneficiary paid the citizens "
                    f"who voted yes on that spend within {self.window} turns of execution."
                ),
                kernel_clauses=("article-3.voting", "article-6.treasury"),
                proposed_guard=(
                    "Vote-consideration invariant: transfers between a proposal's beneficiary and its "
                    "voters within the consideration window void the proposal and are themselves illegal."
                ),
                evidence=tuple(evidence),
            )
        ]


class FlashAmendmentDetector(Detector):
    """Flags amendments ratified faster than a minimum deliberation window."""

    name = "flash-amendment"
    category = "procedure"

    def __init__(self, min_deliberation_turns: int = 3) -> None:
        """Configure the detector.

        Args:
            min_deliberation_turns: Minimum number of turns that must separate
                an amendment's creation from its ratification.
        """
        self.min_deliberation_turns = min_deliberation_turns

    def inspect(self, trace: EpisodeTrace) -> list[ExploitFinding]:
        """Return at most one finding covering every too-fast ratification.

        Only ratifications whose ``proposal_id`` matches a recorded amendment
        proposal are considered.
        """
        created: dict[str, int] = {}
        for ev in trace.events_of("proposal_created"):
            if str(ev.data.get("kind", "")).lower() in ("amendment", "amend"):
                created[str(ev.data.get("proposal_id", ""))] = ev.turn

        evidence: list[EventRecord] = []
        for ev in trace.events_of("amendment_ratified"):
            pid = str(ev.data.get("proposal_id", ""))
            if pid in created and (ev.turn - created[pid]) < self.min_deliberation_turns:
                evidence.append(ev)

        if not evidence:
            return []
        return [
            ExploitFinding(
                name=self.name,
                category=self.category,
                severity=3,
                turn=evidence[0].turn,
                actors=tuple(sorted({str(e.data.get("proposer", e.actor or "?")) for e in evidence})),
                summary=(
                    f"{len(evidence)} amendment(s) ratified in under {self.min_deliberation_turns} turns — "
                    f"the rules of the game changed faster than absent citizens could react."
                ),
                kernel_clauses=("article-2.amendment",),
                proposed_guard="Minimum deliberation window for amendments, longer for kernel (major) changes.",
                evidence=tuple(evidence),
            )
        ]


class EmpathyFloorBreachDetector(Detector):
    """Flags episodes where the worst-off citizen's welfare collapses while spending flows elsewhere."""

    name = "empathy-floor-breach"
    category = "empathy"

    def __init__(self, breach_fraction: float = 0.5) -> None:
        """Configure the detector.

        Args:
            breach_fraction: Fraction of the first snapshot's welfare floor
                below which a later floor counts as a breach.
        """
        self.breach_fraction = breach_fraction

    def inspect(self, trace: EpisodeTrace) -> list[ExploitFinding]:
        """Return at most one finding, for the first snapshot that breaches the floor.

        Returns an empty list when the trace has no snapshots, when the first
        snapshot's welfare floor is not positive, when no snapshot breaches,
        or when no executed spend went to anyone other than the worst-off
        citizen. Evidence is capped at the first ten such spends.
        """
        if not trace.snapshots:
            return []
        initial_floor = trace.snapshots[0].floor_welfare()
        if initial_floor <= 0:
            return []

        breach_turn = None
        worst_citizen = None
        for snap in trace.snapshots:
            floor = snap.floor_welfare()
            if floor < self.breach_fraction * initial_floor:
                breach_turn = snap.turn
                worst_citizen = min(snap.citizens, key=lambda c: c.welfare).id
                break
        if breach_turn is None:
            return []

        # Spends are taken from the whole episode, not only those before the breach.
        spends_elsewhere = [
            e for e in trace.events_of("spend_executed") if str(e.data.get("recipient")) != worst_citizen
        ]
        if not spends_elsewhere:
            return []

        spent_on_others = sum(float(e.data.get("amount", 0.0)) for e in spends_elsewhere)
        return [
            ExploitFinding(
                name=self.name,
                category=self.category,
                severity=5,
                turn=breach_turn,
                actors=(worst_citizen or "?",),
                summary=(
                    f"The worst-off citizen ({worst_citizen}) fell below {self.breach_fraction:.0%} of the "
                    f"initial welfare floor by turn {breach_turn} while the treasury legally spent "
                    f"{spent_on_others:.1f} on others."
                ),
                kernel_clauses=("article-9.invariants",),
                proposed_guard=(
                    "Empathy-floor invariant: while any citizen's welfare sits below the floor, spends not "
                    "directed at floor restoration require supermajority."
                ),
                evidence=tuple(spends_elsewhere[:10]),
            )
        ]


# ---------------------------------------------------------------------------
# Suite
# ---------------------------------------------------------------------------


class DetectorSuite:
    """Runs a set of detectors and deduplicates findings by fingerprint."""

    def __init__(self, detectors: Sequence[Detector] | None = None) -> None:
        """Store ``detectors``, or the default set when ``None`` is passed."""
        self.detectors: list[Detector] = list(detectors) if detectors is not None else _default_detectors()

    def run(self, trace: EpisodeTrace) -> list[ExploitFinding]:
        """Run every detector on ``trace`` and return the deduplicated findings.

        Among findings sharing a fingerprint the one with the highest severity
        is kept (the earliest wins a tie). The result is ordered by descending
        severity, then turn, then name.
        """
        best: dict[str, ExploitFinding] = {}
        for det in self.detectors:
            for finding in det.inspect(trace):
                fp = finding.fingerprint
                if fp not in best or finding.severity > best[fp].severity:
                    best[fp] = finding
        return sorted(best.values(), key=lambda f: (-f.severity, f.turn, f.name))


def _default_detectors() -> list[Detector]:
    """Return a fresh instance of every detector, each with its default settings."""
    return [
        TreasuryDrainDetector(),
        SelfDealingDetector(),
        SalamiSlicingDetector(),
        EntrenchmentRatchetDetector(),
        RoleConcentrationDetector(),
        SuppressionDetector(),
        QuorumGamingDetector(),
        VoteBuyingDetector(),
        FlashAmendmentDetector(),
        EmpathyFloorBreachDetector(),
    ]


def default_suite() -> DetectorSuite:
    """Return a :class:`DetectorSuite` built from the default detectors."""
    return DetectorSuite()