#!/usr/bin/env python3
"""Verify the exploit-to-test pipeline is airtight.

Enforced invariants (always):
  1. Every exploits/EXP-*.json parses as JSON and contains its own id string.
  2. Every exploit record has a matching regression test in tests/regression/
     (EXP-001 -> test_exp_001.py), and that test references the exploit id.
  3. Every regression test of the form test_exp_*.py has a matching exploit
     record (no orphan tests, no orphan records).
  4. CHANGELOG.md accounts for every exploit id.
  5. Both kernel files parse as YAML, declare versions, and differ in content.

Additional strict-mode invariants (--strict, for newly authored records):
  6. Records carry the required fields from exploits/SCHEMA.json and their
     'id' field matches the filename.

Files that cannot be read or decoded as UTF-8 are reported as violations
instead of aborting the run, so one bad file never hides the other findings.

Exit code 0 if all invariants hold; 1 otherwise. Used by CI and by the
amendment gate. Pure stdlib + PyYAML; no other dependencies.
"""

from __future__ import annotations

import argparse
import json
import re
import sys
from pathlib import Path

try:
    import yaml
except ImportError:
    # Fail closed, but late: check_kernels() reports the missing dependency as
    # an invariant violation so the stdlib-only checks still run and print.
    yaml = None

REPO_ROOT = Path(__file__).resolve().parents[1]
EXPLOITS_DIR = REPO_ROOT / "exploits"
REGRESSION_DIR = REPO_ROOT / "tests" / "regression"
CHANGELOG = REPO_ROOT / "CHANGELOG.md"
KERNEL_DIR = REPO_ROOT / "kernel"

EXPLOIT_FILE_RE = re.compile(r"^EXP-(\d{3})\.json$")
TEST_FILE_RE = re.compile(r"^test_exp_(\d{3})\.py$")

STRICT_REQUIRED_FIELDS = ("id", "title", "kernel_version")


class Failure:
    """One invariant violation: where it was found and what is wrong."""

    def __init__(self, where: str, message: str) -> None:
        self.where = where
        self.message = message

    def __str__(self) -> str:
        return f"FAIL [{self.where}] {self.message}"

    def __repr__(self) -> str:
        return f"{type(self).__name__}({self.where!r}, {self.message!r})"


def _read_text(path: Path, failures: list[Failure]) -> str | None:
    """Return the UTF-8 text of *path*, or record a Failure and return None.

    An unreadable or non-UTF-8 file is itself a pipeline violation; reporting
    it (rather than letting the exception escape) keeps the remaining checks
    running.
    """
    try:
        return path.read_text(encoding="utf-8")
    except (OSError, UnicodeDecodeError) as e:
        failures.append(Failure(str(path), f"unreadable: {e}"))
        return None


def _declares_version(doc: dict) -> bool:
    """Return True if *doc* has a non-null, non-blank 'version' value.

    A YAML ``version:`` with no value loads as None; stringifying that gives
    the truthy text "None", so None has to be rejected explicitly.
    """
    version = doc.get("version")
    return version is not None and bool(str(version).strip())


def collect_exploit_records(exploits_dir: Path | None = None) -> dict[str, Path]:
    """Map exploit ids ("EXP-001") to their record files.

    Args:
        exploits_dir: Directory to scan; defaults to EXPLOITS_DIR.

    Returns:
        Dict of id -> path, in sorted filename order.
    """
    directory = EXPLOITS_DIR if exploits_dir is None else exploits_dir
    records: dict[str, Path] = {}
    for path in sorted(directory.glob("EXP-*.json")):
        # NOTE: names that match the glob but not the three-digit pattern
        # (e.g. EXP-1.json) are skipped silently, so they are never checked.
        m = EXPLOIT_FILE_RE.match(path.name)
        if m:
            records[f"EXP-{m.group(1)}"] = path
    return records


def collect_regression_tests(regression_dir: Path | None = None) -> dict[str, Path]:
    """Map exploit ids ("EXP-001") to their regression test files.

    Args:
        regression_dir: Directory to scan; defaults to REGRESSION_DIR.

    Returns:
        Dict of id -> path, in sorted filename order.
    """
    directory = REGRESSION_DIR if regression_dir is None else regression_dir
    tests: dict[str, Path] = {}
    for path in sorted(directory.glob("test_exp_*.py")):
        m = TEST_FILE_RE.match(path.name)
        if m:
            tests[f"EXP-{m.group(1)}"] = path
    return tests


def check_records(records: dict[str, Path], strict: bool) -> list[Failure]:
    """Check invariant 1 (and invariant 6 when *strict*) for every record.

    Args:
        records: Exploit id -> record path, as built by collect_exploit_records.
        strict: Also require STRICT_REQUIRED_FIELDS and a matching 'id' field.

    Returns:
        The violations found; empty when every record passes.
    """
    failures: list[Failure] = []
    if not records:
        failures.append(Failure("exploits/", "no exploit records found"))
        return failures

    for exp_id, path in records.items():
        raw = _read_text(path, failures)
        if raw is None:
            continue
        try:
            doc = json.loads(raw)
        except json.JSONDecodeError as e:
            failures.append(Failure(str(path), f"invalid JSON: {e}"))
            continue

        # Deliberately a raw-text search: the id may appear in any field.
        if exp_id not in raw:
            failures.append(
                Failure(str(path), f"record does not mention its own id {exp_id}")
            )

        if not strict:
            continue
        if not isinstance(doc, dict):
            failures.append(Failure(str(path), "top level must be an object"))
            continue
        failures.extend(
            Failure(str(path), f"strict: missing required field '{field}'")
            for field in STRICT_REQUIRED_FIELDS
            if field not in doc
        )
        # A missing 'id' is already reported above; only flag a wrong one here.
        declared = doc.get("id")
        if declared is not None and declared != exp_id:
            failures.append(
                Failure(
                    str(path),
                    f"strict: 'id' field {declared!r} does not match filename id {exp_id}",
                )
            )
    return failures


def check_test_coverage(
    records: dict[str, Path], tests: dict[str, Path]
) -> list[Failure]:
    """Check invariants 2 and 3: records and regression tests pair up one-to-one.

    Args:
        records: Exploit id -> record path.
        tests: Exploit id -> regression test path.

    Returns:
        The violations found: missing tests, tests that never mention their
        exploit id, unreadable tests, and orphan tests.
    """
    failures: list[Failure] = []
    for exp_id in records:
        test_path = tests.get(exp_id)
        if test_path is None:
            expected = f"tests/regression/test_exp_{exp_id.split('-')[1]}.py"
            failures.append(
                Failure(exp_id, f"no regression test (expected {expected})")
            )
            continue
        body = _read_text(test_path, failures)
        if body is not None and exp_id not in body:
            failures.append(
                Failure(
                    str(test_path),
                    f"regression test does not reference exploit id {exp_id}",
                )
            )

    for exp_id, test_path in tests.items():
        if exp_id not in records:
            failures.append(
                Failure(
                    str(test_path),
                    f"orphan regression test: no exploit record for {exp_id}",
                )
            )
    return failures


def check_changelog(
    records: dict[str, Path], changelog: Path | None = None
) -> list[Failure]:
    """Check invariant 4: the changelog mentions every exploit id.

    Args:
        records: Exploit id -> record path (only the ids are used).
        changelog: Changelog file to read; defaults to CHANGELOG.

    Returns:
        The violations found; a missing or unreadable changelog is one violation.
    """
    failures: list[Failure] = []
    target = CHANGELOG if changelog is None else changelog
    if not target.exists():
        failures.append(Failure("CHANGELOG.md", "file missing"))
        return failures

    body = _read_text(target, failures)
    if body is None:
        return failures
    for exp_id in records:
        if exp_id not in body:
            failures.append(
                Failure(
                    "CHANGELOG.md",
                    f"does not account for {exp_id}; every exploit must map "
                    f"to an amendment or be explicitly marked open",
                )
            )
    return failures


def check_kernels(kernel_dir: Path | None = None) -> list[Failure]:
    """Check invariant 5 over every kernel-v*.yaml file.

    Args:
        kernel_dir: Directory to scan; defaults to KERNEL_DIR.

    Returns:
        The violations found: fewer than two kernel files, unreadable or
        invalid YAML, a non-mapping top level, a missing/null/blank 'version',
        identical file contents, or PyYAML not being installed.
    """
    failures: list[Failure] = []
    directory = KERNEL_DIR if kernel_dir is None else kernel_dir
    kernels = sorted(directory.glob("kernel-v*.yaml"))
    if len(kernels) < 2:
        failures.append(
            Failure("kernel/", f"expected at least 2 kernel versions, found {len(kernels)}")
        )
    if yaml is None:
        failures.append(
            Failure("kernel/", "PyYAML is not installed; cannot validate kernel files")
        )

    contents: list[str] = []
    for path in kernels:
        raw = _read_text(path, failures)
        if raw is None:
            continue
        # Collected before parsing so the duplicate check covers invalid files too.
        contents.append(raw)
        if yaml is None:
            continue
        try:
            doc = yaml.safe_load(raw)
        except yaml.YAMLError as e:
            failures.append(Failure(str(path), f"invalid YAML: {e}"))
            continue
        if not isinstance(doc, dict):
            failures.append(Failure(str(path), "top level must be a mapping"))
            continue
        if not _declares_version(doc):
            failures.append(Failure(str(path), "missing 'version' field"))

    if len(contents) >= 2 and len(set(contents)) != len(contents):
        failures.append(
            Failure("kernel/", "two kernel version files have identical content")
        )
    return failures


def main(argv: list[str] | None = None) -> int:
    """Run every check against the repository and print a report.

    Args:
        argv: Command-line arguments; None means sys.argv[1:].

    Returns:
        0 if all invariants hold, 1 otherwise.
    """
    parser = argparse.ArgumentParser(
        description=__doc__,
        # Keep the numbered invariant list readable in --help output.
        formatter_class=argparse.RawDescriptionHelpFormatter,
    )
    parser.add_argument(
        "--strict",
        action="store_true",
        help="also enforce schema required-fields on every record",
    )
    args = parser.parse_args(argv)

    records = collect_exploit_records()
    tests = collect_regression_tests()

    failures: list[Failure] = []
    failures += check_records(records, strict=args.strict)
    failures += check_test_coverage(records, tests)
    failures += check_changelog(records)
    failures += check_kernels()

    print(f"exploit records : {len(records)} ({', '.join(sorted(records)) or 'none'})")
    print(f"regression tests: {len(tests)}")

    if failures:
        print()
        for f in failures:
            print(f)
        print(f"\n{len(failures)} invariant violation(s).")
        return 1

    print("\nAll exploit-to-test pipeline invariants hold.")
    return 0


if __name__ == "__main__":
    sys.exit(main())