"""The exploit-to-test pipeline. Every successful exploit found by the tournament harness becomes two artifacts, both permanent: 1. an **exploit record** — a JSON file under ``exploits/`` capturing the exact citizen roll, kernel parameters, action trace, and a declarative success predicate under which the exploit was observed; and 2. a **regression test** — a pytest module under ``tests/regression/`` generated from that record, which asserts three things: * the exploit still *reproduces* under its discovery kernel (the archive is intact), * the exploit is *blocked* under the current patched kernel (the patch works), and * exploit-specific patched invariants hold (the patch does what its rationale claims, not merely something). The generated files are deterministic functions of the records, so ``write_regression_tests(check=True)`` doubles as a CI drift check: if a record and its generated test ever disagree, the build fails. """ from __future__ import annotations import json from dataclasses import dataclass, field from pathlib import Path from typing import Optional, Union from fable_selfplay import replay __all__ = [ "ExploitRecord", "load_exploits", "next_exploit_id", "render_test", "verify_all", "verify_record", "write_regression_tests", ] EXPLOIT_FILE_GLOB = "EXP-*.json" _REQUIRED_FIELDS = ( "exploit_id", "title", "summary", "mechanism", "severity", "capture_objective", "discovered_by", "tournament", "citizens", "initial_treasury", "kernel_params_at_discovery", "action_trace", "success_predicate", ) @dataclass class ExploitRecord: """A permanent, auditable record of one discovered exploit.""" exploit_id: str title: str summary: str mechanism: str severity: str capture_objective: str discovered_by: str tournament: dict citizens: list initial_treasury: float kernel_params_at_discovery: dict action_trace: list success_predicate: dict patched_assertions: list = field(default_factory=list) proposed_patch: dict = field(default_factory=dict) status: str = "open" @classmethod def from_dict(cls, data: dict) -> "ExploitRecord": missing = [key for key in _REQUIRED_FIELDS if key not in data] if missing: raise ValueError( f"exploit record missing required fields: {', '.join(missing)}" ) return cls( exploit_id=data["exploit_id"], title=data["title"], summary=data["summary"], mechanism=data["mechanism"], severity=data["severity"], capture_objective=data["capture_objective"], discovered_by=data["discovered_by"], tournament=data["tournament"], citizens=data["citizens"], initial_treasury=float(data["initial_treasury"]), kernel_params_at_discovery=data["kernel_params_at_discovery"], action_trace=data["action_trace"], success_predicate=data["success_predicate"], patched_assertions=data.get("patched_assertions", []), proposed_patch=data.get("proposed_patch", {}), status=data.get("status", "open"), ) @classmethod def from_path(cls, path: Union[str, Path]) -> "ExploitRecord": return cls.from_dict(json.loads(Path(path).read_text(encoding="utf-8"))) def to_dict(self) -> dict: return { "exploit_id": self.exploit_id, "title": self.title, "summary": self.summary, "mechanism": self.mechanism, "severity": self.severity, "capture_objective": self.capture_objective, "discovered_by": self.discovered_by, "tournament": self.tournament, "status": self.status, "initial_treasury": self.initial_treasury, "citizens": self.citizens, "kernel_params_at_discovery": self.kernel_params_at_discovery, "action_trace": self.action_trace, "success_predicate": self.success_predicate, "patched_assertions": self.patched_assertions, "proposed_patch": self.proposed_patch, } def write(self, directory: Union[str, Path]) -> Path: path = Path(directory) / f"{self.exploit_id}.json" path.write_text( json.dumps(self.to_dict(), indent=2) + "\n", encoding="utf-8" ) return path @property def safe_id(self) -> str: return self.exploit_id.replace("-", "_").lower() def load_exploits(directory: Union[str, Path]) -> list: """Load every exploit record in a directory, sorted by id.""" paths = sorted(Path(directory).glob(EXPLOIT_FILE_GLOB)) return [ExploitRecord.from_path(path) for path in paths] def next_exploit_id(directory: Union[str, Path]) -> str: """Mint the next sequential exploit id (EXP-007, EXP-008, ...).""" highest = 0 for path in Path(directory).glob(EXPLOIT_FILE_GLOB): stem = path.stem # e.g. "EXP-004" try: highest = max(highest, int(stem.split("-", 1)[1])) except (IndexError, ValueError): continue return f"EXP-{highest + 1:03d}" # ---------------------------------------------------------------------- # # test generation # # ---------------------------------------------------------------------- # def render_test(record: ExploitRecord) -> str: """Render the regression-test module for one exploit record. The output is a deterministic function of the record, which makes drift between archive and suite detectable. """ return f'''"""Regression tests for {record.exploit_id}: {record.title}. {record.summary} Severity: {record.severity} | Capture objective: {record.capture_objective} | Found by detector: {record.discovered_by} GENERATED by fable_selfplay.exploit_to_test. Do not edit by hand. Regenerate with: fable-selfplay exploit-to-test --exploits exploits --out tests/regression """ from pathlib import Path import pytest from fable_selfplay import replay from fable_selfplay.exploit_to_test import ExploitRecord _ROOT = Path(__file__).resolve().parents[2] RECORD_PATH = _ROOT / "exploits" / "{record.exploit_id}.json" KERNEL_V02_PATH = _ROOT / "kernel" / "kernel-v0.2.yaml" @pytest.fixture(scope="module") def record() -> ExploitRecord: return ExploitRecord.from_path(RECORD_PATH) def _run(record: ExploitRecord, params: dict) -> "replay.ReplayResult": return replay.run_trace( params, record.citizens, record.initial_treasury, record.action_trace, ) def test_{record.safe_id}_reproduces_under_discovery_kernel(record: ExploitRecord) -> None: """The exploit must still succeed under the kernel it was discovered against. If this fails, the archived record no longer reproduces and the exploit archive itself is corrupt; investigate before touching the kernel. """ result = _run(record, record.kernel_params_at_discovery) assert replay.evaluate_predicate(record.success_predicate, result), ( "{record.exploit_id} no longer reproduces against its discovery kernel" ) def test_{record.safe_id}_blocked_under_kernel_v0_2(record: ExploitRecord) -> None: """The patched kernel must defeat the recorded exploit trace.""" params = replay.load_kernel_params(KERNEL_V02_PATH) result = _run(record, params) assert not replay.evaluate_predicate(record.success_predicate, result), ( "{record.exploit_id} succeeds under kernel v0.2: regression" ) def test_{record.safe_id}_patched_invariants_hold(record: ExploitRecord) -> None: """Exploit-specific guarantees the patch is expected to provide.""" params = replay.load_kernel_params(KERNEL_V02_PATH) result = _run(record, params) for assertion in record.patched_assertions: assert replay.evaluate_predicate(assertion, result), ( f"patched invariant violated: {{assertion}}" ) ''' def write_regression_tests( exploits_dir: Union[str, Path], out_dir: Union[str, Path], check: bool = False, ) -> dict: """Compile every exploit record into its regression-test module. Returns a summary dict with ``written``, ``unchanged``, and ``drift`` lists of paths. In ``check`` mode nothing is written; any file that would change is reported under ``drift`` (CI fails on non-empty drift). """ out = Path(out_dir) out.mkdir(parents=True, exist_ok=True) summary = {"written": [], "unchanged": [], "drift": []} for record in load_exploits(exploits_dir): rendered = render_test(record) path = out / f"test_{record.safe_id}.py" existing = ( path.read_text(encoding="utf-8") if path.exists() else None ) if existing == rendered: summary["unchanged"].append(str(path)) elif check: summary["drift"].append(str(path)) else: path.write_text(rendered, encoding="utf-8") summary["written"].append(str(path)) return summary # ---------------------------------------------------------------------- # # verification (used by the CLI and by the tournament minting flow) # # ---------------------------------------------------------------------- # def verify_record( record: ExploitRecord, patched_kernel_path: Optional[Union[str, Path]] = None, ) -> dict: """Re-run a record's trace and report whether it reproduces / is blocked.""" discovery = replay.run_trace( record.kernel_params_at_discovery, record.citizens, record.initial_treasury, record.action_trace, ) report = { "exploit_id": record.exploit_id, "reproduces": replay.evaluate_predicate( record.success_predicate, discovery ), "discovery_final_treasury": discovery.final_treasury, } if patched_kernel_path is not None: params = replay.load_kernel_params(patched_kernel_path) patched = replay.run_trace( params, record.citizens, record.initial_treasury, record.action_trace ) report["blocked"] = not replay.evaluate_predicate( record.success_predicate, patched ) report["patched_invariants_hold"] = all( replay.evaluate_predicate(assertion, patched) for assertion in record.patched_assertions ) report["patched_blocked_actions"] = len(patched.blocked) report["patched_final_treasury"] = patched.final_treasury return report def verify_all( exploits_dir: Union[str, Path], patched_kernel_path: Optional[Union[str, Path]] = None, ) -> list: """Verify every record in the archive; returns one report per record.""" return [ verify_record(record, patched_kernel_path) for record in load_exploits(exploits_dir) ]