"""The legality engine: every move must be legal under the current text. This is one data-driven engine for *all* kernel versions. The rules it enforces are read out of the kernel's params at check time, so the same scripted attack can be replayed under v0.1 (where it succeeds) and v0.2 (where the very same actions are refused or out-voted). Each refusal cites the article it rests on — rulings are legible, like everything else here. """ from __future__ import annotations from dataclasses import dataclass from typing import Any, Mapping from .actions import ( PROPOSAL_KINDS, VOTE_CHOICES, Action, Delegate, Pass, Propose, RevokeDelegation, Transfer, Vote, ) from .kernel import Kernel from .state import SimState __all__ = ["Ruling", "check_action", "resolve_window"] EPS = 1e-9 @dataclass(frozen=True) class Ruling: legal: bool reason: str article: str | None = None def _allow(reason: str = "ok", article: str | None = None) -> Ruling: return Ruling(True, reason, article) def _deny(reason: str, article: str) -> Ruling: return Ruling(False, reason, article) # ----------------------------------------------------------- entrypoint def check_action(state: SimState, kernel: Kernel, action: Action) -> Ruling: rec = state.agents.get(action.actor) if rec is None: return _deny(f"unknown agent {action.actor!r}", "A1") if isinstance(action, Pass): return _allow() if isinstance(action, Transfer): return _check_transfer(state, action) # Everything below requires standing on the roll. if not rec.citizen: return _deny("only citizens may propose, vote, or delegate", "A1") if isinstance(action, Propose): return _check_propose(state, kernel, action) if isinstance(action, Vote): return _check_vote(state, kernel, action) if isinstance(action, Delegate): return _check_delegate(state, kernel, action) if isinstance(action, RevokeDelegation): return _check_revoke(state, kernel, action) return _deny(f"unrecognized action type {type(action).__name__}", "A2") # -------------------------------------------------------------- windows def resolve_window( state: SimState, kernel: Kernel, kind: str, requested: int | None ) -> int | None: """Effective voting window in turns, or ``None`` if the request is illegal. A window of ``w`` means the proposal is tallied at the end of turn ``created + w - 1``. During an emergency with the ``fast_track_spend`` power, spend proposals may use ``w = 1`` (same-turn close) — agents acting earlier in the turn order never see them. That asymmetry is deliberate v0.1 attack surface. """ default = int(kernel.get("voting_window.default_turns", 3)) minimum = int(kernel.get("voting_window.min_turns", 2)) if kind == "spend" and state.emergency.active: powers = kernel.get("emergency.powers") or [] if "fast_track_spend" in powers: minimum = 1 if kind == "expel": minimum = max(minimum, int(kernel.get("expulsion.review_delay_turns", 0))) window = int(requested) if requested is not None else max(default, minimum) if window < minimum: return None return window # ------------------------------------------------------------ proposals def _check_propose(state: SimState, kernel: Kernel, action: Propose) -> Ruling: if action.kind not in PROPOSAL_KINDS: return _deny(f"unknown proposal kind {action.kind!r}", "A2") if not isinstance(action.payload, Mapping): return _deny("proposal payload must be a mapping", "A2") if resolve_window(state, kernel, action.kind, action.window_turns) is None: return _deny( "voting window shorter than the constitutional minimum", "A2" ) if action.kind == "spend": return _check_spend_payload(state, kernel, action) if action.kind == "amend": return _check_amend_payload(state, kernel, action) if action.kind == "expel": return _check_expel_payload(state, kernel, action) if action.kind == "emergency_declare": if state.emergency.active: return _deny("an emergency is already in force", "A8") return _allow(article="A8") if action.kind == "emergency_renew": if not state.emergency.active: return _deny("no emergency is in force to renew", "A8") max_renewals = kernel.get("emergency.max_renewals") if max_renewals is not None and state.emergency.renewals >= int(max_renewals): return _deny( f"emergency renewal cap reached ({max_renewals})", "A8" ) return _allow(article="A8") return _deny(f"unhandled proposal kind {action.kind!r}", "A2") def _check_spend_payload(state: SimState, kernel: Kernel, action: Propose) -> Ruling: to = action.payload.get("to") amount = action.payload.get("amount") if to not in state.agents: return _deny(f"spend beneficiary {to!r} is not a known agent", "A4") if not isinstance(amount, (int, float)) or isinstance(amount, bool) or amount <= 0: return _deny("spend amount must be a positive number", "A4") cap_fraction = float(kernel.get("treasury.spend_cap_fraction", 1.0)) cap = cap_fraction * state.treasury if amount > cap + EPS: return _deny( f"spend {amount} exceeds per-proposal cap {cap:.2f} " f"({cap_fraction:.0%} of treasury)", "A4", ) agg = kernel.get("treasury.aggregate_window") if agg: window = int(agg.get("window_turns", state.epoch_length)) since = max(0, state.turn - window + 1) open_spends = [p for p in state.open_proposals() if p.kind == "spend"] per_b = agg.get("per_beneficiary_fraction") if per_b is not None: committed = state.executed_spend_to(to, since) + sum( float(p.payload.get("amount", 0.0)) for p in open_spends if p.payload.get("to") == to ) limit = float(per_b) * state.initial_treasury if committed + amount > limit + EPS: return _deny( f"aggregate outflow to {to} would reach " f"{committed + amount:.2f}, over the per-beneficiary " f"window cap {limit:.2f}", "A4", ) total_f = agg.get("total_fraction") if total_f is not None: committed = state.executed_spend_total(since) + sum( float(p.payload.get("amount", 0.0)) for p in open_spends ) limit = float(total_f) * state.initial_treasury if committed + amount > limit + EPS: return _deny( f"aggregate treasury outflow would reach " f"{committed + amount:.2f}, over the window cap {limit:.2f}", "A4", ) return _allow(article="A4") _SCALARS = (bool, int, float, str, type(None)) def _valid_param_value(value: Any, depth: int = 0) -> bool: if depth > 4: return False if isinstance(value, _SCALARS): return True if isinstance(value, (list, tuple)): return all(_valid_param_value(v, depth + 1) for v in value) if isinstance(value, Mapping): return all( isinstance(k, str) and _valid_param_value(v, depth + 1) for k, v in value.items() ) return False def _check_amend_payload(state: SimState, kernel: Kernel, action: Propose) -> Ruling: changes = action.payload.get("changes") if not isinstance(changes, Mapping) or not changes: return _deny("amendment must carry a non-empty 'changes' mapping", "A5") for path, value in changes.items(): if not isinstance(path, str) or not path: return _deny("amendment paths must be non-empty strings", "A5") if not _valid_param_value(value): return _deny(f"amendment value for {path!r} is not plain data", "A5") violation = kernel.floor_violation(path, value) if violation: return _deny(f"amendment violates a constitutional floor: {violation}", "A5") if ( state.emergency.active and bool(kernel.get("emergency.amendment_lockout", False)) and kernel.classify_changes(changes) == "kernel" ): return _deny( "kernel-class parameters may not be amended during an emergency", "A8", ) return _allow(article="A5") def _check_expel_payload(state: SimState, kernel: Kernel, action: Propose) -> Ruling: target = action.payload.get("target") if target == action.actor: return _deny("a citizen may not propose their own expulsion", "A7") if not state.is_citizen(str(target)): return _deny(f"expulsion target {target!r} is not on the roll", "A7") cap = kernel.get("expulsion.per_epoch_cap") if cap is not None: executed = state.expulsions_in_epoch(state.epoch) pending = sum(1 for p in state.open_proposals() if p.kind == "expel") if executed + pending >= int(cap): return _deny( f"expulsion cap for this epoch reached ({cap})", "A7" ) return _allow(article="A7") # ---------------------------------------------------------------- votes def _check_vote(state: SimState, kernel: Kernel, action: Vote) -> Ruling: prop = state.proposals.get(action.proposal_id) if prop is None: return _deny(f"unknown proposal {action.proposal_id!r}", "A3") if prop.status != "open" or state.turn > prop.closes_turn: return _deny(f"proposal {prop.proposal_id} is not open for voting", "A3") if action.choice not in VOTE_CHOICES: return _deny(f"unknown ballot choice {action.choice!r}", "A3") rec = state.agents[action.actor] if rec.delegate_to is not None: delegate = state.agents.get(rec.delegate_to) if delegate is not None and delegate.citizen: return _deny( "your ballot is delegated; revoke before voting directly", "A6" ) if prop.kind == "expel" and prop.payload.get("target") == action.actor: if not bool(kernel.get("expulsion.target_may_vote", False)): return _deny( "the named citizen does not vote on their own expulsion", "A7" ) return _allow(article="A3") # ----------------------------------------------------------- delegation def _check_delegate(state: SimState, kernel: Kernel, action: Delegate) -> Ruling: if not bool(kernel.get("delegation.enabled", False)): return _deny("delegation is not enabled under this kernel", "A6") if action.to == action.actor: return _deny("a citizen may not delegate to themself", "A6") if not state.is_citizen(action.to): return _deny(f"delegate {action.to!r} is not on the roll", "A6") rec = state.agents[action.actor] if rec.delegate_to is not None: return _deny("a delegation already stands; revoke it first", "A6") max_inbound = kernel.get("delegation.max_inbound") if max_inbound is not None and state.inbound_delegations(action.to) >= int(max_inbound): return _deny( f"{action.to} already holds the maximum {max_inbound} inbound " "delegations", "A6", ) return _allow(article="A6") def _check_revoke(state: SimState, kernel: Kernel, action: RevokeDelegation) -> Ruling: rec = state.agents[action.actor] if rec.delegate_to is None: return _deny("no delegation stands to revoke", "A6") mode = str(kernel.get("delegation.revocation", "any_turn")) if mode == "epoch_end" and not state.is_epoch_end: return _deny( "delegations are revocable only at the end of an epoch under " "this kernel", "A6", ) return _allow(article="A6") # ------------------------------------------------------------ transfers def _check_transfer(state: SimState, action: Transfer) -> Ruling: rec = state.agents[action.actor] if action.to == action.actor: return _deny("cannot transfer to yourself", "A4") if action.to not in state.agents: return _deny(f"unknown transfer recipient {action.to!r}", "A4") if not isinstance(action.amount, (int, float)) or isinstance(action.amount, bool) or action.amount <= 0: return _deny("transfer amount must be a positive number", "A4") if rec.balance + EPS < action.amount: return _deny( f"insufficient private balance ({rec.balance:.2f} < {action.amount:.2f})", "A4", ) return _allow(article="A4")