"""The scenario DSL. Scenarios are YAML documents validated against the pydantic models below. A scenario declares: * **actors** — named participants with factions, optional objectives, and optional starting welfare; * **setup** — initial treasury and any standing budgets; * **script** — a sequence of *legal moves* (propose, vote, delegate, exit, challenge, advance_time) played against the constitutional state machine; * **success_condition** — a predicate over final state defining what it means for the *attack* to have succeeded; * **expected** — almost always ``blocked``: the corpus is a set of regression tests asserting the constitution defeats the attack; * **empathy** — the floor on the worst-off participant's welfare, plus the rationale for that floor. A scenario where the attack is technically blocked but the worst-off participant falls below the floor resolves to ``collateral`` and fails a ``blocked`` expectation; * **interpretations** — adversarial readings of ambiguous text, expressed as parameter overrides. The harness replays the scenario under the baseline reading *and* every listed reading; all must match ``expected``. """ from __future__ import annotations import re from pathlib import Path from typing import Any, Literal, Optional, Union import yaml from pydantic import BaseModel, ConfigDict, Field, field_validator, model_validator from .taxonomy import AttackTaxonomy, TAXONOMY_INFO class ScenarioError(Exception): """A scenario is malformed (authoring error, not a legality rejection).""" PROPOSAL_KINDS = { "spend", "budget", "repeal_budget", "rule_change", "text", "admit", "expel", "emergency_declare", "emergency_extend", "emergency_end", } MOVES = { "propose", "vote", "delegate", "revoke_delegation", "exit", "challenge", "advance_time", } CHOICES = {"yes", "no", "abstain"} GROUNDS = {"invariant", "disparate_impact"} CHECKS = { "treasury_below", "treasury_above", "param", "member_status", "enacted", "not_enacted", "proposal_status", "emergency_active", "emergency_days_total", "welfare_below", "suffrage", "faction_eligible_share", "budget_active", "members_active_count", } OPS = {"eq", "ne", "lt", "le", "gt", "ge"} class _Strict(BaseModel): model_config = ConfigDict(extra="forbid", populate_by_name=True) # --------------------------------------------------------------------------- # Actors # --------------------------------------------------------------------------- class Actor(_Strict): """A concrete participant after expansion.""" id: str faction: str = "unaffiliated" member: bool = True suffrage: bool = True welfare: float = 100.0 joined_day: int = -10000 objective: Optional[str] = None class ActorSpec(_Strict): """An actor declaration; ``count > 1`` expands ``id_prefix`` into a cohort.""" id: Optional[str] = None id_prefix: Optional[str] = None count: int = Field(default=1, ge=1, le=500) faction: str = "unaffiliated" member: bool = True suffrage: bool = True welfare: float = 100.0 joined_day: int = -10000 objective: Optional[str] = None @model_validator(mode="after") def _check_naming(self) -> "ActorSpec": if self.count == 1: if not self.id and not self.id_prefix: raise ValueError("actor needs an 'id' (or 'id_prefix' with count > 1)") else: if not self.id_prefix: raise ValueError("cohort actors (count > 1) need 'id_prefix'") if self.id: raise ValueError("cohort actors must not set 'id'") return self def expand(self) -> list[Actor]: base = self.model_dump(exclude={"id", "id_prefix", "count"}) if self.count == 1: name = self.id or f"{self.id_prefix}_1" return [Actor(id=name, **base)] return [Actor(id=f"{self.id_prefix}_{i}", **base) for i in range(1, self.count + 1)] # --------------------------------------------------------------------------- # Welfare effects attached to measures # --------------------------------------------------------------------------- class Disparate(_Strict): """A facially-neutral measure that concentrates its burden on a faction.""" faction: str multiplier: float = Field(gt=1.0) class Harm(_Strict): """A welfare effect carried by a measure. ``target`` is ``all``, ``faction:``, or a member id. ``amount`` is subtracted from base welfare on enactment for harms (added for benefits). A ``disparate`` block replaces the amount with ``amount * multiplier`` for the named faction — the engine applies the differential and the review clause may void the measure on a disparate-impact challenge. """ target: str = "all" amount: float = Field(ge=0.0) disparate: Optional[Disparate] = None # --------------------------------------------------------------------------- # Setup # --------------------------------------------------------------------------- class BudgetSetup(_Strict): amount_per_cycle: float = Field(gt=0.0) recipient: str purpose: str benefits: list[Harm] = Field(default_factory=list) lapse_harms: list[Harm] = Field(default_factory=list) class Setup(_Strict): treasury: float = Field(default=1000.0, ge=0.0) budgets: list[BudgetSetup] = Field(default_factory=list) # --------------------------------------------------------------------------- # Script # --------------------------------------------------------------------------- class ScriptStep(_Strict): move: str actor: Optional[str] = None actors: Optional[Union[str, list[str]]] = None label: Optional[str] = Field(default=None, alias="as") kind: Optional[str] = None payload: dict[str, Any] = Field(default_factory=dict) proposal: Optional[str] = None choice: Optional[str] = None days: Optional[int] = None delegate: Optional[str] = None grounds: Optional[str] = None note: Optional[str] = None @model_validator(mode="after") def _check_move(self) -> "ScriptStep": m = self.move if m not in MOVES: raise ValueError(f"unknown move {m!r}; expected one of {sorted(MOVES)}") if m == "propose": if not self.actor: raise ValueError("propose requires 'actor'") if self.kind not in PROPOSAL_KINDS: raise ValueError( f"propose requires 'kind' in {sorted(PROPOSAL_KINDS)}, got {self.kind!r}" ) elif m == "vote": if not (self.actor or self.actors): raise ValueError("vote requires 'actor' or 'actors'") if not self.proposal: raise ValueError("vote requires 'proposal'") if self.choice not in CHOICES: raise ValueError(f"vote requires 'choice' in {sorted(CHOICES)}") elif m == "delegate": if not (self.actor and self.delegate): raise ValueError("delegate requires 'actor' and 'delegate'") elif m == "revoke_delegation": if not self.actor: raise ValueError("revoke_delegation requires 'actor'") elif m == "exit": if not self.actor: raise ValueError("exit requires 'actor'") elif m == "challenge": if not (self.actor and self.proposal): raise ValueError("challenge requires 'actor' and 'proposal'") if self.grounds not in GROUNDS: raise ValueError(f"challenge requires 'grounds' in {sorted(GROUNDS)}") elif m == "advance_time": if not self.days or self.days < 1: raise ValueError("advance_time requires 'days' >= 1") return self # --------------------------------------------------------------------------- # Success conditions # --------------------------------------------------------------------------- class Condition(_Strict): """Recursive predicate over final state. Exactly one of all/any/not/check.""" all_: Optional[list["Condition"]] = Field(default=None, alias="all") any_: Optional[list["Condition"]] = Field(default=None, alias="any") not_: Optional["Condition"] = Field(default=None, alias="not") check: Optional[str] = None value: Any = None path: Optional[str] = None target: Optional[str] = None proposal: Optional[str] = None faction: Optional[str] = None purpose: Optional[str] = None op: Optional[str] = None @model_validator(mode="after") def _check_shape(self) -> "Condition": heads = [ self.all_ is not None, self.any_ is not None, self.not_ is not None, self.check is not None, ] if sum(heads) != 1: raise ValueError("condition needs exactly one of: all, any, not, check") if self.check is not None: if self.check not in CHECKS: raise ValueError(f"unknown check {self.check!r}; expected one of {sorted(CHECKS)}") if self.op is not None and self.op not in OPS: raise ValueError(f"unknown op {self.op!r}; expected one of {sorted(OPS)}") needs_value = { "treasury_below", "treasury_above", "param", "member_status", "proposal_status", "emergency_days_total", "welfare_below", "faction_eligible_share", "members_active_count", } if self.check in needs_value and self.value is None: raise ValueError(f"check {self.check!r} requires 'value'") if self.check == "param" and not self.path: raise ValueError("check 'param' requires 'path'") if self.check in {"member_status", "welfare_below", "suffrage"} and not self.target: raise ValueError(f"check {self.check!r} requires 'target'") if self.check in {"enacted", "not_enacted", "proposal_status"} and not self.proposal: raise ValueError(f"check {self.check!r} requires 'proposal'") if self.check == "faction_eligible_share" and not self.faction: raise ValueError("check 'faction_eligible_share' requires 'faction'") if self.check == "budget_active" and not self.purpose: raise ValueError("check 'budget_active' requires 'purpose'") return self Condition.model_rebuild() # --------------------------------------------------------------------------- # Interpretations and empathy # --------------------------------------------------------------------------- class Interpretation(_Strict): """An adversarial reading of ambiguous text, as parameter overrides.""" id: str rationale: str = Field(min_length=20) overrides: dict[str, Any] @field_validator("overrides") @classmethod def _nonempty(cls, v: dict[str, Any]) -> dict[str, Any]: if not v: raise ValueError("interpretation overrides must not be empty") return v VALID_WEIGHT_KEYS = { "treasury_weight", "payout_weight", "expelled_penalty", "emergency_burden_per_day", } class EmpathySpec(_Strict): """The first-class scoring rule: how does the worst-off participant fare?""" floor: float rationale: str = Field(min_length=30) weights: Optional[dict[str, float]] = None @field_validator("weights") @classmethod def _known_weights(cls, v: Optional[dict[str, float]]) -> Optional[dict[str, float]]: if v: unknown = set(v) - VALID_WEIGHT_KEYS if unknown: raise ValueError(f"unknown empathy weight keys: {sorted(unknown)}") return v # --------------------------------------------------------------------------- # Scenario # --------------------------------------------------------------------------- _ID_RE = re.compile(r"^[A-Z]{2}-\d{3}$") class Scenario(_Strict): id: str title: str = Field(min_length=8) taxonomy: AttackTaxonomy severity: Literal["low", "medium", "high", "critical"] precedent: str = Field(min_length=60) description: str = Field(min_length=40) actors: list[ActorSpec] = Field(min_length=1) setup: Setup = Field(default_factory=Setup) interpretations: list[Interpretation] = Field(default_factory=list) script: list[ScriptStep] = Field(min_length=1) success_condition: Condition expected: Literal["blocked", "exploited"] = "blocked" status: Literal["active", "quarantined"] = "active" quarantine_reason: Optional[str] = None empathy: EmpathySpec tags: list[str] = Field(default_factory=list) # -- helpers ------------------------------------------------------------ def expanded_actors(self) -> list[Actor]: out: list[Actor] = [] for spec in self.actors: out.extend(spec.expand()) return out # -- validation --------------------------------------------------------- @model_validator(mode="after") def _validate_scenario(self) -> "Scenario": if not _ID_RE.match(self.id): raise ValueError(f"scenario id {self.id!r} must match XX-NNN") code = TAXONOMY_INFO[self.taxonomy].code if not self.id.startswith(code + "-"): raise ValueError( f"scenario id {self.id!r} must use the {code!r} prefix for taxonomy " f"{self.taxonomy.value!r}" ) if self.status == "quarantined" and not self.quarantine_reason: raise ValueError("quarantined scenarios require 'quarantine_reason'") actors = self.expanded_actors() ids = [a.id for a in actors] if len(ids) != len(set(ids)): dupes = sorted({i for i in ids if ids.count(i) > 1}) raise ValueError(f"duplicate actor ids after expansion: {dupes}") id_set = set(ids) factions = {a.faction for a in actors} def check_selector(sel: Union[str, list[str]], where: str) -> None: if isinstance(sel, list): missing = [s for s in sel if s not in id_set] if missing: raise ValueError(f"{where}: unknown actors {missing}") return if sel in ("all", "members"): return if sel.startswith("faction:"): f = sel.split(":", 1)[1] if f not in factions: raise ValueError(f"{where}: unknown faction {f!r}") return raise ValueError(f"{where}: bad actor selector {sel!r}") labels: set[str] = set() any_proposal = False for i, step in enumerate(self.script): where = f"script[{i}] ({step.move})" if step.actor and step.actor not in id_set: raise ValueError(f"{where}: unknown actor {step.actor!r}") if step.actors is not None: check_selector(step.actors, where) if step.delegate and step.delegate not in id_set: raise ValueError(f"{where}: unknown delegate {step.delegate!r}") if step.proposal is not None: ref = step.proposal if not ref.startswith("$"): raise ValueError(f"{where}: proposal refs start with '$', got {ref!r}") name = ref[1:] if name == "last": if not any_proposal: raise ValueError(f"{where}: '$last' used before any proposal") elif name not in labels: raise ValueError(f"{where}: undefined proposal label {ref!r}") if step.move == "propose": any_proposal = True if step.label: if step.label in labels: raise ValueError(f"{where}: duplicate proposal label {step.label!r}") labels.add(step.label) def check_condition_refs(cond: Condition, where: str) -> None: if cond.all_ is not None: for j, c in enumerate(cond.all_): check_condition_refs(c, f"{where}.all[{j}]") elif cond.any_ is not None: for j, c in enumerate(cond.any_): check_condition_refs(c, f"{where}.any[{j}]") elif cond.not_ is not None: check_condition_refs(cond.not_, f"{where}.not") else: if cond.proposal is not None: name = cond.proposal.lstrip("$") if name != "last" and name not in labels: raise ValueError(f"{where}: undefined proposal label {cond.proposal!r}") if cond.target is not None and cond.target not in id_set: if not (cond.target.startswith("faction:") or cond.target == "all"): raise ValueError(f"{where}: unknown target {cond.target!r}") check_condition_refs(self.success_condition, "success_condition") return self class ScenarioFile(_Strict): schema_id: Literal["fabletest/v1"] = Field(alias="schema") scenarios: list[Scenario] = Field(min_length=1) def load_scenario_file(path: str | Path) -> list[Scenario]: with open(path, "r", encoding="utf-8") as fh: data = yaml.safe_load(fh) if not isinstance(data, dict): raise ScenarioError(f"{path}: expected a mapping with 'schema' and 'scenarios'") try: sf = ScenarioFile.model_validate(data) except Exception as exc: # surface the file path in validation errors raise ScenarioError(f"{path}: {exc}") from exc return sf.scenarios