"""Command-line interface for the constitutional test suite. Subcommands: * ``run`` — run the full corpus against one parameter set. * ``check-amendment`` — run the corpus against baseline and proposed parameters and block (exit 1) on regression. * ``list`` — list scenarios in the corpus. * ``validate`` — structural validation of the corpus (used in CI and required by the runbook before a new scenario is merged). All subcommands support ``--format text|json|github``. In ``github`` mode the CLI emits workflow annotations (``::error ...``) for failures and appends a markdown report to ``$GITHUB_STEP_SUMMARY`` when set. """ from __future__ import annotations import argparse import json import os import re import sys from pathlib import Path from typing import Any, Dict, List, Optional, Sequence from . import harness from .model import Scenario, load_corpus from .params import load_parameters from .taxonomy import AttackFamily FAMILY_SLUGS = [f.value for f in AttackFamily] _ID_PATTERN = re.compile(r"^[a-z0-9][a-z0-9-]*[a-z0-9]$") def _build_parser() -> argparse.ArgumentParser: parser = argparse.ArgumentParser( prog="fabletest", description="FablePool constitutional test suite", ) sub = parser.add_subparsers(dest="command", required=True) def add_common(p: argparse.ArgumentParser) -> None: p.add_argument( "--corpus", type=Path, default=Path("scenarios"), help="directory containing scenario YAML files (default: scenarios/)", ) p.add_argument( "--format", choices=["text", "json", "github"], default="text", help="output format (default: text)", ) p.add_argument( "--report-out", type=Path, default=None, help="optional path to write a full JSON report", ) run_p = sub.add_parser("run", help="run the corpus against one parameter set") add_common(run_p) run_p.add_argument( "--params", type=Path, default=Path("constitution/parameters.yaml"), help="parameter set to test (default: constitution/parameters.yaml)", ) run_p.add_argument( "--family", action="append", choices=FAMILY_SLUGS, default=None, help="restrict the run to one or more attack families (repeatable)", ) run_p.add_argument( "--fail-fast", action="store_true", help="stop at the first failing scenario", ) check_p = sub.add_parser( "check-amendment", help="evaluate a proposed amendment against the corpus and block on failure", ) add_common(check_p) check_p.add_argument( "--baseline", type=Path, required=True, help="parameters of the constitution as currently ratified", ) check_p.add_argument( "--proposed", type=Path, default=Path("constitution/parameters.yaml"), help="parameters as proposed by the amendment (default: working tree)", ) check_p.add_argument( "--allow-known-failures", action="store_true", help=( "block only on regressions, not on vulnerabilities that already " "exist under the baseline (strict mode is the default)" ), ) list_p = sub.add_parser("list", help="list scenarios in the corpus") list_p.add_argument("--corpus", type=Path, default=Path("scenarios")) list_p.add_argument( "--family", action="append", choices=FAMILY_SLUGS, default=None ) validate_p = sub.add_parser( "validate", help="structurally validate every scenario in the corpus" ) validate_p.add_argument("--corpus", type=Path, default=Path("scenarios")) return parser # --------------------------------------------------------------------------- # GitHub Actions helpers # --------------------------------------------------------------------------- def _emit_github_annotations(report: harness.HarnessReport) -> None: for result in report.failed_results: scenario = result.scenario detail = "; ".join(result.failures) or "expected outcome mismatch" print( "::error title=Constitutional test failed::%s [%s] %s — %s" % ( scenario.id, harness._family_value(scenario.family), scenario.title, detail, ) ) def _append_step_summary(markdown: str) -> None: summary_path = os.environ.get("GITHUB_STEP_SUMMARY") if not summary_path: return with open(summary_path, "a", encoding="utf-8") as fh: fh.write(markdown) fh.write("\n") # --------------------------------------------------------------------------- # Subcommand implementations # --------------------------------------------------------------------------- def _cmd_run(args: argparse.Namespace) -> int: scenarios = load_corpus(args.corpus) params = load_parameters(args.params) report = harness.run_suite( scenarios, params, families=args.family, fail_fast=args.fail_fast ) if args.report_out is not None: harness.write_json_report(report.to_dict(), args.report_out) if args.format == "json": print(json.dumps(report.to_dict(), indent=2, sort_keys=True)) elif args.format == "github": _emit_github_annotations(report) print(harness.render_text(report)) summary = ["## Constitutional test suite — full run", ""] summary.append( "**%d/%d** scenarios defended." % (report.passed_count, report.total) ) floor = report.min_empathy() if floor is not None: summary.append( "Empathy floor: lowest worst-off welfare **%.3f** in `%s`." % (floor[1], floor[0]) ) if report.failed_results: summary.append("") summary.append("### Failures") for result in report.failed_results: summary.append("- `%s` — %s" % (result.scenario.id, result.scenario.title)) _append_step_summary("\n".join(summary)) else: print(harness.render_text(report)) return 0 if report.ok else 1 def _cmd_check_amendment(args: argparse.Namespace) -> int: verdict = harness.evaluate_amendment_paths( corpus_dir=args.corpus, baseline_path=args.baseline, proposed_path=args.proposed, strict=not args.allow_known_failures, ) if args.report_out is not None: harness.write_json_report(verdict.to_dict(), args.report_out) if args.format == "json": print(json.dumps(verdict.to_dict(), indent=2, sort_keys=True)) elif args.format == "github": _emit_github_annotations(verdict.proposed_report) if verdict.blocked: for reason in verdict.reasons: print("::error title=Amendment blocked::%s" % reason) print(harness.render_amendment_text(verdict)) _append_step_summary(harness.render_amendment_markdown(verdict)) else: print(harness.render_amendment_text(verdict)) return 1 if verdict.blocked else 0 def _cmd_list(args: argparse.Namespace) -> int: scenarios = load_corpus(args.corpus) wanted = set(args.family) if args.family else None rows = [ s for s in sorted(scenarios, key=lambda s: s.id) if wanted is None or harness._family_value(s.family) in wanted ] print("%-40s %-26s %s" % ("ID", "FAMILY", "TITLE")) for s in rows: print("%-40s %-26s %s" % (s.id, harness._family_value(s.family), s.title)) print() print("%d scenario(s)" % len(rows)) return 0 def validate_scenarios(scenarios: Sequence[Scenario]) -> Dict[str, List[str]]: """Structural validation beyond what the loader enforces. Returns ``{"errors": [...], "warnings": [...]}``. Errors block CI; warnings are advisory. The runbook (docs/runbook.md) requires a clean ``validate`` pass before a new scenario can be merged. """ errors: List[str] = [] warnings: List[str] = [] seen_ids: Dict[str, int] = {} for scenario in scenarios: sid = scenario.id seen_ids[sid] = seen_ids.get(sid, 0) + 1 if not _ID_PATTERN.match(sid): errors.append( "%s: id must be lowercase kebab-case ([a-z0-9-])" % sid ) family = harness._family_value(scenario.family) if family not in FAMILY_SLUGS: errors.append("%s: unknown attack family %r" % (sid, family)) if not (scenario.description or "").strip(): errors.append("%s: description is required" % sid) if len((scenario.description or "").strip()) < 40: warnings.append( "%s: description is very short; explain the attack mechanics" % sid ) if not (scenario.precedent or "").strip(): errors.append( "%s: precedent is required — cite the historical or " "game-theoretic source this scenario encodes" % sid ) if len((scenario.actors or [])) < 2: errors.append("%s: at least two actors are required" % sid) if not scenario.moves: errors.append("%s: at least one move is required" % sid) if scenario.expected is None: errors.append("%s: expected outcome is required" % sid) empathy = scenario.empathy if empathy is None: errors.append("%s: empathy section is required" % sid) else: floor = getattr(empathy, "floor", None) if floor is None or not (0.0 <= float(floor) <= 1.0): errors.append("%s: empathy.floor must be in [0, 1]" % sid) if not (getattr(empathy, "rationale", "") or "").strip(): errors.append( "%s: empathy.rationale is required — say who the " "worst-off participant is and why the floor is set there" % sid ) if ( scenario.expected is not None and scenario.expected.attack_succeeds and "known-vulnerability" not in (scenario.tags or []) ): warnings.append( "%s: expects the attack to SUCCEED but is not tagged " "'known-vulnerability'" % sid ) for sid, count in seen_ids.items(): if count > 1: errors.append("duplicate scenario id: %s (%d occurrences)" % (sid, count)) return {"errors": errors, "warnings": warnings} def _cmd_validate(args: argparse.Namespace) -> int: try: scenarios = load_corpus(args.corpus) except Exception as exc: # loader errors are validation failures too print("corpus failed to load: %s" % exc, file=sys.stderr) return 1 outcome = validate_scenarios(scenarios) for warning in outcome["warnings"]: print("WARNING: %s" % warning) for error in outcome["errors"]: print("ERROR: %s" % error, file=sys.stderr) print( "%d scenario(s) checked: %d error(s), %d warning(s)" % (len(scenarios), len(outcome["errors"]), len(outcome["warnings"])) ) return 1 if outcome["errors"] else 0 def main(argv: Optional[Sequence[str]] = None) -> int: parser = _build_parser() args = parser.parse_args(argv) handlers = { "run": _cmd_run, "check-amendment": _cmd_check_amendment, "list": _cmd_list, "validate": _cmd_validate, } return handlers[args.command](args) if __name__ == "__main__": raise SystemExit(main())