"""The constitutional state machine. The engine plays a scenario's script against a copy of the constitution's parameters. Every move is checked for legality under the *current* rules (including rules already amended earlier in the same scenario — this is what makes ratchet attacks expressible). Illegal moves are not errors: they are recorded as rejections in the trace, because an attack that requires an illegal move has, by definition, been blocked. Time is discrete days. ``advance_time`` drives the clock; tallies, enactments, emergency sunsets, budget cycles, and exit effective dates are all processed day by day, deterministically. """ from __future__ import annotations import math from collections import defaultdict from dataclasses import dataclass, field from typing import Any, Optional from .metrics import Weights, WelfareReport, compute_welfare from .model import Condition, Harm, Scenario, ScenarioError, ScriptStep from .params import ConstitutionParams EPS = 1e-9 # --------------------------------------------------------------------------- # State objects # --------------------------------------------------------------------------- @dataclass class MemberState: id: str faction: str welfare: float suffrage: bool joined_day: int status: str # nonmember | active | expelled | exited objective: Optional[str] = None delegate_to: Optional[str] = None exit_pending: bool = False @dataclass class BudgetState: bid: str amount_per_cycle: float recipient: str purpose: str benefits: list[Harm] = field(default_factory=list) lapse_harms: list[Harm] = field(default_factory=list) active: bool = True @dataclass class ProposalState: pid: str label: Optional[str] kind: str payload: dict[str, Any] proposer: str created_day: int notice_end: int voting_end: int subject_key: str status: str = "open" # open | passed | failed | enacted | voided reason: Optional[str] = None votes: dict[str, str] = field(default_factory=dict) tally: Optional[dict[str, Any]] = None enact_day: Optional[int] = None enacted_on: Optional[int] = None revert_info: dict[str, Any] = field(default_factory=dict) @dataclass class EmergencyState: active: bool = False declared_day: int = -1 ends_day: int = -1 renewals: int = 0 powers: list[str] = field(default_factory=list) last_sunset_day: int = -(10**6) total_days_active: int = 0 @dataclass class RunResult: outcome: str # blocked | exploited | collateral condition_met: bool welfare: WelfareReport empathy_floor: float trace: list[dict[str, Any]] final_day: int final_treasury: float def to_dict(self) -> dict[str, Any]: return { "outcome": self.outcome, "condition_met": self.condition_met, "welfare": self.welfare.model_dump(), "empathy_floor": self.empathy_floor, "final_day": self.final_day, "final_treasury": self.final_treasury, "trace": self.trace, } # --------------------------------------------------------------------------- # Engine # --------------------------------------------------------------------------- class Engine: def __init__( self, params: ConstitutionParams, scenario: Scenario, overrides: Optional[dict[str, Any]] = None, ) -> None: self.params = params.model_copy(deep=True) if overrides: for path, value in overrides.items(): try: self.params.set_path(path, value) except KeyError as exc: raise ScenarioError( f"interpretation override targets unknown path {path!r}" ) from exc self.scenario = scenario self.weights = Weights(**(scenario.empathy.weights or {})) self.day = 0 self.treasury = float(scenario.setup.treasury) self.members: dict[str, MemberState] = {} for a in scenario.expanded_actors(): self.members[a.id] = MemberState( id=a.id, faction=a.faction, welfare=a.welfare, suffrage=a.suffrage, joined_day=a.joined_day if a.member else -1, status="active" if a.member else "nonmember", objective=a.objective, ) self.proposals: dict[str, ProposalState] = {} self._pcounter = 0 self.labels: dict[str, str] = {} self.last_pid: Optional[str] = None self.budgets: list[BudgetState] = [] for i, b in enumerate(scenario.setup.budgets): self.budgets.append( BudgetState( bid=f"b{i + 1}", amount_per_cycle=b.amount_per_cycle, recipient=b.recipient, purpose=b.purpose, benefits=list(b.benefits), lapse_harms=list(b.lapse_harms), ) ) self.emergency = EmergencyState() self.trace: list[dict[str, Any]] = [] self.treasury_at_cycle_start = self.treasury self.spent_this_cycle = 0.0 self.admissions_this_cycle = 0 self.expulsions_this_cycle = 0 self.failed_subjects: dict[str, int] = {} self.pending_exits: list[tuple[int, str]] = [] # -- public ------------------------------------------------------------- def execute(self) -> RunResult: for idx, step in enumerate(self.scenario.script): self._dispatch(idx, step) condition_met = self.eval_condition(self.scenario.success_condition) welfare = compute_welfare(self.members, self.treasury, self.weights) floor = self.scenario.empathy.floor if condition_met: outcome = "exploited" elif welfare.min_welfare + EPS >= floor: outcome = "blocked" else: outcome = "collateral" return RunResult( outcome=outcome, condition_met=condition_met, welfare=welfare, empathy_floor=floor, trace=self.trace, final_day=self.day, final_treasury=self.treasury, ) # -- dispatch ----------------------------------------------------------- def _dispatch(self, idx: int, step: ScriptStep) -> None: move = step.move if move == "advance_time": self._advance(idx, int(step.days or 0)) elif move == "propose": self._propose(idx, str(step.actor), step) elif move == "vote": voters = [step.actor] if step.actor else self._resolve_group(step.actors) for aid in voters: self._vote(idx, str(aid), step) elif move == "delegate": self._delegate(idx, str(step.actor), str(step.delegate)) elif move == "revoke_delegation": self._revoke_delegation(idx, str(step.actor)) elif move == "exit": self._exit(idx, str(step.actor)) elif move == "challenge": self._challenge(idx, str(step.actor), step) else: # pragma: no cover - guarded by model validation raise ScenarioError(f"unknown move {move!r}") # -- helpers ------------------------------------------------------------ def _record( self, idx: int, move: str, actor: Optional[str], result: str, reason: Optional[str] = None, **detail: Any, ) -> None: entry: dict[str, Any] = { "step": idx, "day": self.day, "move": move, "actor": actor, "result": result, } if reason: entry["reason"] = reason if detail: entry["detail"] = detail self.trace.append(entry) def _resolve_group(self, sel: Any) -> list[str]: if isinstance(sel, list): return list(sel) if sel in ("all", "members"): return [m.id for m in self.members.values()] if isinstance(sel, str) and sel.startswith("faction:"): f = sel.split(":", 1)[1] return [m.id for m in self.members.values() if m.faction == f] raise ScenarioError(f"bad actor selector {sel!r}") def _resolve_proposal(self, ref: Optional[str]) -> ProposalState: if not ref or not ref.startswith("$"): raise ScenarioError(f"bad proposal reference {ref!r}") name = ref[1:] if name == "last": if self.last_pid is None: raise ScenarioError("'$last' used before any proposal exists") return self.proposals[self.last_pid] pid = self.labels.get(name) if pid is None: raise ScenarioError(f"undefined proposal label {ref!r}") return self.proposals[pid] def _active_members(self) -> list[MemberState]: return [m for m in self.members.values() if m.status == "active"] def _is_join_eligible(self, m: MemberState, at_day: int) -> bool: return m.joined_day + self.params.voting.eligibility_delay_days <= at_day def _kind_class(self, kind: str, payload: dict[str, Any]) -> str: if kind == "rule_change": changes = payload.get("changes", []) if any(self.params.is_kernel_path(p) for p, _ in changes): return "rule_change_kernel" return "rule_change_policy" return kind def _parse_effects(self, payload: dict[str, Any], key: str) -> list[Harm]: raw = payload.get(key) if raw is None: return [] if not isinstance(raw, list): raise ScenarioError(f"payload {key!r} must be a list") return [h if isinstance(h, Harm) else Harm.model_validate(h) for h in raw] def _apply_effects(self, effects: list[Harm], sign: float) -> list[tuple[str, float]]: """Apply harms (sign=-1) or benefits (sign=+1); return deltas for revert.""" deltas: list[tuple[str, float]] = [] for h in effects: targets: list[MemberState] if h.target == "all": targets = self._active_members() elif h.target.startswith("faction:"): f = h.target.split(":", 1)[1] targets = [m for m in self._active_members() if m.faction == f] else: m = self.members.get(h.target) if m is None: raise ScenarioError(f"effect targets unknown actor {h.target!r}") targets = [m] for m in targets: amount = h.amount if h.disparate is not None and m.faction == h.disparate.faction: amount = h.amount * h.disparate.multiplier delta = sign * amount m.welfare += delta deltas.append((m.id, delta)) return deltas def _revert_deltas(self, deltas: list[tuple[str, float]]) -> None: for mid, delta in deltas: self.members[mid].welfare -= delta # -- propose ------------------------------------------------------------ def _normalize_payload(self, kind: str, payload: dict[str, Any]) -> dict[str, Any]: p = dict(payload) def need(key: str, types: tuple[type, ...]) -> Any: if key not in p: raise ScenarioError(f"{kind} payload missing {key!r}") v = p[key] if not isinstance(v, types): raise ScenarioError(f"{kind} payload field {key!r} has wrong type") return v if kind == "rule_change": if "changes" in p: raw = need("changes", (list,)) changes = [] for ch in raw: if not isinstance(ch, dict) or "path" not in ch or "value" not in ch: raise ScenarioError("rule_change changes entries need 'path' and 'value'") changes.append((str(ch["path"]), ch["value"])) else: path = str(need("path", (str,))) if "value" not in p: raise ScenarioError("rule_change payload missing 'value'") changes = [(path, p["value"])] p["changes"] = changes elif kind == "spend": amount = need("amount", (int, float)) if amount <= 0: raise ScenarioError("spend amount must be positive") need("recipient", (str,)) elif kind == "budget": amount = need("amount_per_cycle", (int, float)) if amount <= 0: raise ScenarioError("budget amount_per_cycle must be positive") need("recipient", (str,)) need("purpose", (str,)) elif kind == "repeal_budget": need("purpose", (str,)) elif kind == "admit": ids = need("members", (list,)) if not ids: raise ScenarioError("admit payload needs a non-empty 'members' list") for mid in ids: if mid not in self.members: raise ScenarioError(f"admit targets undeclared actor {mid!r}") elif kind == "expel": need("target", (str,)) if p["target"] not in self.members: raise ScenarioError(f"expel targets undeclared actor {p['target']!r}") elif kind == "emergency_declare": need("powers", (list,)) d = need("duration_days", (int,)) if d < 1: raise ScenarioError("emergency duration_days must be >= 1") elif kind == "emergency_extend": d = need("days", (int,)) if d < 1: raise ScenarioError("emergency extension days must be >= 1") elif kind == "emergency_end": pass elif kind == "text": need("title", (str,)) p["harms"] = self._parse_effects(p, "harms") p["benefits"] = self._parse_effects(p, "benefits") return p def _subject_key(self, kind: str, payload: dict[str, Any]) -> str: if kind == "rule_change": return "rule_change:" + ",".join(sorted(p for p, _ in payload["changes"])) if kind == "expel": return f"expel:{payload['target']}" if kind == "spend": return f"spend:{payload['recipient']}" if kind == "budget": return f"budget:{payload['purpose']}" if kind == "repeal_budget": return f"repeal_budget:{payload['purpose']}" if kind == "admit": return "admit:" + ",".join(sorted(payload["members"])) if kind == "text": return f"text:{payload['title']}" return kind def _admission_limit(self) -> int: frac = self.params.membership.max_admissions_per_cycle_fraction return max(1, math.floor(frac * len(self._active_members()))) def _propose(self, idx: int, actor: str, step: ScriptStep) -> None: kind = str(step.kind) m = self.members.get(actor) if m is None: raise ScenarioError(f"unknown actor {actor!r}") def reject(reason: str) -> None: self._record(idx, "propose", actor, "rejected", reason, kind=kind) if m.status != "active": return reject("proposer_not_active") if not self._is_join_eligible(m, self.day): return reject("proposer_not_yet_eligible") payload = self._normalize_payload(kind, step.payload) subject_key = self._subject_key(kind, payload) cooldown = self.params.amendments.failed_subject_cooldown_days failed_at = self.failed_subjects.get(subject_key) if failed_at is not None and self.day < failed_at + cooldown: return reject("subject_cooldown") if ( self.emergency.active and self._kind_class(kind, payload) in self.params.emergency.prohibited_kinds_during ): return reject("prohibited_during_emergency") # Kind-specific legality at proposal time. if kind == "rule_change": changes = payload["changes"] if self.params.amendments.single_subject and len(changes) > 1: return reject("single_subject") for path, value in changes: if self.params.is_invariant_locked(path): return reject("invariant_locked") exists = self.params.has_path(path) if not exists and not path.startswith("policy."): return reject("invalid_path") if exists: cur = self.params.get_path(path) ok_types = ( (isinstance(cur, bool) and isinstance(value, bool)) or ( isinstance(cur, (int, float)) and not isinstance(cur, bool) and isinstance(value, (int, float)) and not isinstance(value, bool) ) or (isinstance(cur, str) and isinstance(value, str)) or (isinstance(cur, list) and isinstance(value, list)) ) if not ok_types: return reject("type_mismatch") last = path.split(".")[-1] if isinstance(value, (int, float)) and not isinstance(value, bool): if last.endswith("_threshold") or last.endswith("_fraction") or last == "quorum": if not (0.0 <= float(value) <= 1.0): return reject("fraction_out_of_bounds") if last.endswith("_threshold"): if float(value) + EPS < self.params.amendments.min_decision_threshold: return reject("below_min_threshold") if path == "voting.quorum": if float(value) + EPS < self.params.amendments.min_quorum: return reject("below_min_quorum") if last.endswith("_days") and float(value) < 0: return reject("negative_duration") elif kind == "spend": cap = self.params.treasury.per_proposal_cap_fraction * self.treasury if float(payload["amount"]) > cap + EPS: return reject("per_proposal_cap_exceeded") elif kind == "repeal_budget": if not any(b.active and b.purpose == payload["purpose"] for b in self.budgets): return reject("no_such_budget") elif kind == "admit": ids = payload["members"] for mid in ids: if self.members[mid].status != "nonmember": return reject("target_already_member") if self.admissions_this_cycle + len(ids) > self._admission_limit(): return reject("admission_rate_limit") elif kind == "expel": target = self.members[payload["target"]] if target.status != "active": return reject("target_not_active") if self.expulsions_this_cycle >= self.params.membership.max_expulsions_per_cycle: return reject("expulsion_rate_limit") elif kind == "emergency_declare": if self.emergency.active: return reject("emergency_already_active") if not set(payload["powers"]) <= set(self.params.emergency.powers_available): return reject("power_not_available") if int(payload["duration_days"]) > self.params.emergency.max_duration_days: return reject("duration_exceeds_max") cooldown_e = self.params.emergency.redeclaration_cooldown_days if self.day - self.emergency.last_sunset_day < cooldown_e: return reject("redeclaration_cooldown") elif kind == "emergency_extend": if not self.emergency.active: return reject("no_emergency_active") if self.emergency.renewals >= self.params.emergency.max_renewals: return reject("renewal_limit") if int(payload["days"]) > self.params.emergency.max_duration_days: return reject("duration_exceeds_max") elif kind == "emergency_end": if not self.emergency.active: return reject("no_emergency_active") # Build the proposal with the current clock rules. notice = self.params.voting.notice_period_days period = self.params.voting.voting_period_days if kind == "expel": notice += self.params.membership.expulsion_due_process_days if self.emergency.active and "expedite_voting" in self.emergency.powers: factor = self.params.emergency.expedite_factor notice = math.ceil(notice * factor) period = max(1, math.ceil(period * factor)) self._pcounter += 1 pid = f"p{self._pcounter}" prop = ProposalState( pid=pid, label=step.label, kind=kind, payload=payload, proposer=actor, created_day=self.day, notice_end=self.day + notice, voting_end=self.day + notice + period, subject_key=subject_key, ) self.proposals[pid] = prop self.last_pid = pid if step.label: self.labels[step.label] = pid self._record( idx, "propose", actor, "ok", pid=pid, kind=kind, label=step.label, notice_end=prop.notice_end, voting_end=prop.voting_end, ) # -- vote --------------------------------------------------------------- def _vote(self, idx: int, actor: str, step: ScriptStep) -> None: prop = self._resolve_proposal(step.proposal) m = self.members.get(actor) if m is None: raise ScenarioError(f"unknown actor {actor!r}") def reject(reason: str) -> None: self._record(idx, "vote", actor, "rejected", reason, pid=prop.pid) if prop.status != "open": return reject("proposal_not_open") if self.day < prop.notice_end: return reject("notice_period_active") if self.day >= prop.voting_end: return reject("voting_closed") if m.status != "active": return reject("voter_not_active") if not m.suffrage: return reject("no_suffrage") if ( prop.kind in ("spend", "budget") and prop.payload.get("recipient") == actor and not self.params.treasury.recipient_may_vote ): return reject("conflict_of_interest") if ( prop.kind == "expel" and prop.payload.get("target") == actor and not self.params.membership.accused_may_vote ): return reject("accused_may_not_vote") prop.votes[actor] = str(step.choice) self._record(idx, "vote", actor, "ok", pid=prop.pid, choice=step.choice) # -- delegation --------------------------------------------------------- def _delegate(self, idx: int, actor: str, delegate: str) -> None: m = self.members.get(actor) d = self.members.get(delegate) if m is None or d is None: raise ScenarioError("delegate move references unknown actor") def reject(reason: str) -> None: self._record(idx, "delegate", actor, "rejected", reason, delegate=delegate) if not self.params.voting.delegation.allowed: return reject("delegation_not_allowed") if actor == delegate: return reject("self_delegation") if m.status != "active" or d.status != "active": return reject("party_not_active") m.delegate_to = delegate self._record(idx, "delegate", actor, "ok", delegate=delegate) def _revoke_delegation(self, idx: int, actor: str) -> None: m = self.members.get(actor) if m is None: raise ScenarioError(f"unknown actor {actor!r}") if not self.params.voting.delegation.revocable_anytime: return self._record(idx, "revoke_delegation", actor, "rejected", "delegation_irrevocable") m.delegate_to = None self._record(idx, "revoke_delegation", actor, "ok") # -- exit --------------------------------------------------------------- def _exit(self, idx: int, actor: str) -> None: m = self.members.get(actor) if m is None: raise ScenarioError(f"unknown actor {actor!r}") def reject(reason: str) -> None: self._record(idx, "exit", actor, "rejected", reason) if not self.params.fork.right_to_exit: return reject("exit_denied") if m.status != "active": return reject("not_active") if m.exit_pending: return reject("exit_already_pending") eff = self.day + self.params.fork.exit_notice_days m.exit_pending = True self.pending_exits.append((eff, actor)) self._record(idx, "exit", actor, "ok", effective_day=eff) # -- challenge ---------------------------------------------------------- def _challenge(self, idx: int, actor: str, step: ScriptStep) -> None: prop = self._resolve_proposal(step.proposal) m = self.members.get(actor) if m is None: raise ScenarioError(f"unknown actor {actor!r}") grounds = str(step.grounds) def reject(reason: str) -> None: self._record(idx, "challenge", actor, "rejected", reason, pid=prop.pid, grounds=grounds) if not self.params.review.enabled: return reject("review_disabled") if m.status != "active": return reject("challenger_not_active") if prop.status != "enacted" or prop.enacted_on is None: return reject("not_enacted") if self.day > prop.enacted_on + self.params.review.challenge_window_days: return reject("challenge_window_closed") harms: list[Harm] = prop.payload.get("harms", []) violation = False if grounds == "invariant": if any(h.target != "all" and h.amount > 0 for h in harms): violation = True # targeted legislation if prop.kind == "rule_change" and any( self.params.is_invariant_locked(p) for p, _ in prop.payload["changes"] ): violation = True # defense in depth if not violation: return reject("no_violation_found") elif grounds == "disparate_impact": ratio = self.params.review.disparate_impact_ratio if any( h.disparate is not None and h.disparate.multiplier + EPS >= ratio and h.amount > 0 for h in harms ): violation = True if not violation: return reject("impact_within_ratio") self._void(prop) self._record(idx, "challenge", actor, "ok", pid=prop.pid, grounds=grounds, voided=True) def _void(self, prop: ProposalState) -> None: info = prop.revert_info kind = prop.kind if kind == "rule_change": for path, old in reversed(info.get("changes", [])): self.params.set_path(path, old) elif kind == "spend": self.treasury += info["amount"] self.spent_this_cycle = max(0.0, self.spent_this_cycle - info["amount"]) rid = info.get("recipient_member") if rid: self.members[rid].welfare -= info["amount"] * self.weights.payout_weight elif kind == "budget": for b in self.budgets: if b.bid == info.get("bid"): b.active = False elif kind == "repeal_budget": for b in self.budgets: if b.bid in info.get("bids", []): b.active = True elif kind == "expel": target = self.members[info["target"]] target.status = "active" target.welfare += self.weights.expelled_penalty self.expulsions_this_cycle = max(0, self.expulsions_this_cycle - 1) elif kind == "admit": for mid in info.get("members", []): self.members[mid].status = "nonmember" self.members[mid].joined_day = -1 self.admissions_this_cycle = max( 0, self.admissions_this_cycle - len(info.get("members", [])) ) elif kind in ("emergency_declare", "emergency_extend"): if self.emergency.active: self.emergency.active = False self.emergency.last_sunset_day = self.day self._revert_deltas(info.get("welfare_deltas", [])) prop.status = "voided" # -- time --------------------------------------------------------------- def _advance(self, idx: int, days: int) -> None: self._record(idx, "advance_time", None, "ok", days=days) cycle = self.params.treasury.cycle_days for _ in range(days): self.day += 1 d = self.day # Cycle boundary: reset rate counters, snapshot treasury, pay budgets. if cycle > 0 and d % cycle == 0: self.spent_this_cycle = 0.0 self.admissions_this_cycle = 0 self.expulsions_this_cycle = 0 self.treasury_at_cycle_start = self.treasury for b in self.budgets: if not b.active: continue if self.treasury + EPS >= b.amount_per_cycle: self.treasury -= b.amount_per_cycle rm = self.members.get(b.recipient) if rm is not None and rm.status == "active": rm.welfare += b.amount_per_cycle * self.weights.payout_weight self._apply_effects(b.benefits, +1.0) self.trace.append( {"step": idx, "day": d, "move": "budget_payment", "actor": None, "result": "ok", "detail": {"bid": b.bid, "amount": b.amount_per_cycle}} ) else: self._apply_effects(b.lapse_harms, -1.0) self.trace.append( {"step": idx, "day": d, "move": "budget_payment", "actor": None, "result": "missed", "detail": {"bid": b.bid}} ) # Exits taking effect. for eff, aid in [e for e in self.pending_exits if e[0] == d]: m = self.members[aid] if m.status == "active": payout = 0.0 if self.params.fork.exit_share == "pro_rata": active = self._active_members() payout = self.treasury / len(active) if active else 0.0 self.treasury -= payout m.status = "exited" m.welfare += payout * self.weights.payout_weight for other in self.members.values(): if other.delegate_to == aid: other.delegate_to = None self.trace.append( {"step": idx, "day": d, "move": "exit_effective", "actor": aid, "result": "ok", "detail": {"payout": round(payout, 6)}} ) # Emergency burden and sunset. if self.emergency.active: self.emergency.total_days_active += 1 burden = self.weights.emergency_burden_per_day if burden > 0: for m in self._active_members(): m.welfare -= burden if d >= self.emergency.ends_day: self.emergency.active = False self.emergency.last_sunset_day = d self.trace.append( {"step": idx, "day": d, "move": "emergency_sunset", "actor": None, "result": "ok"} ) # Tally proposals whose voting closed. for prop in sorted(self.proposals.values(), key=lambda p: p.pid): if prop.status == "open" and prop.voting_end <= d: self._tally(idx, prop, d) # Enact passed proposals whose enactment day arrived. for prop in sorted(self.proposals.values(), key=lambda p: p.pid): if prop.status == "passed" and prop.enact_day is not None and prop.enact_day <= d: self._enact(idx, prop, d) # -- tally -------------------------------------------------------------- def _threshold_for(self, prop: ProposalState) -> float: kind = prop.kind if kind == "rule_change": if any(self.params.is_kernel_path(p) for p, _ in prop.payload["changes"]): return self.params.voting.kernel_threshold return self.params.voting.ordinary_threshold if kind in ("spend", "budget", "repeal_budget"): return self.params.treasury.spend_threshold if kind == "expel": return self.params.membership.expulsion_threshold if kind == "admit": return self.params.membership.admission_threshold if kind == "emergency_declare": return self.params.emergency.declaration_threshold if kind == "emergency_extend": return self.params.emergency.renewal_threshold return self.params.voting.ordinary_threshold def _tally(self, idx: int, prop: ProposalState, d: int) -> None: eligible = [ m.id for m in self.members.values() if m.status == "active" and m.suffrage and self._is_join_eligible(m, prop.voting_end) ] eligible_set = set(eligible) direct = {a: c for a, c in prop.votes.items() if a in eligible_set} deleg = self.params.voting.delegation counted_delegated: dict[str, str] = {} if deleg.allowed and eligible: cap = max(1, math.floor(deleg.delegate_cap_fraction * len(eligible))) load: dict[str, int] = defaultdict(int) for mid in sorted(eligible_set - set(direct)): dt = self.members[mid].delegate_to if dt and dt in direct and load[dt] < cap: load[dt] += 1 counted_delegated[mid] = direct[dt] all_votes = {**direct, **counted_delegated} yes = sum(1 for c in all_votes.values() if c == "yes") no = sum(1 for c in all_votes.values() if c == "no") abstain = sum(1 for c in all_votes.values() if c == "abstain") participation = (len(all_votes) / len(eligible)) if eligible else 0.0 threshold = self._threshold_for(prop) quorum = self.params.voting.quorum prop.tally = { "eligible": len(eligible), "yes": yes, "no": no, "abstain": abstain, "delegated": len(counted_delegated), "participation": round(participation, 6), "threshold": threshold, "quorum": quorum, } if not eligible or participation + EPS < quorum: prop.status = "failed" prop.reason = "quorum" # Quorum failure preserves the status quo and does NOT trigger the # failed-subject cooldown: boycotts cannot lock a subject out. elif yes > no and (yes + no) > 0 and (yes / (yes + no)) + EPS >= threshold: prop.status = "passed" delay = 0 if prop.kind == "rule_change" and any( self.params.is_kernel_path(p) for p, _ in prop.payload["changes"] ): delay = self.params.amendments.ratification_delay_days prop.enact_day = d + delay else: prop.status = "failed" prop.reason = "threshold" self.failed_subjects[prop.subject_key] = d self.trace.append( {"step": idx, "day": d, "move": "tally", "actor": None, "result": prop.status, "reason": prop.reason, "detail": {"pid": prop.pid, **prop.tally}} ) # -- enactment ---------------------------------------------------------- def _enact(self, idx: int, prop: ProposalState, d: int) -> None: kind = prop.kind payload = prop.payload def fail(reason: str) -> None: prop.status = "failed" prop.reason = reason self.trace.append( {"step": idx, "day": d, "move": "enact", "actor": None, "result": "failed", "reason": reason, "detail": {"pid": prop.pid}} ) if ( self.emergency.active and self._kind_class(kind, payload) in self.params.emergency.prohibited_kinds_during ): return fail("prohibited_during_emergency") if kind == "spend": amount = float(payload["amount"]) if amount > self.treasury + EPS: return fail("insufficient_funds") cap = self.params.treasury.per_cycle_cap_fraction * self.treasury_at_cycle_start if self.spent_this_cycle + amount > cap + EPS: return fail("cycle_cap_exceeded") elif kind == "admit": ids = payload["members"] if any(self.members[i].status != "nonmember" for i in ids): return fail("target_already_member") if self.admissions_this_cycle + len(ids) > self._admission_limit(): return fail("admission_rate_limit") elif kind == "expel": if self.members[payload["target"]].status != "active": return fail("target_not_active") if self.expulsions_this_cycle >= self.params.membership.max_expulsions_per_cycle: return fail("expulsion_rate_limit") elif kind == "emergency_declare": if self.emergency.active: return fail("emergency_already_active") elif kind == "emergency_extend": if not self.emergency.active: return fail("no_emergency_active") if self.emergency.renewals >= self.params.emergency.max_renewals: return fail("renewal_limit") info: dict[str, Any] = {} if kind == "spend": amount = float(payload["amount"]) self.treasury -= amount self.spent_this_cycle += amount rid = payload["recipient"] rm = self.members.get(rid) if rm is not None and rm.status == "active": rm.welfare += amount * self.weights.payout_weight info["recipient_member"] = rid info["amount"] = amount elif kind == "budget": bid = f"b{len(self.budgets) + 1}" self.budgets.append( BudgetState( bid=bid, amount_per_cycle=float(payload["amount_per_cycle"]), recipient=payload["recipient"], purpose=payload["purpose"], benefits=payload.get("benefits", []), lapse_harms=self._parse_effects(payload, "lapse_harms"), ) ) info["bid"] = bid elif kind == "repeal_budget": bids = [] for b in self.budgets: if b.active and b.purpose == payload["purpose"]: b.active = False bids.append(b.bid) info["bids"] = bids elif kind == "rule_change": olds = [] for path, value in payload["changes"]: old = self.params.set_path(path, value) olds.append((path, old)) info["changes"] = olds elif kind == "admit": for mid in payload["members"]: m = self.members[mid] m.status = "active" m.joined_day = d self.admissions_this_cycle += len(payload["members"]) info["members"] = list(payload["members"]) elif kind == "expel": target = self.members[payload["target"]] target.status = "expelled" target.welfare -= self.weights.expelled_penalty target.delegate_to = None for other in self.members.values(): if other.delegate_to == target.id: other.delegate_to = None self.expulsions_this_cycle += 1 info["target"] = target.id elif kind == "emergency_declare": self.emergency.active = True self.emergency.declared_day = d self.emergency.ends_day = d + int(payload["duration_days"]) self.emergency.renewals = 0 self.emergency.powers = list(payload["powers"]) info["prior_sunset"] = self.emergency.last_sunset_day elif kind == "emergency_extend": self.emergency.ends_day += int(payload["days"]) self.emergency.renewals += 1 elif kind == "emergency_end": if self.emergency.active: self.emergency.active = False self.emergency.last_sunset_day = d deltas: list[tuple[str, float]] = [] deltas.extend(self._apply_effects(payload.get("harms", []), -1.0)) deltas.extend(self._apply_effects(payload.get("benefits", []), +1.0)) info["welfare_deltas"] = deltas prop.revert_info = info prop.status = "enacted" prop.enacted_on = d self.trace.append( {"step": idx, "day": d, "move": "enact", "actor": None, "result": "enacted", "detail": {"pid": prop.pid, "kind": kind}} ) # -- condition evaluation ------------------------------------------------- def eval_condition(self, cond: Condition) -> bool: if cond.all_ is not None: return all(self.eval_condition(c) for c in cond.all_) if cond.any_ is not None: return any(self.eval_condition(c) for c in cond.any_) if cond.not_ is not None: return not self.eval_condition(cond.not_) return self._eval_check(cond) def _cmp(self, op: Optional[str], lhs: Any, rhs: Any) -> bool: op = op or "eq" if op == "eq": if isinstance(lhs, float) or isinstance(rhs, float): return abs(float(lhs) - float(rhs)) <= 1e-6 return lhs == rhs if op == "ne": return not self._cmp("eq", lhs, rhs) if op == "lt": return float(lhs) < float(rhs) - EPS if op == "le": return float(lhs) <= float(rhs) + EPS if op == "gt": return float(lhs) > float(rhs) + EPS if op == "ge": return float(lhs) >= float(rhs) - EPS raise ScenarioError(f"unknown op {op!r}") def _welfare_group(self, target: str) -> list[float]: report = compute_welfare(self.members, self.treasury, self.weights) if target == "all": return list(report.per_actor.values()) if target.startswith("faction:"): f = target.split(":", 1)[1] return [ report.per_actor[m.id] for m in self.members.values() if m.faction == f ] if target in report.per_actor: return [report.per_actor[target]] raise ScenarioError(f"welfare target {target!r} unknown") def _eval_check(self, cond: Condition) -> bool: check = cond.check if check == "treasury_below": return self.treasury < float(cond.value) - EPS if check == "treasury_above": return self.treasury > float(cond.value) + EPS if check == "param": try: cur = self.params.get_path(str(cond.path)) except KeyError: return False return self._cmp(cond.op, cur, cond.value) if check == "member_status": m = self.members.get(str(cond.target)) if m is None: raise ScenarioError(f"unknown member {cond.target!r}") return m.status == cond.value if check == "enacted": prop = self._resolve_proposal(cond.proposal) return prop.status == "enacted" if check == "not_enacted": prop = self._resolve_proposal(cond.proposal) return prop.status != "enacted" if check == "proposal_status": prop = self._resolve_proposal(cond.proposal) return prop.status == cond.value if check == "emergency_active": want = True if cond.value is None else bool(cond.value) return self.emergency.active == want if check == "emergency_days_total": return self._cmp(cond.op or "ge", self.emergency.total_days_active, cond.value) if check == "welfare_below": vals = self._welfare_group(str(cond.target)) if not vals: return False return min(vals) < float(cond.value) - EPS if check == "suffrage": m = self.members.get(str(cond.target)) if m is None: raise ScenarioError(f"unknown member {cond.target!r}") return m.suffrage == bool(cond.value) if check == "faction_eligible_share": elig = [ m for m in self.members.values() if m.status == "active" and m.suffrage and self._is_join_eligible(m, self.day) ] if not elig: return False share = sum(1 for m in elig if m.faction == cond.faction) / len(elig) return self._cmp(cond.op or "gt", share, cond.value if cond.value is not None else 0.5) if check == "budget_active": want = True if cond.value is None else bool(cond.value) actual = any(b.active and b.purpose == cond.purpose for b in self.budgets) return actual == want if check == "members_active_count": return self._cmp(cond.op or "eq", len(self._active_members()), cond.value) raise ScenarioError(f"unknown check {check!r}")