"""Amendment evaluation harness. The harness is the bridge between the scenario corpus and the amendment pipeline (milestone #2). Given a corpus of adversarial scenarios and two parameter sets — the constitution as ratified (baseline) and the constitution as proposed (the amendment under review) — it runs every scenario against both and renders a verdict: * a scenario that passes under baseline but fails under the proposal is a REGRESSION: the amendment opens an attack the current text repels; * in strict mode (the default, and what CI uses) ANY failing scenario under the proposal blocks the PR; * the parameter diff is classified for semver impact: kernel-protected keys force a major version, which the amendment pipeline maps to a supermajority ratification gate. The harness never mutates anything. It is a pure function from (corpus, baseline params, proposed params) to a verdict, so the same invocation in CI, locally, and in the public ledger always agrees. """ from __future__ import annotations import json from dataclasses import dataclass, field from pathlib import Path from typing import Any, Dict, Iterable, List, Optional, Sequence, Tuple from .engine import ScenarioResult, run_scenario from .model import Scenario, load_corpus from .params import Parameters, load_parameters from .taxonomy import AttackFamily _MISSING = object() #: Parameter namespaces that belong to the constitutional kernel. Changing #: any key under these prefixes is a breaking change to the meta-rules and is #: classified as a MAJOR semver impact, which the amendment pipeline maps to #: a supermajority ratification threshold. KERNEL_KEY_PREFIXES: Tuple[str, ...] = ( "kernel.", "vote.", "amendment.", "membership.", "emergency.", "fork.", ) def _family_value(family: Any) -> str: """Return the string slug for a family that may be an enum or a str.""" return getattr(family, "value", family) def _flatten(data: Dict[str, Any], prefix: str = "") -> Dict[str, Any]: out: Dict[str, Any] = {} for key, value in data.items(): dotted = f"{prefix}.{key}" if prefix else str(key) if isinstance(value, dict): out.update(_flatten(value, dotted)) else: out[dotted] = value return out def diff_parameters( baseline: Parameters, proposed: Parameters ) -> Dict[str, Tuple[Any, Any]]: """Compute a dotted-key diff between two parameter sets. Returns a mapping ``dotted.key -> (old, new)`` where ``None`` on either side means the key is absent in that version. """ before = _flatten(baseline.as_dict()) after = _flatten(proposed.as_dict()) diff: Dict[str, Tuple[Any, Any]] = {} for key in sorted(set(before) | set(after)): old = before.get(key, _MISSING) new = after.get(key, _MISSING) if old != new: diff[key] = ( None if old is _MISSING else old, None if new is _MISSING else new, ) return diff def classify_semver(diff: Dict[str, Tuple[Any, Any]]) -> str: """Classify a parameter diff as ``major``, ``minor``, or ``patch``. * ``major`` — at least one kernel-protected key changed (meta-rules: how votes work, how amendments ratify, who is a member, emergency powers, the right to fork). * ``minor`` — only userland parameters changed. * ``patch`` — no parameter changed at all (text-only edit). """ if not diff: return "patch" for key in diff: if any(key.startswith(prefix) for prefix in KERNEL_KEY_PREFIXES): return "major" return "minor" def result_to_dict(result: ScenarioResult) -> Dict[str, Any]: """Serialize a single scenario result for JSON reports.""" scenario = result.scenario empathy = result.empathy return { "id": scenario.id, "title": scenario.title, "family": _family_value(scenario.family), "passed": bool(result.passed), "attack_succeeded": bool(result.attack_succeeded), "expected_attack_succeeds": bool(scenario.expected.attack_succeeds), "blocked_by": list(result.blocked_by), "failures": list(result.failures), "empathy": ( { "worst_off_actor": empathy.worst_off_actor, "worst_off_welfare": empathy.worst_off_welfare, "floor": empathy.floor, "passed_floor": bool(empathy.passed_floor), } if empathy is not None else None ), "trace": list(result.trace), } @dataclass class HarnessReport: """Aggregated outcome of running a corpus against one parameter set.""" results: List[ScenarioResult] = field(default_factory=list) @property def total(self) -> int: return len(self.results) @property def passed_count(self) -> int: return sum(1 for r in self.results if r.passed) @property def failed_results(self) -> List[ScenarioResult]: return [r for r in self.results if not r.passed] @property def ok(self) -> bool: return not self.failed_results def failed_ids(self) -> List[str]: return sorted(r.scenario.id for r in self.failed_results) def passed_ids(self) -> List[str]: return sorted(r.scenario.id for r in self.results if r.passed) def by_family(self) -> Dict[str, Tuple[int, int]]: """Return ``family -> (passed, total)`` ordered by family slug.""" counts: Dict[str, List[int]] = {} for result in self.results: slug = _family_value(result.scenario.family) bucket = counts.setdefault(slug, [0, 0]) bucket[1] += 1 if result.passed: bucket[0] += 1 return {slug: (p, t) for slug, (p, t) in sorted(counts.items())} def min_empathy(self) -> Optional[Tuple[str, float]]: """The lowest worst-off welfare seen anywhere in the run. This is the headline empathy number for a parameter set: across every stress scenario, how badly did the worst-off participant ever fare? Returns ``(scenario_id, welfare)`` or ``None`` if no scenario produced an empathy report. """ lowest: Optional[Tuple[str, float]] = None for result in self.results: if result.empathy is None: continue welfare = result.empathy.worst_off_welfare if lowest is None or welfare < lowest[1]: lowest = (result.scenario.id, welfare) return lowest def to_dict(self) -> Dict[str, Any]: floor = self.min_empathy() return { "total": self.total, "passed": self.passed_count, "failed": self.total - self.passed_count, "ok": self.ok, "by_family": { slug: {"passed": p, "total": t} for slug, (p, t) in self.by_family().items() }, "min_empathy": ( {"scenario": floor[0], "worst_off_welfare": floor[1]} if floor is not None else None ), "results": [result_to_dict(r) for r in self.results], } def run_suite( scenarios: Sequence[Scenario], params: Parameters, families: Optional[Iterable[str]] = None, fail_fast: bool = False, ) -> HarnessReport: """Run a corpus of scenarios against a single parameter set.""" wanted = set(families) if families is not None else None report = HarnessReport() for scenario in sorted(scenarios, key=lambda s: s.id): if wanted is not None and _family_value(scenario.family) not in wanted: continue result = run_scenario(scenario, params) report.results.append(result) if fail_fast and not result.passed: break return report @dataclass class AmendmentVerdict: """The harness verdict on a proposed amendment.""" baseline_report: HarnessReport proposed_report: HarnessReport parameter_diff: Dict[str, Tuple[Any, Any]] semver_impact: str strict: bool = True @property def regressions(self) -> List[str]: """Scenarios defended under baseline but exploitable under the proposal.""" baseline_ok = set(self.baseline_report.passed_ids()) return sorted( sid for sid in self.proposed_report.failed_ids() if sid in baseline_ok ) @property def fixes(self) -> List[str]: """Scenarios exploitable under baseline but defended under the proposal.""" baseline_failed = set(self.baseline_report.failed_ids()) return sorted( sid for sid in self.proposed_report.passed_ids() if sid in baseline_failed ) @property def persistent_failures(self) -> List[str]: """Scenarios failing under both versions (pre-existing vulnerabilities).""" baseline_failed = set(self.baseline_report.failed_ids()) return sorted( sid for sid in self.proposed_report.failed_ids() if sid in baseline_failed ) @property def blocked(self) -> bool: if self.strict: return not self.proposed_report.ok return bool(self.regressions) @property def reasons(self) -> List[str]: reasons: List[str] = [] if self.regressions: reasons.append( "amendment introduces %d regression(s): %s" % (len(self.regressions), ", ".join(self.regressions)) ) if self.strict and self.persistent_failures: reasons.append( "%d scenario(s) remain exploitable under the proposed text: %s" % (len(self.persistent_failures), ", ".join(self.persistent_failures)) ) return reasons def to_dict(self) -> Dict[str, Any]: return { "blocked": self.blocked, "strict": self.strict, "reasons": self.reasons, "semver_impact": self.semver_impact, "parameter_diff": { key: {"old": old, "new": new} for key, (old, new) in self.parameter_diff.items() }, "regressions": self.regressions, "fixes": self.fixes, "persistent_failures": self.persistent_failures, "baseline": self.baseline_report.to_dict(), "proposed": self.proposed_report.to_dict(), } def evaluate_amendment( scenarios: Sequence[Scenario], baseline_params: Parameters, proposed_params: Parameters, strict: bool = True, ) -> AmendmentVerdict: """Run the full corpus against both versions and render a verdict.""" baseline_report = run_suite(scenarios, baseline_params) proposed_report = run_suite(scenarios, proposed_params) diff = diff_parameters(baseline_params, proposed_params) return AmendmentVerdict( baseline_report=baseline_report, proposed_report=proposed_report, parameter_diff=diff, semver_impact=classify_semver(diff), strict=strict, ) def evaluate_amendment_paths( corpus_dir: Path, baseline_path: Path, proposed_path: Path, strict: bool = True, ) -> AmendmentVerdict: """Convenience wrapper used by the CLI and CI.""" scenarios = load_corpus(corpus_dir) baseline = load_parameters(baseline_path) proposed = load_parameters(proposed_path) return evaluate_amendment(scenarios, baseline, proposed, strict=strict) # --------------------------------------------------------------------------- # Rendering # --------------------------------------------------------------------------- def render_text(report: HarnessReport) -> str: """Human-readable summary of a suite run.""" lines: List[str] = [] lines.append("FablePool constitutional test suite") lines.append( " scenarios: %d passed: %d failed: %d" % (report.total, report.passed_count, report.total - report.passed_count) ) lines.append(" by family:") for slug, (passed, total) in report.by_family().items(): marker = "ok " if passed == total else "FAIL" lines.append(" %-28s %3d/%-3d %s" % (slug, passed, total, marker)) floor = report.min_empathy() if floor is not None: lines.append( " empathy floor: lowest worst-off welfare %.3f (scenario %s)" % (floor[1], floor[0]) ) if report.failed_results: lines.append("") lines.append("FAILURES:") for result in report.failed_results: lines.append( " %s [%s] %s" % ( result.scenario.id, _family_value(result.scenario.family), result.scenario.title, ) ) for failure in result.failures: lines.append(" - %s" % failure) if result.empathy is not None and not result.empathy.passed_floor: lines.append( " - empathy floor violated: %s at %.3f (floor %.3f)" % ( result.empathy.worst_off_actor, result.empathy.worst_off_welfare, result.empathy.floor, ) ) return "\n".join(lines) def render_amendment_text(verdict: AmendmentVerdict) -> str: """Human-readable verdict on a proposed amendment.""" lines: List[str] = [] lines.append("FablePool amendment gate") lines.append(" semver impact: %s" % verdict.semver_impact.upper()) if verdict.parameter_diff: lines.append(" parameter changes:") for key, (old, new) in verdict.parameter_diff.items(): lines.append(" %s: %r -> %r" % (key, old, new)) else: lines.append(" parameter changes: none (text-only)") lines.append( " baseline: %d/%d scenarios defended" % (verdict.baseline_report.passed_count, verdict.baseline_report.total) ) lines.append( " proposed: %d/%d scenarios defended" % (verdict.proposed_report.passed_count, verdict.proposed_report.total) ) if verdict.regressions: lines.append(" REGRESSIONS (newly exploitable under this amendment):") for sid in verdict.regressions: lines.append(" - %s" % sid) if verdict.fixes: lines.append(" fixes (defended for the first time):") for sid in verdict.fixes: lines.append(" - %s" % sid) if verdict.persistent_failures: lines.append(" persistent failures (exploitable under both versions):") for sid in verdict.persistent_failures: lines.append(" - %s" % sid) lines.append("") if verdict.blocked: lines.append("VERDICT: BLOCKED") for reason in verdict.reasons: lines.append(" %s" % reason) else: lines.append("VERDICT: PASSES THE GATE") return "\n".join(lines) def render_amendment_markdown(verdict: AmendmentVerdict) -> str: """Markdown verdict, written to the GitHub Actions step summary.""" lines: List[str] = [] status = ":no_entry: **BLOCKED**" if verdict.blocked else ":white_check_mark: **PASSES**" lines.append("## Constitutional test suite — amendment gate") lines.append("") lines.append("| | |") lines.append("|---|---|") lines.append("| Verdict | %s |" % status) lines.append("| Semver impact | `%s` |" % verdict.semver_impact) lines.append( "| Baseline | %d/%d defended |" % (verdict.baseline_report.passed_count, verdict.baseline_report.total) ) lines.append( "| Proposed | %d/%d defended |" % (verdict.proposed_report.passed_count, verdict.proposed_report.total) ) floor = verdict.proposed_report.min_empathy() if floor is not None: lines.append( "| Empathy floor (proposed) | %.3f in `%s` |" % (floor[1], floor[0]) ) lines.append("") if verdict.parameter_diff: lines.append("### Parameter changes") lines.append("") lines.append("| key | old | new |") lines.append("|---|---|---|") for key, (old, new) in verdict.parameter_diff.items(): lines.append("| `%s` | `%r` | `%r` |" % (key, old, new)) lines.append("") if verdict.regressions: lines.append("### :rotating_light: Regressions") lines.append("") lines.append( "These attacks are repelled by the current constitution but " "**succeed under the proposed amendment**:" ) lines.append("") for sid in verdict.regressions: lines.append("- `%s`" % sid) lines.append("") if verdict.fixes: lines.append("### Fixes") lines.append("") for sid in verdict.fixes: lines.append("- `%s`" % sid) lines.append("") if verdict.persistent_failures: lines.append("### Pre-existing vulnerabilities (both versions)") lines.append("") for sid in verdict.persistent_failures: lines.append("- `%s`" % sid) lines.append("") return "\n".join(lines) def write_json_report(payload: Dict[str, Any], path: Path) -> None: path = Path(path) path.parent.mkdir(parents=True, exist_ok=True) path.write_text(json.dumps(payload, indent=2, sort_keys=True) + "\n", encoding="utf-8")