"""Deterministic replay harness for exploit regression tests. This module is intentionally separate from :mod:`fable_selfplay.environment` (the stochastic, agent-driven tournament environment). A regression test must be *bit-for-bit deterministic* and must depend only on: 1. a kernel parameter set (a plain dict, normally loaded from a kernel YAML), 2. a citizen roll, 3. an initial treasury, and 4. a recorded action trace (the exact moves the red-team agents made). Replays therefore re-execute a recorded exploit trace under any kernel parameterization and report what happened, so the exploit-to-test pipeline can assert both directions of the iteration loop: * under the kernel parameters the exploit was *discovered* against, the exploit's success predicate must hold (the historical record reproduces); * under the *patched* kernel, the same trace must fail — either because actions become illegal (and are recorded as blocked) or because tallies, caps, or floors defeat the outcome. Semantics implemented here are exactly the kernel's governance parameters. Anything not specified by a parameter defaults to kernel v0.1 behaviour, so a sparse parameter dict replays under v0.1 rules. Turn model ---------- The replay is turn-based. ``{"t": "advance"}`` advances the clock by one turn. Within an ``advance``, processing order is fixed and documented: 1. the turn counter increments; 2. if the new turn is an epoch boundary (``turns_per_epoch``), the epoch spend ledger resets, the epoch-start treasury snapshot is taken, and — if ``delegation.epoch_expiry`` — all delegations expire; 3. an active emergency ticks (duration count, then sunset check); 4. every open proposal whose review period has elapsed is tallied, in the order proposals were opened. 
Action vocabulary (all dicts, JSON-serializable): ``{"t": "propose", "by", "id", "kind", ...payload}`` kinds: ``spend`` (``amount``, ``to`` — ``to`` may be ``"ALL"``), ``amend`` (``key``, ``value`` — dotted kernel parameter path), ``expel`` (``target``), ``emergency`` (no payload), ``omnibus`` (``items``: list of spend/amend sub-payloads). ``{"t": "vote", "by", "on", "choice"}`` choice: yes | no | abstain ``{"t": "delegate", "by", "to"}`` ``{"t": "revoke", "by"}`` ``{"t": "call_question", "by", "on"}`` early tally, if the kernel allows ``{"t": "emergency_spend", "by", "amount", "to"}`` ``{"t": "advance"}`` """ from __future__ import annotations import copy import math from dataclasses import dataclass, field from pathlib import Path from typing import Any, Optional, Union import yaml __all__ = [ "IllegalAction", "Proposal", "Replay", "ReplayResult", "evaluate_predicate", "get_param", "load_kernel_params", "run_trace", "set_param", ] KINDS = ("spend", "amend", "expel", "emergency", "omnibus") #: dotted parameter path and v0.1 default for each proposal kind's threshold. 
_THRESHOLD_KEYS = {
    "spend": ("thresholds.spend", 0.5),
    "amend": ("thresholds.amend_kernel", 0.667),
    "expel": ("thresholds.expel", 0.5),
    "emergency": ("thresholds.emergency", 0.5),
}


class IllegalAction(Exception):
    """Raised when an action is not legal under the current kernel parameters.

    Attributes:
        action: the offending trace entry, exactly as it was passed in.
        reason: human-readable explanation of why it was rejected.
    """

    def __init__(self, action: dict, reason: str) -> None:
        self.action = action
        self.reason = reason
        # A malformed trace can contain non-dict entries; they must still be
        # reportable as illegal actions instead of crashing here.
        tag = action.get("t") if isinstance(action, dict) else None
        super().__init__(f"illegal action {tag!r}: {reason}")


def get_param(params: dict, dotted: str, default: Any = None) -> Any:
    """Look up a dotted path (``"spend.epoch_rate_cap"``) in a nested dict.

    Returns ``default`` when any path component is missing, when an
    intermediate node is not a dict, or when the stored value is ``None``.
    """
    node: Any = params
    for part in dotted.split("."):
        if not isinstance(node, dict) or part not in node:
            return default
        node = node[part]
    # An explicit ``None`` (e.g. a YAML ``null``) means "unset".
    return default if node is None else node


def set_param(params: dict, dotted: str, value: Any) -> None:
    """Set a dotted path in a nested dict, creating intermediate dicts.

    An intermediate node that exists but is not a dict is replaced by a
    fresh dict. ``params`` is mutated in place.
    """
    parts = dotted.split(".")
    node = params
    for part in parts[:-1]:
        nxt = node.get(part)
        if not isinstance(nxt, dict):
            nxt = {}
            node[part] = nxt
        node = nxt
    node[parts[-1]] = value


def load_kernel_params(path: Union[str, Path]) -> dict:
    """Load the ``params`` block from a kernel YAML document.

    Raises:
        ValueError: if the document is not a mapping with a ``params`` mapping.
    """
    doc = yaml.safe_load(Path(path).read_text(encoding="utf-8"))
    if not isinstance(doc, dict) or not isinstance(doc.get("params"), dict):
        raise ValueError(f"kernel document {path} has no 'params' mapping")
    return doc["params"]


@dataclass
class Proposal:
    """One proposal opened during a replay, plus its tally outcome."""

    pid: str
    kind: str
    by: str
    opened_turn: int
    payload: dict
    # Size of the citizen roll when the proposal was opened; used as the
    # quorum denominator when ``roll_snapshot_at_open`` is enabled.
    roll_at_open: int
    ballots: dict = field(default_factory=dict)  # voter -> (choice, weight)
    status: str = "open"  # open | ratified | failed | blocked
    closed_turn: Optional[int] = None
    yes_weight: float = 0.0
    no_weight: float = 0.0
    blocked_reason: Optional[str] = None

    @property
    def turns_open(self) -> Optional[int]:
        """Turns between opening and tally, or ``None`` while still open."""
        if self.closed_turn is None:
            return None
        return self.closed_turn - self.opened_turn

    def summary(self) -> dict:
        """Return a flat dict describing the proposal (ballots as a count)."""
        return {
            "pid": self.pid,
            "kind": self.kind,
            "by": self.by,
            "status": self.status,
            "opened_turn": self.opened_turn,
            "closed_turn": self.closed_turn,
            "turns_open": self.turns_open,
            "roll_at_open": self.roll_at_open,
            "ballots": len(self.ballots),
            "yes_weight": self.yes_weight,
            "no_weight": self.no_weight,
            "blocked_reason": self.blocked_reason,
        }


@dataclass
class ReplayResult:
    """Snapshot of a finished replay, consumed by :func:`evaluate_predicate`."""

    initial_treasury: float
    final_treasury: float
    roll_at_genesis: int
    final_roll: int
    expelled: list
    balances: dict
    proposals: list  # list of ``Proposal.summary()`` dicts
    kernel_changes: list
    emergency_turns: int
    emergency_spend_total: float
    # Largest single ballot weight, as a fraction of the tally denominator,
    # seen on any ratified proposal.
    max_ratified_ballot_frac: float
    blocked: list  # ``{"action", "reason"}`` entries for illegal actions
    turns_elapsed: int


class Replay:
    """Replays a recorded action trace under a kernel parameter set."""

    def __init__(self, params: dict, citizens: list, treasury: float) -> None:
        """Set up genesis state.

        Args:
            params: kernel parameters; deep-copied so amendments made during
                the replay never leak back to the caller.
            citizens: roll entries, either dicts carrying an ``"id"`` or
                plain values that are stringified.
            treasury: initial treasury balance.

        Raises:
            ValueError: if the roll contains duplicate ids.
        """
        self.params = copy.deepcopy(params)
        ids = [c["id"] if isinstance(c, dict) else str(c) for c in citizens]
        if len(set(ids)) != len(ids):
            raise ValueError("duplicate citizen ids in roll")
        self.roll: list = list(ids)
        self.roll_at_genesis = len(ids)
        self.expelled: list = []
        self.delegations: dict = {}  # delegator -> delegate
        self.turn = 0
        self.treasury = float(treasury)
        self.initial_treasury = float(treasury)
        self.epoch_start_treasury = float(treasury)
        self.epoch_spent = 0.0
        self.proposals: dict = {}
        self.emergency_active = False
        self.emergency_started: Optional[int] = None
        self.emergency_turns = 0
        self.emergency_spend_total = 0.0
        self.kernel_changes: list = []
        self.balances: dict = {}
        self.max_ratified_ballot_frac = 0.0
        self.blocked: list = []

    # ------------------------------------------------------------------ #
    # plumbing                                                           #
    # ------------------------------------------------------------------ #

    def _p(self, dotted: str, default: Any = None) -> Any:
        """Read a kernel parameter from the (possibly amended) live params."""
        return get_param(self.params, dotted, default)

    def _require_citizen(self, who: str, action: dict) -> None:
        """Raise :class:`IllegalAction` unless ``who`` is on the current roll."""
        if who not in self.roll:
            raise IllegalAction(action, f"{who} is not a citizen of record")

    def _terminal_delegate(self, citizen: str) -> Optional[str]:
        """Follow a delegation chain to its terminal voter (cycle-safe)."""
        seen = {citizen}
        node = citizen
        while node in self.delegations:
            node = self.delegations[node]
            if node in seen:
                return None  # cycle: ballots in a cycle are simply lost
            seen.add(node)
        return node

    def _ballot_weight(self, voter: str) -> float:
        """Return ``voter``'s own ballot plus the ballots delegated to them.

        Without ``delegation.max_depth`` delegation is transitive: every
        citizen whose chain terminates at ``voter`` counts. With it set, only
        direct delegators still on the roll count.
        """
        max_depth = self._p("delegation.max_depth")
        if max_depth is None:
            count = 0
            for citizen in self.roll:
                if citizen == voter or citizen not in self.delegations:
                    continue
                if self._terminal_delegate(citizen) == voter:
                    count += 1
        else:
            count = sum(
                1
                for delegator, delegate in self.delegations.items()
                if delegate == voter and delegator in self.roll
            )
        return 1.0 + count

    def _review_period(self, kind: str) -> int:
        """Return the review period, in turns, that applies to ``kind`` now.

        Per-kind overrides beat ``review_period_turns``. During an active
        emergency the period is 0 when ``emergency.suspends_review`` is set,
        otherwise it is capped by ``emergency.expedited_review_turns``.
        """
        base = self._p("review_period_turns", 1)
        overrides = self._p("review_period_overrides") or {}
        period = overrides.get(kind, base)
        if self.emergency_active:
            if self._p("emergency.suspends_review", False):
                return 0
            expedited = self._p("emergency.expedited_review_turns")
            if expedited is not None:
                period = min(period, expedited)
        return period

    def _threshold(self, kind: str, payload: dict) -> float:
        """Return the yes-ratio a proposal must strictly exceed to pass.

        An omnibus proposal takes the strictest threshold among its items.
        """
        if kind == "omnibus":
            return max(
                self._threshold(item["kind"], item) for item in payload["items"]
            )
        key, default = _THRESHOLD_KEYS[kind]
        return self._p(key, default)

    # ------------------------------------------------------------------ #
    # actions                                                            #
    # ------------------------------------------------------------------ #

    def apply(self, action: dict) -> None:
        """Dispatch one trace entry to its handler.

        Raises:
            IllegalAction: if the entry is not a mapping, has an unknown
                ``"t"``, or is rejected by its handler.
        """
        if not isinstance(action, dict):
            raise IllegalAction(action, "action is not a mapping")
        kind = action.get("t")
        handlers = {
            "propose": self._act_propose,
            "vote": self._act_vote,
            "delegate": self._act_delegate,
            "revoke": self._act_revoke,
            "call_question": self._act_call_question,
            "emergency_spend": self._act_emergency_spend,
            "advance": self._act_advance,
        }
        # Guard the lookup: an unhashable tag must not raise TypeError.
        handler = handlers.get(kind) if isinstance(kind, str) else None
        if handler is None:
            raise IllegalAction(action, f"unknown action type {kind!r}")
        handler(action)

    def _act_propose(self, action: dict) -> None:
        """Open a proposal after validating proposer, kind, id and payload."""
        by = action.get("by")
        self._require_citizen(by, action)
        kind = action.get("kind")
        if kind not in KINDS:
            raise IllegalAction(action, f"unknown proposal kind {kind!r}")
        pid = action.get("id")
        if not pid or pid in self.proposals:
            raise IllegalAction(action, f"missing or duplicate proposal id {pid!r}")
        payload: dict
        if kind == "spend":
            payload = self._spend_payload(action, action, "malformed spend payload")
        elif kind == "amend":
            payload = self._amend_payload(action, action, "malformed amend payload")
        elif kind == "expel":
            target = action.get("target")
            if target not in self.roll:
                raise IllegalAction(
                    action, f"expulsion target {target!r} is not a citizen"
                )
            payload = {"target": target}
        elif kind == "emergency":
            payload = {}
        else:  # omnibus
            if self._p("single_subject_rule", False):
                raise IllegalAction(
                    action, "single-subject rule: omnibus proposals are not permitted"
                )
            payload = {"items": self._omnibus_items(action)}
        self.proposals[pid] = Proposal(
            pid=pid,
            kind=kind,
            by=by,
            opened_turn=self.turn,
            payload=payload,
            roll_at_open=len(self.roll),
        )

    @staticmethod
    def _as_amount(raw: Any) -> Optional[float]:
        """Return ``raw`` as a positive, non-NaN float, or ``None`` if invalid."""
        if not isinstance(raw, (int, float)):
            return None
        try:
            amount = float(raw)
        except OverflowError:  # int too large to represent as a float
            return None
        # NaN compares false against everything, so it would slip past the
        # per-proposal cap, the commons floor and the epoch rate cap.
        if math.isnan(amount) or amount <= 0:
            return None
        return amount

    def _spend_payload(self, action: dict, source: dict, reason: str) -> dict:
        """Validate the spend fields of ``source`` and return a spend payload.

        ``source`` is the action itself for a plain spend, or one item of an
        omnibus proposal; ``reason`` is the rejection message to use.
        """
        amount = self._as_amount(source.get("amount"))
        to = source.get("to")
        if amount is None or not to:
            raise IllegalAction(action, reason)
        self._check_spend_cap(action, amount)
        return {"amount": amount, "to": to}

    def _amend_payload(self, action: dict, source: dict, reason: str) -> dict:
        """Validate the amend fields of ``source`` and return an amend payload."""
        key = source.get("key")
        # The key is later split on "." by set_param, so it must be a string.
        if not isinstance(key, str) or not key:
            raise IllegalAction(action, reason)
        return {"key": key, "value": source.get("value")}

    def _omnibus_items(self, action: dict) -> list:
        """Validate and normalise the ``items`` of an omnibus proposal.

        Each item is held to the same rules as the matching top-level payload
        so a malformed or negative item is rejected here instead of crashing
        (or minting treasury) when the proposal is tallied.
        """
        items = action.get("items")
        if not isinstance(items, list) or not items:
            raise IllegalAction(action, "malformed omnibus payload")
        normalised = []
        for item in items:
            if not isinstance(item, dict):
                raise IllegalAction(action, "malformed omnibus payload")
            item_kind = item.get("kind")
            if item_kind not in ("spend", "amend"):
                raise IllegalAction(
                    action, f"omnibus item kind {item_kind!r} not permitted"
                )
            if item_kind == "spend":
                body = self._spend_payload(action, item, "malformed omnibus payload")
            else:
                body = self._amend_payload(action, item, "malformed omnibus payload")
            normalised.append({"kind": item_kind, **body})
        return normalised

    def _check_spend_cap(self, action: dict, amount: float) -> None:
        """Reject a spend above ``spend.per_proposal_cap`` of the treasury."""
        cap = self._p("spend.per_proposal_cap")
        if cap is not None and amount > cap * self.treasury:
            raise IllegalAction(
                action,
                f"spend of {amount} exceeds per-proposal cap "
                f"({cap:.0%} of current treasury)",
            )

    def _act_vote(self, action: dict) -> None:
        """Record a ballot; its weight is fixed at the moment of voting."""
        by = action.get("by")
        self._require_citizen(by, action)
        if by in self.delegations:
            raise IllegalAction(
                action, f"{by} has delegated their ballot and may not also vote"
            )
        proposal = self.proposals.get(action.get("on"))
        if proposal is None or proposal.status != "open":
            raise IllegalAction(action, "no open proposal with that id")
        if by in proposal.ballots:
            raise IllegalAction(action, f"{by} already voted on {proposal.pid}")
        choice = action.get("choice")
        if choice not in ("yes", "no", "abstain"):
            raise IllegalAction(action, f"unknown ballot choice {choice!r}")
        proposal.ballots[by] = (choice, self._ballot_weight(by))

    def _act_delegate(self, action: dict) -> None:
        """Delegate ``by``'s ballot to ``to``, subject to depth and weight caps."""
        by, to = action.get("by"), action.get("to")
        self._require_citizen(by, action)
        self._require_citizen(to, action)
        if by == to:
            raise IllegalAction(action, "self-delegation is meaningless")
        if by in self.delegations:
            raise IllegalAction(action, f"{by} has already delegated; revoke first")
        max_depth = self._p("delegation.max_depth")
        if max_depth is not None:
            # Any configured depth limit is enforced as "no chains": neither
            # end of the new edge may already be part of a delegation.
            if to in self.delegations:
                raise IllegalAction(
                    action,
                    f"{to} has delegated onward; chain would exceed depth {max_depth}",
                )
            if any(delegate == by for delegate in self.delegations.values()):
                raise IllegalAction(
                    action,
                    f"{by} holds delegated ballots; delegating onward would exceed "
                    f"depth {max_depth}",
                )
        frac = self._p("delegation.max_weight_frac")
        if frac is not None:
            cap = math.ceil(frac * len(self.roll))
            incoming = sum(1 for d in self.delegations.values() if d == to)
            if incoming + 1 > cap:
                raise IllegalAction(
                    action,
                    f"{to} would hold {incoming + 1} delegated ballots, above the "
                    f"cap of {cap}",
                )
        self.delegations[by] = to

    def _act_revoke(self, action: dict) -> None:
        """Remove ``by``'s outgoing delegation."""
        by = action.get("by")
        self._require_citizen(by, action)
        if by not in self.delegations:
            raise IllegalAction(action, f"{by} has no delegation to revoke")
        del self.delegations[by]

    def _act_call_question(self, action: dict) -> None:
        """Tally a proposal immediately, if the kernel lets its proposer do so."""
        if not self._p("allow_call_question", False):
            raise IllegalAction(
                action, "the kernel does not permit calling the question early"
            )
        by = action.get("by")
        self._require_citizen(by, action)
        proposal = self.proposals.get(action.get("on"))
        if proposal is None or proposal.status != "open":
            raise IllegalAction(action, "no open proposal with that id")
        if proposal.by != by:
            raise IllegalAction(action, "only the proposer may call the question")
        self._tally(proposal)

    def _act_emergency_spend(self, action: dict) -> None:
        """Spend from the treasury without a vote during an active emergency.

        Only the treasury balance and ``spend.epoch_rate_cap`` are checked
        here; no per-proposal cap or commons floor check is applied.
        """
        by = action.get("by")
        self._require_citizen(by, action)
        if not self.emergency_active:
            raise IllegalAction(action, "no active emergency")
        if self._p("emergency.spend_requires_vote", False):
            raise IllegalAction(
                action, "emergency spending still requires a ratified spend proposal"
            )
        try:
            amount = float(action.get("amount", 0))
        except (TypeError, ValueError, OverflowError):
            # A missing or non-numeric amount is a bad action, not a crash.
            raise IllegalAction(action, "malformed emergency spend") from None
        to = action.get("to")
        # NaN would pass both the treasury and the rate-cap comparisons below.
        if math.isnan(amount) or amount <= 0 or not to:
            raise IllegalAction(action, "malformed emergency spend")
        if amount > self.treasury:
            raise IllegalAction(action, "insufficient treasury")
        rate_cap = self._p("spend.epoch_rate_cap")
        if rate_cap is not None and self.epoch_spent + amount > (
            rate_cap * self.epoch_start_treasury
        ):
            raise IllegalAction(action, "epoch spend rate cap")
        self._transfer(amount, to)
        self.emergency_spend_total += amount

    def _act_advance(self, action: dict) -> None:
        """Advance the clock one turn: epoch rollover, emergency tick, tallies."""
        self.turn += 1
        turns_per_epoch = self._p("turns_per_epoch", 10)
        if turns_per_epoch and self.turn % turns_per_epoch == 0:
            self.epoch_spent = 0.0
            self.epoch_start_treasury = self.treasury
            if self._p("delegation.epoch_expiry", False):
                self.delegations.clear()
        if self.emergency_active:
            # Count the turn first, then test the sunset, so the turn on which
            # an emergency lapses is still counted as an emergency turn.
            self.emergency_turns += 1
            sunset = self._p("emergency.sunset_turns")
            if sunset is not None and self.turn - self.emergency_started >= sunset:
                self.emergency_active = False
        # Iterate over a copy of the values: tallying changes proposal state.
        for proposal in list(self.proposals.values()):
            if proposal.status != "open":
                continue
            if self.turn - proposal.opened_turn >= self._review_period(proposal.kind):
                self._tally(proposal)

    # ------------------------------------------------------------------ #
    # tallying and execution                                             #
    # ------------------------------------------------------------------ #

    def _tally(self, proposal: Proposal) -> None:
        """Close ``proposal``: check quorum and threshold, then execute it.

        The final status is ``failed`` (quorum, no decisive ballots, or
        threshold), ``blocked`` (passed but execution was refused) or
        ``ratified``.
        """
        proposal.closed_turn = self.turn
        if self._p("roll_snapshot_at_open", False):
            denominator = proposal.roll_at_open
        else:
            denominator = len(self.roll)
        total_weight = sum(w for (_, w) in proposal.ballots.values())
        yes = sum(w for (c, w) in proposal.ballots.values() if c == "yes")
        no = sum(w for (c, w) in proposal.ballots.values() if c == "no")
        proposal.yes_weight, proposal.no_weight = yes, no
        quorum = self._p("quorum", 0.5)
        # Abstentions count toward quorum but not toward the yes/no ratio.
        if total_weight < quorum * denominator:
            proposal.status = "failed"
            proposal.blocked_reason = "quorum not met"
            return
        if yes + no <= 0:
            proposal.status = "failed"
            proposal.blocked_reason = "no decisive ballots"
            return
        ratio = yes / (yes + no)
        # The threshold must be strictly exceeded; an exact tie fails.
        if ratio <= self._threshold(proposal.kind, proposal.payload):
            proposal.status = "failed"
            proposal.blocked_reason = "threshold not met"
            return
        executed = self._execute(proposal, ratio)
        proposal.status = "ratified" if executed else "blocked"
        if executed:
            for (_, weight) in proposal.ballots.values():
                frac = weight / denominator if denominator else 0.0
                self.max_ratified_ballot_frac = max(
                    self.max_ratified_ballot_frac, frac
                )

    def _execute(self, proposal: Proposal, ratio: float) -> bool:
        """Carry out a passed proposal; return ``False`` if it was blocked."""
        kind, payload = proposal.kind, proposal.payload
        if kind == "spend":
            return self._execute_spend(
                proposal, payload["amount"], payload["to"], ratio
            )
        if kind == "amend":
            self._apply_amend(proposal, payload["key"], payload["value"])
            return True
        if kind == "expel":
            target = payload["target"]
            if target in self.roll:
                self.roll.remove(target)
                self.expelled.append(target)
                # Drop the target's own delegation and every delegation that
                # pointed at the target.
                self.delegations.pop(target, None)
                for delegator in [
                    d for d, t in self.delegations.items() if t == target
                ]:
                    del self.delegations[delegator]
            return True
        if kind == "emergency":
            renewal = self._p("emergency.renewal_threshold")
            if self.emergency_active and renewal is not None and ratio <= renewal:
                proposal.blocked_reason = "emergency renewal threshold not met"
                return False
            self.emergency_active = True
            self.emergency_started = self.turn
            return True
        # omnibus: execute items sequentially (severability).
        ok = True
        for item in payload["items"]:
            if item["kind"] == "spend":
                # The spend is the left operand so it always runs, even after
                # an earlier item was blocked.
                ok = self._execute_spend(
                    proposal, float(item["amount"]), item["to"], ratio
                ) and ok
            else:
                self._apply_amend(proposal, item["key"], item["value"])
        return ok

    def _execute_spend(
        self, proposal: Proposal, amount: float, to: str, ratio: float
    ) -> bool:
        """Apply treasury, commons-floor and epoch-rate checks, then transfer.

        Returns ``False`` and sets ``proposal.blocked_reason`` when a check
        refuses the spend.
        """
        if amount > self.treasury:
            proposal.blocked_reason = "insufficient treasury"
            return False
        floor = self._p("spend.commons_floor")
        # The floor may only be breached with a ratio strictly above
        # ``thresholds.floor_breach``.
        if (
            floor is not None
            and (self.treasury - amount) < floor * self.initial_treasury
            and ratio <= self._p("thresholds.floor_breach", 0.75)
        ):
            proposal.blocked_reason = "commons floor"
            return False
        rate_cap = self._p("spend.epoch_rate_cap")
        if rate_cap is not None and self.epoch_spent + amount > (
            rate_cap * self.epoch_start_treasury
        ):
            proposal.blocked_reason = "epoch spend rate cap"
            return False
        self._transfer(amount, to)
        return True

    def _transfer(self, amount: float, to: str) -> None:
        """Move ``amount`` out of the treasury to ``to``.

        ``"ALL"`` splits the amount equally across the current roll.
        """
        self.treasury -= amount
        self.epoch_spent += amount
        if to == "ALL":
            if self.roll:
                share = amount / len(self.roll)
                for citizen in self.roll:
                    self.balances[citizen] = self.balances.get(citizen, 0.0) + share
        else:
            self.balances[to] = self.balances.get(to, 0.0) + amount

    def _apply_amend(self, proposal: Proposal, key: str, value: Any) -> None:
        """Write ``value`` at kernel parameter ``key`` and log the change."""
        # Store copies: ``value`` is the object held by the recorded trace. If
        # it were stored directly, a later amendment to a nested key would
        # mutate both the trace and the logged record, so a second replay of
        # the same trace would start from different input.
        set_param(self.params, key, copy.deepcopy(value))
        self.kernel_changes.append(
            {
                "key": key,
                "value": copy.deepcopy(value),
                "proposal": proposal.pid,
                "kind": proposal.kind,
                "yes_weight": proposal.yes_weight,
                "support_of_genesis": (
                    proposal.yes_weight / self.roll_at_genesis
                    if self.roll_at_genesis
                    else 0.0
                ),
            }
        )

    # ------------------------------------------------------------------ #
    # results                                                            #
    # ------------------------------------------------------------------ #

    def result(self) -> ReplayResult:
        """Snapshot the current state as a :class:`ReplayResult`."""
        return ReplayResult(
            initial_treasury=self.initial_treasury,
            final_treasury=self.treasury,
            roll_at_genesis=self.roll_at_genesis,
            final_roll=len(self.roll),
            expelled=list(self.expelled),
            balances=dict(self.balances),
            proposals=[p.summary() for p in self.proposals.values()],
            kernel_changes=list(self.kernel_changes),
            emergency_turns=self.emergency_turns,
            emergency_spend_total=self.emergency_spend_total,
            max_ratified_ballot_frac=self.max_ratified_ballot_frac,
            blocked=list(self.blocked),
            turns_elapsed=self.turn,
        )


def run_trace(
    params: dict,
    citizens: list,
    treasury: float,
    trace: list,
    strict: bool = False,
) -> ReplayResult:
    """Replay a recorded action trace and return the outcome.

    With ``strict=False`` (the default, and what regression tests use),
    illegal actions are recorded in ``result.blocked`` and skipped — this is
    how a patched kernel "defeats" a recorded exploit. With ``strict=True``
    the first illegal action raises :class:`IllegalAction`.
    """
    replay = Replay(params, citizens, treasury)
    for action in trace:
        try:
            replay.apply(action)
        except IllegalAction as exc:
            if strict:
                raise
            replay.blocked.append({"action": exc.action, "reason": exc.reason})
    return replay.result()


# ---------------------------------------------------------------------- #
# predicates                                                             #
# ---------------------------------------------------------------------- #


def evaluate_predicate(predicate: dict, result: ReplayResult) -> bool:
    """Evaluate a declarative success predicate against a replay result.

    Predicates are stored in exploit records as plain JSON so that the
    archive is auditable without running any code. ``all_of`` / ``any_of``
    combine the predicates listed under ``"predicates"``; every other type
    reads its own arguments (``frac``, ``n``, ``min_turns``,
    ``min_spend_frac``) from the predicate dict.

    Raises:
        ValueError: if ``predicate["type"]`` is not a known predicate type.
    """
    ptype = predicate.get("type")
    if ptype == "all_of":
        return all(evaluate_predicate(p, result) for p in predicate["predicates"])
    if ptype == "any_of":
        return any(evaluate_predicate(p, result) for p in predicate["predicates"])
    if ptype == "treasury_below_frac":
        return result.final_treasury < predicate["frac"] * result.initial_treasury
    if ptype == "treasury_at_least_frac":
        return result.final_treasury >= predicate["frac"] * result.initial_treasury
    if ptype == "kernel_change_support_below":
        return any(
            change["support_of_genesis"] < predicate["frac"]
            for change in result.kernel_changes
        )
    if ptype == "emergency_capture":
        return (
            result.emergency_turns >= predicate["min_turns"]
            and result.emergency_spend_total
            >= predicate["min_spend_frac"] * result.initial_treasury
        )
    if ptype == "emergency_turns_at_most":
        return result.emergency_turns <= predicate["n"]
    if ptype == "max_ballot_weight_above":
        return result.max_ratified_ballot_frac > predicate["frac"]
    if ptype == "max_ballot_weight_at_most":
        return result.max_ratified_ballot_frac <= predicate["frac"]
    if ptype == "ratified_same_turn":
        return any(
            p["status"] == "ratified" and p["turns_open"] == 0
            for p in result.proposals
        )
    if ptype == "kernel_changed_via_omnibus":
        return any(change["kind"] == "omnibus" for change in result.kernel_changes)
    if ptype == "kernel_changes_at_most":
        return len(result.kernel_changes) <= predicate["n"]
    if ptype == "expelled_count_at_most":
        return len(result.expelled) <= predicate["n"]
    if ptype == "blocked_actions_at_least":
        return len(result.blocked) >= predicate["n"]
    if ptype == "proposals_blocked_at_least":
        blocked = sum(1 for p in result.proposals if p["status"] == "blocked")
        return blocked >= predicate["n"]
    raise ValueError(f"unknown predicate type {ptype!r}")