"""Command-line interface for the FablePool adversarial self-play framework.

Subcommands
-----------
run              Run a tournament of self-play episodes under a kernel.
replay           Replay episode logs and verify deterministic reproduction.
exploit-to-test  Convert exploit JSON records into permanent regression tests.
validate-kernel  Load and sanity-check a kernel YAML file.
report           Render a markdown summary from a tournament result JSON.

All randomness is seed-driven; given the same kernel, roster, and seed, a
tournament is bit-for-bit reproducible, which is what makes the
exploit -> regression-test pipeline trustworthy.

Exit codes used by the subcommands in this module: ``0`` for success, ``1``
for a negative verdict (exploit found with ``--fail-on-exploit``, replay
divergence, invalid kernel), and ``2`` for unusable input (missing or
malformed files, bad roster spec).
"""

from __future__ import annotations

import argparse
import json
import sys
from pathlib import Path
from typing import Any

from fable_selfplay.agents import AGENT_REGISTRY
from fable_selfplay.detectors import ExploitRecord
from fable_selfplay.exploit_to_test import generate_regression_test
from fable_selfplay.kernel import load_kernel
from fable_selfplay.replay import replay_episode
from fable_selfplay.tournament import TournamentConfig, run_tournament


# ---------------------------------------------------------------------------
# helpers
# ---------------------------------------------------------------------------

def parse_roster(spec: str) -> dict[str, int]:
    """Parse a roster spec like ``honest:8,drainer:2`` into ``{role: count}``.

    Entries are comma-separated ``role:count`` pairs. Whitespace around an
    entry and around the role name is ignored, empty entries (for example a
    trailing comma) are skipped, and a role that appears more than once has
    its counts summed.

    Raises ``ValueError`` for unknown roles, bad counts, or an empty roster.
    """
    roster: dict[str, int] = {}
    for part in spec.split(","):
        part = part.strip()
        if not part:
            continue
        if ":" not in part:
            raise ValueError(f"bad roster entry {part!r}; expected role:count")
        role, _, count_s = part.partition(":")
        role = role.strip()
        if role not in AGENT_REGISTRY:
            known = ", ".join(sorted(AGENT_REGISTRY))
            raise ValueError(f"unknown role {role!r}; known roles: {known}")
        try:
            count = int(count_s)
        except ValueError as exc:
            raise ValueError(f"bad count for role {role!r}: {count_s!r}") from exc
        if count < 0:
            raise ValueError(f"negative count for role {role!r}")
        roster[role] = roster.get(role, 0) + count
    # A spec made only of zero counts parses cleanly but describes nobody.
    if not roster or sum(roster.values()) == 0:
        raise ValueError(f"roster spec {spec!r} produced an empty population")
    return roster


def _load_json(path: Path) -> Any:
    """Read *path* as UTF-8 and return the decoded JSON value."""
    with path.open("r", encoding="utf-8") as fh:
        return json.load(fh)


def _load_input_json(path: Path, label: str) -> Any:
    """Load an operator-supplied JSON file, failing with a readable message.

    *label* names the kind of file in the error text (for example
    ``"exploit record"``).

    Raises ``ValueError`` when the file does not exist or cannot be decoded
    as UTF-8 JSON, so that subcommands can report ``error: ...`` and exit
    with code 2 instead of dumping a traceback.
    """
    if not path.exists():
        raise ValueError(f"{label} not found: {path}")
    try:
        return _load_json(path)
    except (json.JSONDecodeError, UnicodeDecodeError) as exc:
        raise ValueError(f"{label} is not valid JSON: {path}: {exc}") from exc


def _dump_json(obj: Any, path: Path) -> None:
    """Write *obj* to *path* as indented, key-sorted JSON.

    Missing parent directories are created, and the file ends with a newline.
    """
    path.parent.mkdir(parents=True, exist_ok=True)
    with path.open("w", encoding="utf-8") as fh:
        json.dump(obj, fh, indent=2, sort_keys=True)
        fh.write("\n")


def _fmt_metrics(metrics: dict[str, Any]) -> str:
    """Render *metrics* as aligned ``key value`` lines, sorted by key.

    Float values are shown with four decimal places; everything else uses its
    default formatting. An empty mapping yields a placeholder line.
    """
    if not metrics:
        return " (no aggregate metrics reported)"
    # Pad every key to the longest one so the value column lines up.
    width = max(len(k) for k in metrics)
    lines = []
    for key in sorted(metrics):
        value = metrics[key]
        if isinstance(value, float):
            lines.append(f" {key.ljust(width)} {value:.4f}")
        else:
            lines.append(f" {key.ljust(width)} {value}")
    return "\n".join(lines)


# ---------------------------------------------------------------------------
# subcommand: run
# ---------------------------------------------------------------------------

def cmd_run(args: argparse.Namespace) -> int:
    """Run a tournament and optionally persist its result and exploits.

    Returns 2 for a bad roster spec or a missing kernel file, 1 when exploits
    were detected and ``--fail-on-exploit`` is set, and 0 otherwise.
    """
    try:
        roster = parse_roster(args.roster)
    except ValueError as exc:
        print(f"error: {exc}", file=sys.stderr)
        return 2

    kernel_path = Path(args.kernel)
    if not kernel_path.exists():
        print(f"error: kernel file not found: {kernel_path}", file=sys.stderr)
        return 2

    config = TournamentConfig(
        name=args.name,
        kernel_path=str(kernel_path),
        episodes=args.episodes,
        seed=args.seed,
        roster=roster,
        max_turns=args.max_turns,
    )
    print(f"tournament {args.name!r}: kernel={kernel_path} episodes={args.episodes} "
          f"seed={args.seed} roster={roster} max_turns={args.max_turns}")

    result = run_tournament(config)
    result_dict = result.to_dict()
    # Exploits are read from the result object rather than the dict so that
    # each one can be serialised individually below.
    exploits = list(getattr(result, "exploits", []) or [])
    metrics = result_dict.get("metrics", {})

    print(f"\ncompleted {len(result_dict.get('episodes', []))} episodes; "
          f"{len(exploits)} exploit(s) detected")
    print(_fmt_metrics(metrics))

    if args.out:
        out_path = Path(args.out)
        _dump_json(result_dict, out_path)
        print(f"\nresult written to {out_path}")

    if exploits and args.exploit_dir:
        exploit_dir = Path(args.exploit_dir)
        exploit_dir.mkdir(parents=True, exist_ok=True)
        for exploit in exploits:
            record = exploit.to_dict() if hasattr(exploit, "to_dict") else dict(exploit)
            exploit_id = record.get("exploit_id", "EXP-UNKNOWN")
            path = exploit_dir / f"{exploit_id}.json"
            _dump_json(record, path)
            print(f"exploit record written: {path}")

    if exploits and args.fail_on_exploit:
        return 1
    return 0


# ---------------------------------------------------------------------------
# subcommand: replay
# ---------------------------------------------------------------------------

def cmd_replay(args: argparse.Namespace) -> int:
    """Replay logged episodes and report whether each one reproduces.

    Returns 2 when the kernel or log file is missing or the log is not valid
    JSON, 1 when at least one episode diverged, and 0 otherwise.
    """
    kernel_path = Path(args.kernel)
    if not kernel_path.exists():
        print(f"error: kernel file not found: {kernel_path}", file=sys.stderr)
        return 2
    kernel = load_kernel(args.kernel)

    try:
        log = _load_input_json(Path(args.log), "episode log")
    except ValueError as exc:
        print(f"error: {exc}", file=sys.stderr)
        return 2

    # Accept either a single episode dict or a full tournament result
    # containing an "episodes" list.
    if isinstance(log, dict) and "episodes" in log:
        episodes = log["episodes"]
    elif isinstance(log, list):
        episodes = log
    else:
        episodes = [log]

    diverged = 0
    for index, episode in enumerate(episodes):
        outcome = replay_episode(kernel, episode)
        if getattr(outcome, "diverged", False):
            diverged += 1
            turn = getattr(outcome, "first_divergence_turn", None)
            where = f" at turn {turn}" if turn is not None else ""
            print(f"episode {index}: DIVERGED{where}")
        else:
            print(f"episode {index}: ok (deterministic reproduction)")

    print(f"\nreplayed {len(episodes)} episode(s); {diverged} divergence(s)")
    return 1 if diverged else 0


# ---------------------------------------------------------------------------
# subcommand: exploit-to-test
# ---------------------------------------------------------------------------

def cmd_exploit_to_test(args: argparse.Namespace) -> int:
    """Generate one regression test per exploit record file.

    Processing stops at the first missing or malformed record and returns 2;
    tests generated before that point are left in place. Returns 0 when every
    record was converted.
    """
    out_dir = Path(args.out_dir)
    out_dir.mkdir(parents=True, exist_ok=True)

    generated: list[Path] = []
    for exploit_path in args.exploits:
        path = Path(exploit_path)
        try:
            raw = _load_input_json(path, "exploit record")
        except ValueError as exc:
            print(f"error: {exc}", file=sys.stderr)
            return 2
        record = ExploitRecord.from_dict(raw)
        test_path = generate_regression_test(record, out_dir)
        generated.append(Path(test_path))
        print(f"{path} -> {test_path}")

    print(f"\ngenerated {len(generated)} regression test(s) in {out_dir}")
    return 0


# ---------------------------------------------------------------------------
# subcommand: validate-kernel
# ---------------------------------------------------------------------------

def cmd_validate_kernel(args: argparse.Namespace) -> int:
    """Load a kernel file and print a short summary of its contents.

    Returns 2 when the file is missing, 1 when ``load_kernel`` raises, and 0
    when the kernel loads.
    """
    path = Path(args.kernel)
    if not path.exists():
        print(f"error: kernel file not found: {path}", file=sys.stderr)
        return 2

    try:
        kernel = load_kernel(path)
    except Exception as exc:  # surfaced to the operator, not swallowed
        print(f"INVALID: {path}: {exc}", file=sys.stderr)
        return 1

    # Attributes are read defensively: a missing or empty attribute is
    # reported as "?" or a zero count rather than failing the validation.
    version = getattr(kernel, "version", "?")
    articles = getattr(kernel, "articles", {}) or {}
    parameters = getattr(kernel, "parameters", {}) or {}
    invariants = getattr(kernel, "invariants", []) or []

    print(f"kernel: {path}")
    print(f"version: {version}")
    print(f"articles: {len(articles)}")
    print(f"parameters: {len(parameters)}")
    print(f"invariants: {len(invariants)}")
    print("status: VALID")
    return 0


# ---------------------------------------------------------------------------
# subcommand: report
# ---------------------------------------------------------------------------

def cmd_report(args: argparse.Namespace) -> int:
    """Render a markdown report from a tournament result JSON file.

    The report goes to ``--out`` when given, otherwise to stdout. Returns 2
    when the result file is missing, is not valid JSON, or does not contain a
    JSON object; returns 0 otherwise.
    """
    result_path = Path(args.result)
    try:
        result = _load_input_json(result_path, "result file")
    except ValueError as exc:
        print(f"error: {exc}", file=sys.stderr)
        return 2
    # Every field below is looked up by key, so anything but an object (for
    # example a bare episode list) cannot be rendered.
    if not isinstance(result, dict):
        print(f"error: result file is not a JSON object: {result_path}",
              file=sys.stderr)
        return 2

    name = result.get("name", "unnamed tournament")
    kernel_version = result.get("kernel_version", "?")
    seed = result.get("seed", "?")
    episodes = result.get("episodes", [])
    metrics = result.get("metrics", {})
    exploits = result.get("exploits", [])
    roster = result.get("roster", {})

    lines: list[str] = []
    lines.append(f"# Tournament report: {name}")
    lines.append("")
    lines.append("| field | value |")
    lines.append("|---|---|")
    lines.append(f"| kernel version | `{kernel_version}` |")
    lines.append(f"| seed | `{seed}` |")
    lines.append(f"| episodes | {len(episodes)} |")
    lines.append(f"| roster | `{json.dumps(roster, sort_keys=True)}` |")
    lines.append(f"| exploits detected | {len(exploits)} |")
    lines.append("")

    lines.append("## Aggregate metrics")
    lines.append("")
    lines.append("| metric | value |")
    lines.append("|---|---|")
    for key in sorted(metrics):
        value = metrics[key]
        rendered = f"{value:.4f}" if isinstance(value, float) else str(value)
        lines.append(f"| {key} | {rendered} |")
    lines.append("")

    if exploits:
        lines.append("## Exploits")
        lines.append("")
        for exploit in exploits:
            exploit_id = exploit.get("exploit_id", "EXP-UNKNOWN")
            title = exploit.get("name", "")
            detector = exploit.get("detector", "?")
            lines.append(f"### {exploit_id}: {title}")
            lines.append("")
            lines.append(f"- detector: `{detector}`")
            # Optional fields are emitted only when the record carries them.
            if "episode_seed" in exploit:
                lines.append(f"- episode seed: `{exploit['episode_seed']}`")
            if "severity" in exploit:
                lines.append(f"- severity: {exploit['severity']}")
            if "description" in exploit:
                lines.append("")
                lines.append(exploit["description"])
            lines.append("")

    text = "\n".join(lines) + "\n"
    if args.out:
        out_path = Path(args.out)
        out_path.parent.mkdir(parents=True, exist_ok=True)
        out_path.write_text(text, encoding="utf-8")
        print(f"report written to {out_path}")
    else:
        print(text)
    return 0


# ---------------------------------------------------------------------------
# parser / entry point
# ---------------------------------------------------------------------------

def build_parser() -> argparse.ArgumentParser:
    """Build the ``fable-selfplay`` argument parser.

    Each subparser stores its handler in ``func`` via ``set_defaults`` so
    that ``main`` can dispatch without a lookup table.
    """
    parser = argparse.ArgumentParser(
        prog="fable-selfplay",
        description="Adversarial self-play for the FablePool constitution.",
    )
    sub = parser.add_subparsers(dest="command", required=True)

    p_run = sub.add_parser("run", help="run a self-play tournament")
    p_run.add_argument("--kernel", required=True, help="path to kernel YAML")
    p_run.add_argument("--episodes", type=int, default=100)
    p_run.add_argument("--seed", type=int, default=0)
    p_run.add_argument("--roster", default="honest:8,drainer:2",
                       help="comma-separated role:count pairs")
    p_run.add_argument("--max-turns", type=int, default=120)
    p_run.add_argument("--name", default="adhoc")
    p_run.add_argument("--out", default=None, help="write result JSON here")
    p_run.add_argument("--exploit-dir", default=None,
                       help="write exploit records into this directory")
    p_run.add_argument("--fail-on-exploit", action="store_true",
                       help="exit nonzero if any exploit is detected (for CI)")
    p_run.set_defaults(func=cmd_run)

    p_replay = sub.add_parser("replay", help="replay episode logs deterministically")
    p_replay.add_argument("--log", required=True, help="episode or tournament JSON")
    p_replay.add_argument("--kernel", required=True, help="path to kernel YAML")
    p_replay.set_defaults(func=cmd_replay)

    p_e2t = sub.add_parser("exploit-to-test",
                           help="convert exploit records into regression tests")
    p_e2t.add_argument("exploits", nargs="+", help="exploit JSON file(s)")
    p_e2t.add_argument("--out-dir", default="tests/regression")
    p_e2t.set_defaults(func=cmd_exploit_to_test)

    p_val = sub.add_parser("validate-kernel", help="load and sanity-check a kernel")
    p_val.add_argument("--kernel", required=True)
    p_val.set_defaults(func=cmd_validate_kernel)

    p_rep = sub.add_parser("report", help="render markdown from a result JSON")
    p_rep.add_argument("--result", required=True)
    p_rep.add_argument("--out", default=None)
    p_rep.set_defaults(func=cmd_report)

    return parser


def main(argv: list[str] | None = None) -> int:
    """Parse *argv* (``sys.argv[1:]`` when ``None``) and run the subcommand.

    Returns the subcommand's exit code.
    """
    parser = build_parser()
    args = parser.parse_args(argv)
    return args.func(args)


if __name__ == "__main__":
    raise SystemExit(main())