"""The replay harness.

For each dossier, the harness:

1. Validates the dossier against the schema (structure, references,
   verdict coverage).
2. Runs every encoded move through the kernel v0.1 rule engine
   (``kernel.evaluate_move``) and compares the engine's verdict against the
   dossier author's expected verdict. Any mismatch is an ERROR: the dossier's
   counterfactual analysis is inconsistent with the kernel as written, and
   the dossier must be fixed (or the kernel's gap acknowledged) before the
   event counts. This is how the benchmark stays honest — counterfactual
   scores are structured expert judgments, but the *legal analysis*
   underneath them is machine-checked.
3. Checks the declared resolution path against the kernel's procedure table
   and computes counterfactual latency mechanically from procedure clocks.
4. Assembles the side-by-side scorecard inputs (incumbent vs. kernel).
"""

from __future__ import annotations

from dataclasses import dataclass, field
from pathlib import Path

import yaml
from pydantic import ValidationError

from . import kernel
from .rubric import ScoreSet
from .schema import Dossier, VerdictKind


@dataclass
class Issue:
    """One finding recorded while replaying a dossier."""

    severity: str  # "error" | "warning" | "info"
    message: str


@dataclass
class MoveEvaluation:
    """The engine's verdict for one move, next to what the dossier expected."""

    decision_point: str
    move: str
    actor: str
    description: str
    taken_historically: bool
    incumbent_ruling: str
    verdict: kernel.Verdict
    expected_kind: VerdictKind
    expected_articles: list[str]

    @property
    def kind_match(self) -> bool:
        """True when the engine's verdict kind is the expected kind."""
        return self.verdict.kind is self.expected_kind

    @property
    def articles_consistent(self) -> bool:
        """True when every expected article is among the engine's citations."""
        cited = set(self.verdict.articles)
        return set(self.expected_articles) <= cited


@dataclass
class EventResult:
    """Everything the harness produced for a single dossier."""

    dossier: Dossier
    evaluations: list[MoveEvaluation] = field(default_factory=list)
    issues: list[Issue] = field(default_factory=list)
    kernel_latency_days: int = 0

    @property
    def ok(self) -> bool:
        """True when no recorded issue has severity ``"error"``."""
        return all(issue.severity != "error" for issue in self.issues)

    @property
    def incumbent_scores(self) -> ScoreSet:
        """Scores taken from the dossier's incumbent outcome metrics."""
        metrics = self.dossier.incumbent_outcome.metrics
        worst_off = metrics.worst_off.score
        commons = metrics.commons_integrity.score
        trust = metrics.trust_preservation.score
        return ScoreSet(
            worst_off=worst_off,
            commons_integrity=commons,
            trust_preservation=trust,
            latency_days=metrics.latency_days,
        )

    @property
    def kernel_scores(self) -> ScoreSet:
        """Scores from the counterfactual metrics, with the computed latency."""
        metrics = self.dossier.counterfactual.metrics
        worst_off = metrics.worst_off.score
        commons = metrics.commons_integrity.score
        trust = metrics.trust_preservation.score
        return ScoreSet(
            worst_off=worst_off,
            commons_integrity=commons,
            trust_preservation=trust,
            # Latency comes from the replay, not from the dossier author.
            latency_days=self.kernel_latency_days,
        )


class DossierLoadError(Exception):
    """Raised when a dossier file cannot be parsed or fails validation."""

    def __init__(self, path: Path, detail: str):
        self.path = path
        self.detail = detail
        super().__init__(f"{path}: {detail}")


def load_dossier(path: Path) -> Dossier:
    """Read one YAML dossier file and validate it into a ``Dossier``.

    Raises:
        DossierLoadError: on a YAML parse failure, when the top-level YAML
            value is not a mapping, or when schema validation fails.
    """
    text = path.read_text(encoding="utf-8")
    try:
        payload = yaml.safe_load(text)
    except yaml.YAMLError as exc:
        raise DossierLoadError(path, f"YAML parse failure: {exc}") from exc

    if not isinstance(payload, dict):
        raise DossierLoadError(path, "dossier file must contain a YAML mapping")

    try:
        dossier = Dossier.model_validate(payload)
    except ValidationError as exc:
        raise DossierLoadError(path, f"schema validation failed:\n{exc}") from exc
    return dossier


def load_dossiers(directory: Path) -> list[Dossier]:
    """Load every ``*.yaml`` dossier in *directory*, in sorted path order.

    Files whose name starts with an underscore are skipped.
    """
    candidates = [
        entry for entry in directory.glob("*.yaml") if not entry.name.startswith("_")
    ]
    candidates.sort()
    return [load_dossier(entry) for entry in candidates]


def _verdict_issue(dossier_id: str, evaluation: MoveEvaluation) -> Issue | None:
    """Return the issue (if any) raised by comparing engine vs. expected verdict."""
    verdict = evaluation.verdict
    location = f"{dossier_id}/{evaluation.decision_point}/{evaluation.move}"

    if not evaluation.kind_match:
        expected_label = evaluation.expected_kind.value
        engine_label = verdict.kind.value
        reasons = "; ".join(verdict.reasons)
        return Issue(
            "error",
            f"{location}: dossier expects "
            f"'{expected_label}' but kernel engine returns "
            f"'{engine_label}' ({reasons})",
        )

    if not evaluation.articles_consistent:
        return Issue(
            "warning",
            f"{location}: expected articles "
            f"{evaluation.expected_articles} are not a subset of "
            f"engine citations {verdict.articles}",
        )

    return None


def _path_coverage_issues(
    dossier_id: str, declared: set[str], triggered: set[str]
) -> list[Issue]:
    """Compare declared resolution-path procedures with those verdicts triggered."""
    findings: list[Issue] = []

    undeclared = triggered - declared
    if undeclared:
        findings.append(
            Issue(
                "info",
                f"{dossier_id}: kernel procedures triggered but not on the declared "
                f"critical path (treated as concurrent): "
                f"{', '.join(sorted(undeclared))}",
            )
        )

    never_triggered = declared - triggered
    if never_triggered:
        findings.append(
            Issue(
                "warning",
                f"{dossier_id}: resolution path declares procedures never triggered "
                f"by any move verdict: {', '.join(sorted(never_triggered))}",
            )
        )

    return findings


def run_dossier(d: Dossier) -> EventResult:
    """Replay one dossier through the kernel and collect the results.

    Every move is evaluated with ``kernel.evaluate_move`` and compared with
    the dossier's expected verdict; the declared resolution path is then
    checked and its latency computed with ``kernel.path_latency_days``.
    """
    outcome = EventResult(dossier=d)

    expected_by_move = {}
    for entry in d.counterfactual.expected_verdicts:
        expected_by_move[(entry.decision_point, entry.move)] = entry

    triggered: set[str] = set()
    for point in d.decision_points:
        for move in point.moves:
            # Coverage of every (decision point, move) is guaranteed by the
            # schema validator, so a plain lookup is used here.
            wanted = expected_by_move[(point.id, move.id)]
            verdict = kernel.evaluate_move(move.attributes)
            evaluation = MoveEvaluation(
                decision_point=point.id,
                move=move.id,
                actor=move.actor,
                description=move.description,
                taken_historically=move.taken_historically,
                incumbent_ruling=move.incumbent_ruling.value,
                verdict=verdict,
                expected_kind=wanted.verdict,
                expected_articles=list(wanted.articles),
            )
            outcome.evaluations.append(evaluation)

            finding = _verdict_issue(d.id, evaluation)
            if finding is not None:
                outcome.issues.append(finding)

            triggered.update(verdict.procedures)

    # Resolution path validity and mechanical latency
    declared_path = d.counterfactual.resolution_path
    try:
        outcome.kernel_latency_days = kernel.path_latency_days(declared_path)
    except KeyError as exc:
        outcome.issues.append(
            Issue("error", f"{d.id}: resolution path references {exc}")
        )
        outcome.kernel_latency_days = 0

    outcome.issues.extend(
        _path_coverage_issues(d.id, set(declared_path), triggered)
    )
    return outcome


def run_all(directory: Path) -> list[EventResult]:
    """Load every dossier in *directory* and replay each one."""
    return list(map(run_dossier, load_dossiers(directory)))