"""Simulation state: the roll, the treasury, proposals, and the ledger. Everything an agent does lands in the public ledger (INV-4). Detectors, metrics, and the exploit-to-test pipeline all read the same ledger the agents produce — there is no privileged side channel. """ from __future__ import annotations import copy from dataclasses import dataclass, field from typing import Any, Iterable, Iterator from .kernel import Kernel __all__ = [ "AgentRecord", "Proposal", "Emergency", "LedgerEntry", "SimState", "initial_state", ] @dataclass class AgentRecord: agent_id: str faction: str = "none" citizen: bool = True balance: float = 0.0 delegate_to: str | None = None joined_turn: int = 0 expelled_turn: int | None = None @dataclass class Proposal: proposal_id: str kind: str # spend | amend | expel | emergency_declare | emergency_renew proposer: str payload: dict created_turn: int closes_turn: int # inclusive: tallied at the end of this turn threshold_class: str = "ordinary" # for amend: ordinary | kernel status: str = "open" # open | passed | failed | executed | execution_failed votes: dict[str, str] = field(default_factory=dict) # agent_id -> yes|no|abstain @dataclass class Emergency: active: bool = False declared_turn: int | None = None expires_turn: int | None = None renewals: int = 0 @dataclass class LedgerEntry: turn: int kind: str actor: str | None detail: dict @dataclass class SimState: treasury: float agents: dict[str, AgentRecord] epoch_length: int = 10 turn: int = 0 initial_treasury: float = 0.0 proposals: dict[str, Proposal] = field(default_factory=dict) ledger: list[LedgerEntry] = field(default_factory=list) emergency: Emergency = field(default_factory=Emergency) next_pid: int = 1 def __post_init__(self) -> None: if self.initial_treasury <= 0: self.initial_treasury = self.treasury # ------------------------------------------------------------- roll def citizens(self) -> list[AgentRecord]: return [a for _, a in sorted(self.agents.items()) if a.citizen] def roll_ids(self) -> list[str]: return [a.agent_id for a in self.citizens()] def roll_size(self) -> int: return len(self.citizens()) def is_citizen(self, agent_id: str) -> bool: rec = self.agents.get(agent_id) return rec is not None and rec.citizen # ---------------------------------------------------------- ledger def record(self, kind: str, actor: str | None = None, **detail: Any) -> LedgerEntry: entry = LedgerEntry(turn=self.turn, kind=kind, actor=actor, detail=detail) self.ledger.append(entry) return entry def ledger_since(self, turn: int) -> Iterator[LedgerEntry]: return (e for e in self.ledger if e.turn >= turn) # ------------------------------------------------------- proposals def new_proposal_id(self) -> str: pid = f"P{self.next_pid:04d}" self.next_pid += 1 return pid def open_proposals(self) -> list[Proposal]: return [p for _, p in sorted(self.proposals.items()) if p.status == "open"] # ----------------------------------------------------------- epoch @property def epoch(self) -> int: return self.turn // self.epoch_length @property def is_epoch_end(self) -> bool: return self.turn % self.epoch_length == self.epoch_length - 1 # ------------------------------------------------------ delegation def resolve_terminal(self, agent_id: str, transitive: bool) -> str: """Resolve where ``agent_id``'s ballot weight lands. Follows the delegation chain through citizen delegates. With ``transitive`` off, follows at most one hop. A cycle returns the weight to the original principal (where it is wasted unless they revoke — sticky revocation makes that bite, see EXP-005). """ rec = self.agents.get(agent_id) if rec is None or not rec.citizen: return agent_id if rec.delegate_to is None: return agent_id first = self.agents.get(rec.delegate_to) if first is None or not first.citizen: return agent_id if not transitive: return first.agent_id visited = {agent_id} cur = first while True: if cur.agent_id in visited: return agent_id # cycle: weight stays home visited.add(cur.agent_id) nxt_id = cur.delegate_to if nxt_id is None: return cur.agent_id nxt = self.agents.get(nxt_id) if nxt is None or not nxt.citizen: return cur.agent_id cur = nxt def effective_weights(self, transitive: bool) -> dict[str, float]: """One unit of weight per citizen, resolved to its terminal delegate.""" weights: dict[str, float] = {} for rec in self.citizens(): terminal = self.resolve_terminal(rec.agent_id, transitive) weights[terminal] = weights.get(terminal, 0.0) + 1.0 return weights def inbound_delegations(self, agent_id: str) -> int: return sum( 1 for rec in self.citizens() if rec.delegate_to == agent_id ) # -------------------------------------------------------- treasury def executed_spend_to(self, beneficiary: str, since_turn: int) -> float: return sum( e.detail.get("amount", 0.0) for e in self.ledger if e.kind == "spend_executed" and e.turn >= since_turn and e.detail.get("to") == beneficiary ) def executed_spend_total(self, since_turn: int) -> float: return sum( e.detail.get("amount", 0.0) for e in self.ledger if e.kind == "spend_executed" and e.turn >= since_turn ) def expulsions_in_epoch(self, epoch: int) -> int: return sum( 1 for e in self.ledger if e.kind == "expel_executed" and e.turn // self.epoch_length == epoch ) # ------------------------------------------------------------ misc def clone(self) -> "SimState": return copy.deepcopy(self) def initial_state( kernel: Kernel, roster: Iterable[tuple[str, str]], treasury: float, balances: dict[str, float] | None = None, ) -> SimState: """Build a fresh state from a roster of ``(agent_id, faction)`` pairs.""" balances = balances or {} agents = { agent_id: AgentRecord( agent_id=agent_id, faction=faction, balance=float(balances.get(agent_id, 0.0)), ) for agent_id, faction in roster } if not agents: raise ValueError("roster must contain at least one agent") epoch_length = int(kernel.get("epoch_length_turns", 10)) return SimState( treasury=float(treasury), agents=agents, epoch_length=epoch_length, initial_treasury=float(treasury), )