"""Cross-artifact consistency audit for the FablePool constitutional test suite.

This module is the milestone's final build-hygiene gate. It checks,
statically, that the four artifact layers of the test suite agree with each
other:

1. **Scenario ``blocked_by`` references -> ``constitution/parameters.yaml``**
   Every scenario declares which constitutional parameter(s) are responsible
   for blocking the encoded attack. If a scenario names a parameter that does
   not exist in the constitution's parameter file, the scenario is asserting
   a defense the constitution does not actually have. That is an **error**.

2. **Scenario action verbs -> engine action handlers**
   Every legal move in a scenario sequence must be executable by the engine.
   The authoritative gate for this is the runtime corpus execution in
   ``tests/test_corpus.py`` (the engine raises on an unknown verb). This
   static check is a *fast-fail* companion: it introspects
   ``fabletest.engine`` for action registries and handler-method naming
   conventions. If a dict-style registry is discovered, mismatches are
   **errors**; if only convention-based discovery succeeds, mismatches are
   **warnings** (deferred to the runtime gate), so that this audit can never
   produce a false CI failure that the runtime gate would not also produce.

3. **Scenario taxonomy references -> ``fabletest.taxonomy``**
   Every scenario's attack-class label must be a category the taxonomy
   actually defines, so the corpus index, docs, and CI summaries stay
   coherent.

4. **Package imports -> ``pyproject.toml`` manifest**
   Every third-party import anywhere in ``fabletest/`` or ``tests/`` must be
   declared in the project manifest (``[project] dependencies``,
   ``[project.optional-dependencies]``, or PEP 735 ``[dependency-groups]``),
   so the suite builds on a clean machine.

The audit is exposed three ways:

* programmatically (``run_audit()`` returning an :class:`AuditReport`),
* as pytest assertions (``tests/test_consistency.py``), so it runs inside the
  existing CI workflow with no workflow changes, and
* as a CLI: ``python -m fabletest.audit [--strict] [--root PATH]``.

Design note on defensiveness
----------------------------
The engine and taxonomy modules are living code; their internal registry
shapes may be refactored as exploits are folded back into the corpus (see
``docs/runbook.md``). The discovery functions here therefore harvest *any*
plausible registry shape (str->callable dicts at module or class scope,
``action_*``/``handle_*``/``apply_*`` method conventions, Enum members,
string constants) and deliberately over-collect. Over-collection can only
weaken a static check, never make it falsely fail — the runtime corpus
execution remains the authoritative gate. Every check that cannot establish
an authoritative ground truth downgrades itself to a warning and says so,
rather than guessing.
"""

from __future__ import annotations

import argparse
import ast
import enum
import importlib
import re
import sys
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any, Iterable, Iterator, Optional

import yaml

# ---------------------------------------------------------------------------
# Report model
# ---------------------------------------------------------------------------

SEVERITY_ERROR = "error"
SEVERITY_WARNING = "warning"
SEVERITY_INFO = "info"

# Display rank per severity: lower sorts first in rendered output.
_SEVERITY_ORDER = {SEVERITY_ERROR: 0, SEVERITY_WARNING: 1, SEVERITY_INFO: 2}


@dataclass(frozen=True)
class Finding:
    """One immutable observation produced by an audit check."""

    check: str
    severity: str
    message: str

    def render(self) -> str:
        """Return the single-line ``[SEVERITY] check: message`` form."""
        label = self.severity.upper()
        # The label is padded to 7 columns so that messages line up.
        return f"[{label:7s}] {self.check}: {self.message}"


@dataclass
class AuditReport:
    """The findings collected from every audit check, in emission order."""

    findings: list[Finding] = field(default_factory=list)

    def _by_severity(self, level: str) -> list[Finding]:
        """Return the findings whose severity equals *level*, order preserved."""
        return [item for item in self.findings if item.severity == level]

    @property
    def errors(self) -> list[Finding]:
        """Findings with severity ``error``."""
        return self._by_severity(SEVERITY_ERROR)

    @property
    def warnings(self) -> list[Finding]:
        """Findings with severity ``warning``."""
        return self._by_severity(SEVERITY_WARNING)

    @property
    def ok(self) -> bool:
        """True when the report contains no errors (warnings are allowed)."""
        return len(self.errors) == 0

    def ok_strict(self) -> bool:
        """True when the report contains neither errors nor warnings."""
        return not (self.errors or self.warnings)


def format_findings(findings: Iterable[Finding]) -> str:
    """Render *findings* one per line, sorted by severity, check, message.

    Severities missing from ``_SEVERITY_ORDER`` sort after the known ones.
    """

    def sort_key(item: Finding) -> tuple[int, str, str]:
        rank = _SEVERITY_ORDER.get(item.severity, 9)
        return rank, item.check, item.message

    rendered = [item.render() for item in sorted(findings, key=sort_key)]
    return "\n".join(rendered)


def format_report(report: AuditReport) -> str:
    """Return the printable report: findings, a count line, then PASS/FAIL."""
    error_count = len(report.errors)
    warning_count = len(report.warnings)
    # Everything that is neither an error nor a warning is counted as info.
    info_count = len(report.findings) - error_count - warning_count
    verdict = "PASS" if report.ok else "FAIL"

    parts = [format_findings(report.findings)] if report.findings else []
    parts.append(
        f"audit: {error_count} error(s), {warning_count} warning(s), "
        f"{info_count} info"
    )
    parts.append("audit: " + verdict)
    return "\n".join(parts)


# ---------------------------------------------------------------------------
# Repository layout discovery
# ---------------------------------------------------------------------------


def find_repo_root(start: Optional[Path] = None) -> Path:
    """Walk upward from *start* (default: this file) to the directory holding
    ``pyproject.toml``.

    If *start* is a file, the search begins in its parent directory.

    Raises:
        FileNotFoundError: no directory at or above *start* contains a
            ``pyproject.toml`` file.
    """
    here = (start or Path(__file__)).resolve()
    if here.is_file():
        here = here.parent
    for candidate in [here, *here.parents]:
        if (candidate / "pyproject.toml").is_file():
            return candidate
    raise FileNotFoundError(
        f"could not locate pyproject.toml above {here}; pass --root explicitly"
    )


def scenarios_dir(root: Path) -> Path:
    """Return the scenario corpus directory, ``<root>/scenarios``."""
    return root / "scenarios"


def parameters_file(root: Path) -> Path:
    """Return the path ``<root>/constitution/parameters.yaml``."""
    return root / "constitution" / "parameters.yaml"


# ---------------------------------------------------------------------------
# YAML traversal helpers
# ---------------------------------------------------------------------------


def flatten_keys(mapping: Any, prefix: str = "") -> set[str]:
    """Return every dotted key path in *mapping*, including intermediate
    (section) paths, so a ``blocked_by`` reference may target either a leaf
    parameter or a whole parameter section.

    A non-dict *mapping* yields an empty set.
    """
    keys: set[str] = set()
    if not isinstance(mapping, dict):
        return keys
    for key, value in mapping.items():
        path = f"{prefix}.{key}" if prefix else str(key)
        keys.add(path)
        if isinstance(value, dict):
            keys |= flatten_keys(value, path)
    return keys


def _collect_string_values(node: Any, key_names: frozenset[str], out: set[str]) -> None:
    """Recursively gather string (or list-of-string) values stored under any
    of *key_names* anywhere in a YAML structure.

    Results are added to *out* in place; nothing is returned.
    """
    if isinstance(node, dict):
        for key, value in node.items():
            if key in key_names:
                if isinstance(value, str):
                    out.add(value)
                elif isinstance(value, (list, tuple)):
                    out.update(item for item in value if isinstance(item, str))
            # Recurse into every value, matched or not, so nested
            # occurrences of the key are found too.
            _collect_string_values(value, key_names, out)
    elif isinstance(node, (list, tuple)):
        for item in node:
            _collect_string_values(item, key_names, out)


BLOCKED_BY_KEYS = frozenset({"blocked_by"})
ACTION_KEYS = frozenset({"action", "verb", "move"})
TAXONOMY_KEYS = frozenset({"category", "taxonomy", "attack_class"})


def iter_scenario_documents(root: Path) -> Iterator[tuple[Path, Any]]:
    """Yield ``(path, parsed_yaml_document)`` for every document in every
    ``*.yaml`` file under the scenarios directory (searched recursively).

    Multi-document YAML files yield one pair per document; empty documents
    are skipped. Yields nothing when the scenarios directory does not exist.
    Unwrapping of top-level lists and ``scenarios:`` mappings is done by
    :func:`iter_scenarios`, not here.
    """
    directory = scenarios_dir(root)
    if not directory.is_dir():
        return
    for path in sorted(directory.rglob("*.yaml")):
        text = path.read_text(encoding="utf-8")
        for document in yaml.safe_load_all(text):
            if document is None:
                continue
            yield path, document


def iter_scenarios(root: Path) -> Iterator[tuple[Path, dict]]:
    """Yield ``(path, scenario_dict)`` for every individual scenario across
    all corpus files.

    Accepted document shapes: a top-level list of mappings, a mapping with a
    ``scenarios`` list of mappings, or a single mapping (treated as one
    scenario). Non-mapping entries are ignored.
    """
    for path, document in iter_scenario_documents(root):
        if isinstance(document, list):
            for entry in document:
                if isinstance(entry, dict):
                    yield path, entry
        elif isinstance(document, dict):
            inner = document.get("scenarios")
            if isinstance(inner, list):
                for entry in inner:
                    if isinstance(entry, dict):
                        yield path, entry
            else:
                yield path, document


# ---------------------------------------------------------------------------
# Check 1: blocked_by references resolve against constitution/parameters.yaml
# ---------------------------------------------------------------------------


def load_parameter_paths(root: Path) -> set[str]:
    """Return every dotted parameter path defined in ``parameters.yaml``.

    Raises whatever reading or parsing the file raises (``OSError``,
    ``yaml.YAMLError``); the caller decides how to report it.
    """
    path = parameters_file(root)
    data = yaml.safe_load(path.read_text(encoding="utf-8")) or {}
    paths = flatten_keys(data)
    # If the document uses a single top-level wrapper section (e.g.
    # ``parameters:`` or ``kernel:``), accept references written without it.
    if isinstance(data, dict) and len(data) == 1:
        (wrapper,) = data.keys()
        inner = data[wrapper]
        if isinstance(inner, dict):
            paths |= flatten_keys(inner)
    return paths


def check_blocked_by_keys(root: Path) -> list[Finding]:
    """Check 1: every ``blocked_by`` reference names an existing parameter.

    Returns one error per unresolved reference plus a closing info finding;
    returns a single error if ``parameters.yaml`` cannot be loaded.
    """
    check = "blocked_by->parameters"
    findings: list[Finding] = []
    try:
        parameter_paths = load_parameter_paths(root)
    except (OSError, yaml.YAMLError) as exc:
        return [Finding(check, SEVERITY_ERROR, f"cannot load parameters.yaml: {exc}")]

    total_refs = 0
    for path, scenario in iter_scenarios(root):
        refs: set[str] = set()
        _collect_string_values(scenario, BLOCKED_BY_KEYS, refs)
        total_refs += len(refs)
        for ref in sorted(refs):
            if ref in parameter_paths:
                continue
            # Tolerate an explicit top-level prefix the parameters file may
            # or may not carry, in either direction.
            stripped = ref.split(".", 1)[1] if "." in ref else None
            if stripped and stripped in parameter_paths:
                continue
            scenario_id = scenario.get("id") or scenario.get("name") or ""
            findings.append(
                Finding(
                    check,
                    SEVERITY_ERROR,
                    f"{path.name}::{scenario_id} references parameter "
                    f"'{ref}' which does not exist in constitution/parameters.yaml",
                )
            )
    findings.append(
        Finding(
            check,
            SEVERITY_INFO,
            f"resolved {total_refs} blocked_by reference(s) against "
            f"{len(parameter_paths)} parameter path(s)",
        )
    )
    return findings


# ---------------------------------------------------------------------------
# Check 2: scenario action verbs resolve against engine handlers
# ---------------------------------------------------------------------------

_HANDLER_PREFIXES = ("action_", "handle_", "apply_")


def _harvest_registries(obj: Any, dict_verbs: set[str], method_verbs: set[str]) -> None:
    """Collect candidate action verbs from the attributes of *obj*.

    Keys of any non-empty ``str -> callable`` dict go to *dict_verbs*; the
    suffix of any callable named ``action_*`` / ``handle_*`` / ``apply_*``
    goes to *method_verbs*. Both sets are updated in place.
    """
    for name in dir(obj):
        if name.startswith("__"):
            continue
        try:
            attr = getattr(obj, name)
        except Exception:  # pragma: no cover - defensive against properties
            continue
        if (
            isinstance(attr, dict)
            and attr
            and all(isinstance(k, str) for k in attr)
            and all(callable(v) for v in attr.values())
        ):
            dict_verbs.update(attr.keys())
        elif callable(attr):
            for prefix in _HANDLER_PREFIXES:
                if name.startswith(prefix) and len(name) > len(prefix):
                    method_verbs.add(name[len(prefix):])


def discover_engine_actions() -> tuple[set[str], bool]:
    """Return ``(verbs, authoritative)``.

    *authoritative* is True only when at least one dict-style str->callable
    registry was discovered at module or class scope. Convention-derived
    verbs (``action_*`` / ``handle_*`` / ``apply_*`` methods) are included
    either way, but on their own are not treated as a complete inventory.

    If ``fabletest.engine`` cannot be imported, returns ``(set(), False)``.
    """
    dict_verbs: set[str] = set()
    method_verbs: set[str] = set()
    try:
        engine = importlib.import_module("fabletest.engine")
    except Exception:
        # Deliberate best-effort: the caller turns "nothing discovered" into
        # a warning and defers to the runtime gate.
        return set(), False
    _harvest_registries(engine, dict_verbs, method_verbs)
    for name in dir(engine):
        attr = getattr(engine, name, None)
        if isinstance(attr, type):
            _harvest_registries(attr, dict_verbs, method_verbs)
    return dict_verbs | method_verbs, bool(dict_verbs)


def check_action_verbs(root: Path) -> list[Finding]:
    """Check 2: every verb used by a scenario has a discovered engine handler.

    Mismatches are errors when discovery was authoritative and warnings
    otherwise; if nothing could be discovered a single warning is returned.
    """
    check = "actions->engine"
    findings: list[Finding] = []
    known_verbs, authoritative = discover_engine_actions()

    # verb -> list of "file::scenario" locations that use it
    used: dict[str, list[str]] = {}
    for path, scenario in iter_scenarios(root):
        verbs: set[str] = set()
        _collect_string_values(scenario, ACTION_KEYS, verbs)
        scenario_id = scenario.get("id") or scenario.get("name") or ""
        for verb in verbs:
            used.setdefault(verb, []).append(f"{path.name}::{scenario_id}")

    if not known_verbs:
        findings.append(
            Finding(
                check,
                SEVERITY_WARNING,
                "could not discover any action registry in fabletest.engine; "
                "verb validity is enforced only by runtime corpus execution "
                "(tests/test_corpus.py)",
            )
        )
        return findings

    severity = SEVERITY_ERROR if authoritative else SEVERITY_WARNING
    qualifier = (
        ""
        if authoritative
        else " (non-authoritative discovery; the runtime "
        "corpus run is the deciding gate)"
    )
    for verb in sorted(used):
        if verb in known_verbs:
            continue
        # List at most five locations and summarise the remainder.
        where = ", ".join(sorted(used[verb])[:5])
        suffix = "" if len(used[verb]) <= 5 else f" (+{len(used[verb]) - 5} more)"
        findings.append(
            Finding(
                check,
                severity,
                f"verb '{verb}' used by {where}{suffix} has no registered "
                f"engine handler{qualifier}",
            )
        )
    findings.append(
        Finding(
            check,
            SEVERITY_INFO,
            f"checked {len(used)} distinct verb(s) against {len(known_verbs)} "
            f"discovered handler(s); discovery "
            f"{'authoritative' if authoritative else 'convention-based'}",
        )
    )
    return findings


# ---------------------------------------------------------------------------
# Check 3: scenario taxonomy references resolve against fabletest.taxonomy
# ---------------------------------------------------------------------------


def discover_taxonomy_categories() -> set[str]:
    """Return every string in ``fabletest.taxonomy`` that could be a category.

    Harvests string attributes, string members of collections, string dict
    keys, and Enum member values/names (names also lower-cased and
    hyphenated) at module and class scope. Returns an empty set if the
    module cannot be imported.
    """
    categories: set[str] = set()
    try:
        taxonomy = importlib.import_module("fabletest.taxonomy")
    except Exception:
        return categories

    def harvest_strings(obj: Any) -> None:
        for name in dir(obj):
            if name.startswith("__"):
                continue
            try:
                attr = getattr(obj, name)
            except Exception:  # pragma: no cover
                continue
            if isinstance(attr, str):
                categories.add(attr)
            elif isinstance(attr, (frozenset, set, list, tuple)):
                categories.update(x for x in attr if isinstance(x, str))
            elif isinstance(attr, dict):
                categories.update(k for k in attr if isinstance(k, str))

    harvest_strings(taxonomy)
    for name in dir(taxonomy):
        attr = getattr(taxonomy, name, None)
        if isinstance(attr, type):
            if issubclass(attr, enum.Enum):
                for member in attr:
                    if isinstance(member.value, str):
                        categories.add(member.value)
                    categories.add(member.name)
                    categories.add(member.name.lower())
                    categories.add(member.name.lower().replace("_", "-"))
            else:
                harvest_strings(attr)
    return categories


def check_taxonomy_references(root: Path) -> list[Finding]:
    """Check 3: every scenario attack-class label is known to the taxonomy.

    A label matches if it, its lower-case form, or its lower-case form with
    ``_``/``-`` swapped is among the discovered identifiers. Returns a single
    warning when no identifiers could be discovered.
    """
    check = "category->taxonomy"
    findings: list[Finding] = []
    known = discover_taxonomy_categories()
    if not known:
        return [
            Finding(
                check,
                SEVERITY_WARNING,
                "could not discover category identifiers in fabletest.taxonomy; "
                "skipping (corpus loader validates categories at runtime)",
            )
        ]
    checked = 0
    for path, scenario in iter_scenarios(root):
        refs: set[str] = set()
        _collect_string_values(scenario, TAXONOMY_KEYS, refs)
        scenario_id = scenario.get("id") or scenario.get("name") or ""
        for ref in sorted(refs):
            checked += 1
            lowered = ref.lower()
            normalized = {
                ref,
                lowered,
                lowered.replace("_", "-"),
                lowered.replace("-", "_"),
            }
            if normalized & known:
                continue
            findings.append(
                Finding(
                    check,
                    SEVERITY_ERROR,
                    f"{path.name}::{scenario_id} declares attack class '{ref}' "
                    f"which fabletest.taxonomy does not define",
                )
            )
    findings.append(
        Finding(check, SEVERITY_INFO, f"checked {checked} taxonomy reference(s)")
    )
    return findings


# ---------------------------------------------------------------------------
# Check 4: imports vs. declared manifest dependencies
# ---------------------------------------------------------------------------

#: Import-name -> PyPI distribution name, for packages whose names differ.
IMPORT_TO_DIST = {
    "yaml": "pyyaml",
}

#: Roots that are part of this repository, not external dependencies.
LOCAL_ROOTS = frozenset({"fabletest", "tests", "conftest", "scenarios", "constitution"})

#: Exception names whose handler marks the imports in a ``try`` as optional.
_IMPORT_GUARD_EXCEPTIONS = frozenset({"ImportError", "ModuleNotFoundError"})


def _canonicalize(name: str) -> str:
    """Lower-case *name* and collapse runs of ``-``, ``_``, ``.`` to ``-``."""
    return re.sub(r"[-_.]+", "-", name).lower()


def load_declared_distributions(root: Path) -> Optional[set[str]]:
    """Parse pyproject.toml and return canonicalized declared distribution
    names, or None if no TOML parser is available (Python < 3.11 without
    tomli).

    Names are read from ``[project] dependencies``,
    ``[project.optional-dependencies]`` and ``[dependency-groups]``;
    non-string group entries (e.g. include tables) are ignored.
    """
    try:
        import tomllib  # Python >= 3.11
    except ImportError:
        try:
            import tomli as tomllib  # type: ignore[no-redef]
        except ImportError:
            return None
    data = tomllib.loads((root / "pyproject.toml").read_text(encoding="utf-8"))
    specs: list[str] = []
    project = data.get("project", {})
    specs.extend(project.get("dependencies", []) or [])
    for group in (project.get("optional-dependencies", {}) or {}).values():
        specs.extend(group)
    for group in (data.get("dependency-groups", {}) or {}).values():
        specs.extend(item for item in group if isinstance(item, str))
    declared: set[str] = set()
    for spec in specs:
        # Keep only the leading distribution name; drop extras, version
        # constraints and environment markers.
        match = re.match(r"^\s*([A-Za-z0-9][A-Za-z0-9._-]*)", spec)
        if match:
            declared.add(_canonicalize(match.group(1)))
    return declared


def iter_python_files(root: Path) -> Iterator[Path]:
    """Yield every ``*.py`` file under ``<root>/fabletest`` and ``<root>/tests``,
    sorted within each directory."""
    for sub in ("fabletest", "tests"):
        directory = root / sub
        if directory.is_dir():
            yield from sorted(directory.rglob("*.py"))


def _handler_catches_import_error(handler: ast.ExceptHandler) -> bool:
    """Return True if *handler* names ImportError or ModuleNotFoundError."""
    if handler.type is None:
        return False
    if isinstance(handler.type, ast.Tuple):
        caught = handler.type.elts
    else:
        caught = [handler.type]
    for expr in caught:
        # Accept both ``ImportError`` and dotted forms like ``builtins.ImportError``.
        name = expr.id if isinstance(expr, ast.Name) else getattr(expr, "attr", None)
        if name in _IMPORT_GUARD_EXCEPTIONS:
            return True
    return False


def _import_roots_of(node: ast.AST) -> set[str]:
    """Return the top-level package names bound by an absolute import node."""
    if isinstance(node, ast.Import):
        return {alias.name.split(".")[0] for alias in node.names}
    if isinstance(node, ast.ImportFrom) and node.level == 0 and node.module:
        return {node.module.split(".")[0]}
    return set()


def _scan_import_roots(path: Path) -> tuple[set[str], set[str]]:
    """Return ``(required, guarded)`` import roots for the file at *path*.

    An import is *guarded* when it sits in the body or a handler of a ``try``
    statement that catches ImportError/ModuleNotFoundError, i.e. the code
    already copes with the module being absent. A root imported both ways is
    reported as required only.
    """
    tree = ast.parse(path.read_text(encoding="utf-8"), filename=str(path))
    guarded_ids: set[int] = set()
    for node in ast.walk(tree):
        if not isinstance(node, ast.Try):
            continue
        if not any(_handler_catches_import_error(h) for h in node.handlers):
            continue
        # Fallback imports live in the handlers, so both parts count.
        for part in (*node.body, *node.handlers):
            guarded_ids.update(
                id(inner)
                for inner in ast.walk(part)
                if isinstance(inner, (ast.Import, ast.ImportFrom))
            )
    required: set[str] = set()
    guarded: set[str] = set()
    for node in ast.walk(tree):
        target = guarded if id(node) in guarded_ids else required
        target.update(_import_roots_of(node))
    return required, guarded - required


def collect_import_roots(path: Path, *, include_guarded: bool = True) -> set[str]:
    """Return the top-level names of all absolute imports in the file at *path*.

    Relative imports are ignored. With ``include_guarded=False``, imports
    that only appear inside a ``try`` guarded by ImportError or
    ModuleNotFoundError are left out.

    Raises:
        SyntaxError: the file is not valid Python.
    """
    required, guarded = _scan_import_roots(path)
    return required | guarded if include_guarded else required


def check_imports_against_manifest(root: Path) -> list[Finding]:
    """Check 4: every third-party import is declared in ``pyproject.toml``.

    An undeclared import is an error, unless every occurrence is guarded by
    an ImportError handler (an optional dependency or a version fallback such
    as ``tomllib``/``tomli``), in which case it is a warning. The check
    skips itself with a warning when the interpreter lacks
    ``sys.stdlib_module_names`` or a TOML parser.
    """
    check = "imports->manifest"
    findings: list[Finding] = []
    stdlib = getattr(sys, "stdlib_module_names", None)
    if not stdlib:
        return [
            Finding(
                check,
                SEVERITY_WARNING,
                "sys.stdlib_module_names unavailable on this interpreter; "
                "skipping manifest check (requires Python >= 3.10)",
            )
        ]
    declared = load_declared_distributions(root)
    if declared is None:
        return [
            Finding(
                check,
                SEVERITY_WARNING,
                "no TOML parser available (need Python >= 3.11 or 'tomli'); "
                "skipping manifest check",
            )
        ]

    # Walk the tree once; the list is reused for the summary count below.
    files = list(iter_python_files(root))
    usage: dict[str, set[str]] = {}  # import root -> files that import it
    required_roots: set[str] = set()  # roots with an unguarded import somewhere
    for path in files:
        try:
            required, guarded = _scan_import_roots(path)
        except (SyntaxError, ValueError) as exc:
            # ValueError also covers UnicodeDecodeError from undecodable files.
            findings.append(
                Finding(check, SEVERITY_ERROR, f"{path}: failed to parse: {exc}")
            )
            continue
        required_roots |= required
        relative = str(path.relative_to(root))
        for module_root in required | guarded:
            usage.setdefault(module_root, set()).add(relative)

    third_party = 0
    for module_root in sorted(usage):
        if module_root in LOCAL_ROOTS or module_root in stdlib:
            continue
        third_party += 1
        dist = _canonicalize(IMPORT_TO_DIST.get(module_root, module_root))
        if dist in declared:
            continue
        where = ", ".join(sorted(usage[module_root])[:5])
        if module_root in required_roots:
            findings.append(
                Finding(
                    check,
                    SEVERITY_ERROR,
                    f"import '{module_root}' (distribution '{dist}') used in "
                    f"{where} is not declared in pyproject.toml",
                )
            )
        else:
            findings.append(
                Finding(
                    check,
                    SEVERITY_WARNING,
                    f"optional import '{module_root}' (distribution '{dist}') "
                    f"used in {where} is not declared in pyproject.toml; every "
                    f"occurrence is guarded by an ImportError handler",
                )
            )
    findings.append(
        Finding(
            check,
            SEVERITY_INFO,
            f"audited {len(usage)} import root(s) across "
            f"{len(files)} file(s); "
            f"{third_party} third-party, {len(declared)} declared distribution(s)",
        )
    )
    return findings


# ---------------------------------------------------------------------------
# Orchestration
# ---------------------------------------------------------------------------

ALL_CHECKS = (
    check_blocked_by_keys,
    check_action_verbs,
    check_taxonomy_references,
    check_imports_against_manifest,
)


def run_audit(root: Optional[Path] = None) -> AuditReport:
    """Run every check in ``ALL_CHECKS`` and return the combined report.

    The repository root is located with :func:`find_repo_root` starting from
    *root* (default: this file). A check that raises is recorded as an error
    finding instead of aborting the audit.

    Raises:
        FileNotFoundError: the repository root could not be located.
    """
    resolved = find_repo_root(root)
    report = AuditReport()
    for check_fn in ALL_CHECKS:
        try:
            report.findings.extend(check_fn(resolved))
        except Exception as exc:  # a crashed check must fail loudly, not pass
            report.findings.append(
                Finding(
                    check_fn.__name__,
                    SEVERITY_ERROR,
                    f"check crashed: {type(exc).__name__}: {exc}",
                )
            )
    return report


def main(argv: Optional[list[str]] = None) -> int:
    """CLI entry point; return the process exit code.

    Returns 0 when the audit passes, 1 when it fails (with ``--strict``,
    warnings count as failures), and 2 when the repository root cannot be
    located.
    """
    parser = argparse.ArgumentParser(
        prog="python -m fabletest.audit",
        description="Cross-artifact consistency audit for the constitutional "
        "test suite (scenarios vs. parameters vs. engine vs. manifest).",
    )
    parser.add_argument(
        "--root",
        type=Path,
        default=None,
        help="repository root (default: auto-discovered from this file)",
    )
    parser.add_argument(
        "--strict",
        action="store_true",
        help="treat warnings as failures",
    )
    args = parser.parse_args(argv)
    try:
        report = run_audit(args.root)
    except FileNotFoundError as exc:
        # A usage problem, not an audit result: report it without a traceback.
        print(f"audit: {exc}", file=sys.stderr)
        return 2
    print(format_report(report))
    if args.strict:
        return 0 if report.ok_strict() else 1
    return 0 if report.ok else 1


if __name__ == "__main__":
    raise SystemExit(main())