r"""Aggregate side-by-side scorecard generation across the full benchmark.

This module turns thirty per-event scorecards into the headline artifact of
the Incumbent Benchmark: a single side-by-side table comparing how kernel
v0.1 and the incumbent constitutions scored across every historical stress
event, broken down by rubric dimension and by event category.

It supports two modes of operation:

1. **Results mode** (preferred for reproducibility): consume per-event
   scorecard JSON files previously produced by the CLI / scorecard module
   (``fable-bench run --json out/<event-id>.json``-style outputs) and
   aggregate them. This decouples aggregation from simulation, so a partial
   benchmark run can still be aggregated and the aggregate is a pure function
   of artifacts already on disk.

2. **Pipeline mode**: run every dossier in a directory through the replay
   harness and scorecard builder in-process, then aggregate the results.

Aggregation rules (documented in ``docs/SCORECARD.md``):

* Scores are normalised to a 0-100 scale for display. Inputs on a 0-1 scale
  are detected (all values <= 1.5) and rescaled.
* Non-finite scores (NaN / infinity) are never accepted as scores: a
  dimension whose only value is non-finite is reported as missing, so a
  corrupt scorecard fails loudly instead of being clamped into a valid-looking
  number.
* The composite score is the rubric-weighted sum of the four dimensions, with
  the worst-off-participant dimension weighted heaviest. Weights are imported
  from :mod:`incumbent_benchmark.rubric` when available so the aggregate can
  never drift from the per-event rubric; a documented default is used as
  fallback.
* A per-event "win" requires the composite delta to exceed a tie epsilon
  (default 1.0 point) so that noise-level differences are reported as ties
  rather than victories. The epsilon must be non-negative.
* The aggregate always reports the **floor**: the single worst worst-off
  score either constitution produced across all events. Per the project's
  first scoring rule, a constitution that averages well but lets one
  population crater is not a good constitution; the floor keeps that failure
  mode visible at the top of the report.

Invoke as a module::

    python -m incumbent_benchmark.aggregate --results-dir out/ \
        -o out/AGGREGATE.md --json out/aggregate.json

or::

    python -m incumbent_benchmark.aggregate --dossier-dir dossiers/ \
        -o out/AGGREGATE.md --json out/aggregate.json
"""

from __future__ import annotations

import argparse
import json
import math
import sys
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any, Iterable, Mapping, MutableMapping, Sequence

__all__ = [
    "DIMENSIONS",
    "DEFAULT_WEIGHTS",
    "TIE_EPSILON",
    "EventScores",
    "AggregateReport",
    "extract_scores",
    "normalise_event",
    "composite",
    "event_from_mapping",
    "load_event_result",
    "collect_results",
    "run_pipeline",
    "aggregate",
    "render_markdown",
    "render_json",
    "main",
]

# ---------------------------------------------------------------------------
# Rubric dimensions and weights
# ---------------------------------------------------------------------------

#: Canonical dimension keys, in rubric priority order.
DIMENSIONS: tuple[str, ...] = (
    "worst_off",
    "commons_integrity",
    "trust_preservation",
    "latency",
)

#: Default rubric weights (mirrors docs/RUBRIC.md). The worst-off
#: participant dimension dominates by design.
DEFAULT_WEIGHTS: dict[str, float] = {
    "worst_off": 0.40,
    "commons_integrity": 0.25,
    "trust_preservation": 0.20,
    "latency": 0.15,
}

# Prefer the weights declared in the rubric module so the aggregate can
# never silently diverge from per-event scoring. Fall back to the
# documented defaults if the rubric module exposes them under a different
# name (the fallback is identical by construction; this is belt-and-braces
# for modules built in separate passes).
try:  # pragma: no cover - exercised implicitly on import
    from .rubric import DIMENSION_WEIGHTS as _RUBRIC_WEIGHTS  # type: ignore[attr-defined]

    WEIGHTS: dict[str, float] = {str(k): float(v) for k, v in dict(_RUBRIC_WEIGHTS).items()}
except (ImportError, AttributeError):  # pragma: no cover
    WEIGHTS = dict(DEFAULT_WEIGHTS)

#: Composite deltas with absolute value at or below this many points (on the
#: 0-100 scale) are reported as ties.
TIE_EPSILON: float = 1.0

# Tolerated key spellings for each dimension, in priority order.
_DIMENSION_ALIASES: dict[str, tuple[str, ...]] = {
    "worst_off": (
        "worst_off",
        "worst-off",
        "worst_off_participant",
        "worst_off_outcome",
        "empathy",
    ),
    "commons_integrity": (
        "commons_integrity",
        "commons-integrity",
        "commons",
    ),
    "trust_preservation": (
        "trust_preservation",
        "trust-preservation",
        "trust",
    ),
    "latency": (
        "latency",
        "latency_to_resolution",
        "resolution_latency",
    ),
}

# Tolerated key spellings for each side of the comparison.
_SIDE_ALIASES: dict[str, tuple[str, ...]] = {
    "incumbent": ("incumbent", "baseline", "historical"),
    "kernel": ("kernel", "fable", "kernel_v0_1", "counterfactual"),
}

# Containers that may wrap a dimension-score mapping.
_SCORE_CONTAINERS: tuple[str, ...] = ("scores", "dimensions", "dimension_scores")


# ---------------------------------------------------------------------------
# Data model
# ---------------------------------------------------------------------------


@dataclass
class EventScores:
    """Normalised scores for one event, both sides, 0-100 scale."""

    event_id: str
    title: str
    category: str
    country: str
    year: str
    incumbent: dict[str, float]
    kernel: dict[str, float]

    def delta(self, dimension: str) -> float:
        """Return kernel minus incumbent for one dimension."""
        return self.kernel[dimension] - self.incumbent[dimension]

    def composite_delta(self, weights: Mapping[str, float]) -> float:
        """Return kernel composite minus incumbent composite under ``weights``."""
        return composite(self.kernel, weights) - composite(self.incumbent, weights)


@dataclass
class AggregateReport:
    """The aggregated benchmark result."""

    n_events: int
    weights: dict[str, float]
    tie_epsilon: float
    dimension_stats: dict[str, dict[str, float]]
    composite_stats: dict[str, float]
    category_stats: dict[str, dict[str, float]]
    floor: dict[str, dict[str, Any]]
    events: list[EventScores] = field(default_factory=list)


# ---------------------------------------------------------------------------
# Extraction and normalisation
# ---------------------------------------------------------------------------


def _find_alias(mapping: Mapping[str, Any], aliases: Sequence[str]) -> Any:
    """Return the value of the first alias present in ``mapping``, else ``None``."""
    for key in aliases:
        if key in mapping:
            return mapping[key]
    return None


def _coerce_score(value: Any) -> float | None:
    """Accept a bare number or a ``{"score": x, ...}`` mapping.

    Returns ``None`` for anything that is not a usable score: booleans,
    non-numeric values, and non-finite numbers. ``json.loads`` happily parses
    ``NaN`` / ``Infinity``, and a NaN would otherwise survive clamping as a
    perfect 100 because ``min(100.0, nan)`` returns ``100.0``.
    """
    if isinstance(value, Mapping):
        # Only one level of wrapping is tolerated; a nested mapping is rejected
        # by the numeric check below.
        value = _find_alias(value, ("score", "value", "points"))
    # bool is an int subclass; reject it explicitly.
    if isinstance(value, bool) or not isinstance(value, (int, float)):
        return None
    try:
        number = float(value)
    except OverflowError:  # an int too large to represent as a float
        return None
    return number if math.isfinite(number) else None


def extract_scores(side: Mapping[str, Any]) -> dict[str, float]:
    """Extract the four canonical dimension scores from one side's mapping.

    Tolerates dimension aliases and a single wrapping container key
    (``scores`` / ``dimensions`` / ``dimension_scores``). Non-finite values
    (NaN, infinity) are treated the same as absent ones.

    Raises :class:`ValueError` naming any dimensions that could not be found,
    so a contract mismatch fails loudly rather than producing a silently
    wrong aggregate.
    """
    candidates: list[Mapping[str, Any]] = [side]
    for container in _SCORE_CONTAINERS:
        inner = side.get(container)
        if isinstance(inner, Mapping):
            candidates.append(inner)

    scores: dict[str, float] = {}
    for dim in DIMENSIONS:
        for candidate in candidates:
            value = _coerce_score(_find_alias(candidate, _DIMENSION_ALIASES[dim]))
            if value is not None:
                scores[dim] = value
                break

    missing = [d for d in DIMENSIONS if d not in scores]
    if missing:
        raise ValueError(
            "scorecard side is missing dimension scores for: "
            + ", ".join(missing)
            + f" (keys present: {sorted(side.keys())})"
        )
    return scores


def normalise_event(
    incumbent: Mapping[str, float], kernel: Mapping[str, float]
) -> tuple[dict[str, float], dict[str, float]]:
    """Normalise both sides of one event to a 0-100 scale.

    Scale detection is performed jointly across both sides so that an event
    scored on 0-1 is rescaled consistently. Values are clamped to [0, 100].

    Raises :class:`ValueError` if any score is NaN or infinite; clamping such
    a value would otherwise turn it into a legitimate-looking score.
    """
    values = [*incumbent.values(), *kernel.values()]
    if not all(math.isfinite(v) for v in values):
        raise ValueError("cannot normalise non-finite scores (NaN or infinity)")
    scale = 100.0 if values and max(values) <= 1.5 else 1.0

    def _norm(side: Mapping[str, float]) -> dict[str, float]:
        return {k: max(0.0, min(100.0, v * scale)) for k, v in side.items()}

    return _norm(incumbent), _norm(kernel)


def composite(scores: Mapping[str, float], weights: Mapping[str, float] | None = None) -> float:
    """Rubric-weighted composite on the 0-100 scale.

    The weighted sum is divided by the total weight of the canonical
    dimensions, so weights need not sum to one. Raises :class:`ValueError`
    when that total is not positive.
    """
    w = dict(weights) if weights is not None else WEIGHTS
    total_weight = sum(w.get(d, 0.0) for d in DIMENSIONS)
    if total_weight <= 0:
        raise ValueError("rubric weights sum to zero; cannot compute composite")
    return sum(scores[d] * w.get(d, 0.0) for d in DIMENSIONS) / total_weight


# ---------------------------------------------------------------------------
# Loading per-event results
# ---------------------------------------------------------------------------


def event_from_mapping(data: Mapping[str, Any], fallback_id: str = "unknown") -> EventScores:
    """Build an :class:`EventScores` from a per-event scorecard mapping.

    Metadata is read from an ``event`` or ``meta`` sub-mapping when present,
    otherwise from the top level. Each side is looked up under its aliases at
    the top level first, then inside the first score container found
    (``scores`` / ``scorecard`` / ``results`` / ``comparison``).

    Raises :class:`ValueError` if either side, or any dimension of a side,
    cannot be found.
    """
    meta: Mapping[str, Any] = data
    for meta_key in ("event", "meta"):
        block = data.get(meta_key)
        if isinstance(block, Mapping):
            meta = block
            break

    event_id = str(_find_alias(meta, ("id", "event_id", "slug")) or fallback_id)
    title = str(_find_alias(meta, ("title", "name")) or event_id)
    category = str(_find_alias(meta, ("category", "event_category", "type")) or "uncategorized")
    country = str(_find_alias(meta, ("country", "jurisdiction")) or "")
    year = str(_find_alias(meta, ("year", "date", "period")) or "")

    score_block = None
    for container in ("scores", "scorecard", "results", "comparison"):
        inner = data.get(container)
        if isinstance(inner, Mapping):
            score_block = inner
            break

    sides: dict[str, dict[str, float]] = {}
    for canonical, aliases in _SIDE_ALIASES.items():
        side_mapping: Mapping[str, Any] | None = None
        for key in aliases:
            if isinstance(data.get(key), Mapping):
                side_mapping = data[key]
                break
        if side_mapping is None and score_block is not None:
            for key in aliases:
                if isinstance(score_block.get(key), Mapping):
                    side_mapping = score_block[key]
                    break
        if side_mapping is None:
            raise ValueError(
                f"scorecard for {event_id!r} has no {canonical!r} side "
                f"(looked for keys {aliases} at top level and inside "
                f"{sorted(score_block.keys()) if score_block else 'no score container'})"
            )
        sides[canonical] = extract_scores(side_mapping)

    incumbent, kernel = normalise_event(sides["incumbent"], sides["kernel"])
    return EventScores(
        event_id=event_id,
        title=title,
        category=category,
        country=country,
        year=year,
        incumbent=incumbent,
        kernel=kernel,
    )


def load_event_result(path: Path) -> EventScores:
    """Load one per-event scorecard JSON file.

    Every :class:`ValueError` raised while decoding or interpreting the file
    is re-raised with the file path prefixed, so a failure in a directory of
    many scorecards identifies the offending file.
    """
    try:
        data = json.loads(path.read_text(encoding="utf-8"))
    except ValueError as exc:  # JSONDecodeError and UnicodeDecodeError
        raise ValueError(f"{path}: invalid scorecard JSON: {exc}") from exc
    if not isinstance(data, Mapping):
        raise ValueError(f"{path}: top-level JSON value must be an object")
    try:
        return event_from_mapping(data, fallback_id=path.stem)
    except ValueError as exc:
        raise ValueError(f"{path}: {exc}") from exc


def collect_results(results_dir: Path) -> list[EventScores]:
    """Load all per-event scorecard JSON files in a directory.

    A file named ``aggregate.json`` is skipped so the aggregate's own JSON
    output can live next to its inputs. Raises :class:`FileNotFoundError`
    when no scorecard files are found and :class:`ValueError` on malformed
    scorecards or duplicate event ids.
    """
    paths = sorted(p for p in results_dir.glob("*.json") if p.name != "aggregate.json")
    if not paths:
        raise FileNotFoundError(f"no per-event scorecard JSON files found in {results_dir}")
    events = [load_event_result(p) for p in paths]
    _check_duplicates(events)
    return events


# ---------------------------------------------------------------------------
# Pipeline mode
# ---------------------------------------------------------------------------


def _resolve(module: Any, names: Sequence[str]) -> Any:
    """Return the first callable attribute of ``module`` named in ``names``."""
    for name in names:
        fn = getattr(module, name, None)
        if callable(fn):
            return fn
    raise RuntimeError(
        f"module {module.__name__!r} exposes none of the expected callables "
        f"{tuple(names)}; aggregate pipeline mode requires one of them. "
        "Run per-event scorecards via the CLI and use --results-dir instead."
    )


def run_pipeline(dossier_dir: Path) -> list[EventScores]:
    """Replay and score every dossier in ``dossier_dir`` in-process.

    Every ``*.yaml`` file whose name does not start with ``INDEX``
    (case-insensitive) is passed to the harness entry point, and its result
    to the scorecard builder. The builder may return a mapping or a JSON
    string.
    """
    from . import harness as _harness  # imported lazily; results mode needs neither
    from . import scorecard as _scorecard

    run_fn = _resolve(_harness, ("run_dossier", "run_event", "replay", "run"))
    build_fn = _resolve(
        _scorecard, ("build_scorecard", "scorecard_for", "build", "to_mapping", "to_dict")
    )

    paths = sorted(p for p in dossier_dir.glob("*.yaml") if not p.name.upper().startswith("INDEX"))
    if not paths:
        raise FileNotFoundError(f"no dossier YAML files found in {dossier_dir}")

    events: list[EventScores] = []
    for path in paths:
        result = run_fn(path)
        card = build_fn(result)
        if isinstance(card, str):
            card = json.loads(card)
        if not isinstance(card, Mapping):
            raise RuntimeError(
                f"scorecard builder returned {type(card).__name__} for {path.name}; "
                "expected a mapping or JSON string"
            )
        events.append(event_from_mapping(card, fallback_id=path.stem))
    _check_duplicates(events)
    return events


def _check_duplicates(events: Iterable[EventScores]) -> None:
    """Raise :class:`ValueError` on the first repeated event id."""
    seen: set[str] = set()
    for ev in events:
        if ev.event_id in seen:
            raise ValueError(f"duplicate event id in results: {ev.event_id!r}")
        seen.add(ev.event_id)


# ---------------------------------------------------------------------------
# Aggregation
# ---------------------------------------------------------------------------


def _mean(values: Sequence[float]) -> float:
    """Arithmetic mean, or 0.0 for an empty sequence."""
    return sum(values) / len(values) if values else 0.0


def aggregate(
    events: Sequence[EventScores],
    weights: Mapping[str, float] | None = None,
    tie_epsilon: float = TIE_EPSILON,
) -> AggregateReport:
    """Aggregate per-event scores into the benchmark-wide report.

    ``weights`` defaults to the module-level rubric weights. ``tie_epsilon``
    is the largest absolute delta still reported as a tie.

    Raises :class:`ValueError` for an empty event list, and for a negative or
    NaN ``tie_epsilon``: with a negative epsilon a single event would satisfy
    both the kernel-win and the incumbent-win condition and be counted twice.
    """
    if not events:
        raise ValueError("cannot aggregate zero events")
    if math.isnan(tie_epsilon) or tie_epsilon < 0:
        raise ValueError(f"tie epsilon must be a non-negative number, got {tie_epsilon!r}")
    w = dict(weights) if weights is not None else dict(WEIGHTS)

    def _tally(deltas: Sequence[float]) -> dict[str, float]:
        # Win / loss / tie counts; kept as floats to match the stats mappings.
        return {
            "kernel_wins": float(sum(1 for d in deltas if d > tie_epsilon)),
            "incumbent_wins": float(sum(1 for d in deltas if d < -tie_epsilon)),
            "ties": float(sum(1 for d in deltas if abs(d) <= tie_epsilon)),
        }

    dimension_stats: dict[str, dict[str, float]] = {}
    for dim in DIMENSIONS:
        k_scores = [e.kernel[dim] for e in events]
        i_scores = [e.incumbent[dim] for e in events]
        deltas = [k - i for k, i in zip(k_scores, i_scores)]
        dimension_stats[dim] = {
            "kernel_mean": _mean(k_scores),
            "incumbent_mean": _mean(i_scores),
            "delta_mean": _mean(deltas),
            "kernel_min": min(k_scores),
            "incumbent_min": min(i_scores),
            **_tally(deltas),
        }

    # Composites are computed once per event and reused for the category table.
    k_comp = [composite(e.kernel, w) for e in events]
    i_comp = [composite(e.incumbent, w) for e in events]
    comp_deltas = [k - i for k, i in zip(k_comp, i_comp)]
    composite_stats = {
        "kernel_mean": _mean(k_comp),
        "incumbent_mean": _mean(i_comp),
        "delta_mean": _mean(comp_deltas),
        **_tally(comp_deltas),
    }

    # Group event positions by category, preserving first-appearance order.
    members: dict[str, list[int]] = {}
    for idx, ev in enumerate(events):
        members.setdefault(ev.category, []).append(idx)
    category_stats: dict[str, dict[str, float]] = {}
    for category, idxs in members.items():
        kernel_mean = _mean([k_comp[i] for i in idxs])
        incumbent_mean = _mean([i_comp[i] for i in idxs])
        category_stats[category] = {
            "n": float(len(idxs)),
            "kernel_mean": kernel_mean,
            "incumbent_mean": incumbent_mean,
            "delta_mean": kernel_mean - incumbent_mean,
        }

    kernel_floor_event = min(events, key=lambda e: e.kernel["worst_off"])
    incumbent_floor_event = min(events, key=lambda e: e.incumbent["worst_off"])
    floor = {
        "kernel": {
            "score": kernel_floor_event.kernel["worst_off"],
            "event_id": kernel_floor_event.event_id,
            "title": kernel_floor_event.title,
        },
        "incumbent": {
            "score": incumbent_floor_event.incumbent["worst_off"],
            "event_id": incumbent_floor_event.event_id,
            "title": incumbent_floor_event.title,
        },
    }

    return AggregateReport(
        n_events=len(events),
        weights=w,
        tie_epsilon=tie_epsilon,
        dimension_stats=dimension_stats,
        composite_stats=composite_stats,
        category_stats=category_stats,
        floor=floor,
        events=list(events),
    )


# ---------------------------------------------------------------------------
# Rendering
# ---------------------------------------------------------------------------

_DIMENSION_LABELS = {
    "worst_off": "Worst-off participant",
    "commons_integrity": "Commons integrity",
    "trust_preservation": "Trust preservation",
    "latency": "Latency to resolution",
}


def _fmt(x: float) -> str:
    """Format a score with one decimal place."""
    return f"{x:.1f}"


def _fmt_signed(x: float) -> str:
    """Format a delta with one decimal place and an explicit sign."""
    return f"{x:+.1f}"


def _md_cell(text: Any) -> str:
    """Make free text safe for a Markdown table cell.

    A literal ``|`` would end the cell early and a line break would end the
    row, so pipes are backslash-escaped and line breaks collapsed to spaces.
    """
    return " ".join(str(text).splitlines()).replace("|", "\\|")


def render_markdown(report: AggregateReport) -> str:
    """Render the aggregate report as a Markdown document.

    Sections, in order: headline composite and floor, per-dimension table,
    per-category table, and per-event table sorted by composite delta with
    kernel-favourable events first.
    """
    w = report.weights
    lines: list[str] = []
    add = lines.append

    add("# Incumbent Benchmark — Aggregate Scorecard")
    add("")
    add(
        f"Kernel v0.1 vs. incumbent constitutions across **{report.n_events} historical "
        "stress events**. All scores 0–100, higher is better. The composite is the "
        "rubric-weighted sum ("
        + ", ".join(f"{_DIMENSION_LABELS[d]} {w.get(d, 0.0):.0%}" for d in DIMENSIONS)
        + f"). Composite deltas within ±{report.tie_epsilon:.1f} are ties."
    )
    add("")

    cs = report.composite_stats
    add("## Headline")
    add("")
    add("| | Kernel v0.1 | Incumbent | Δ (kernel − incumbent) |")
    add("|---|---:|---:|---:|")
    add(
        f"| **Composite (mean)** | {_fmt(cs['kernel_mean'])} | "
        f"{_fmt(cs['incumbent_mean'])} | {_fmt_signed(cs['delta_mean'])} |"
    )
    add(
        f"| **Events won / tied / lost** | {cs['kernel_wins']:.0f} | "
        f"{cs['incumbent_wins']:.0f} | {cs['ties']:.0f} ties |"
    )
    add("")

    kernel_floor, incumbent_floor = report.floor["kernel"], report.floor["incumbent"]
    add(
        "**The floor (first scoring rule):** the single worst worst-off-participant "
        f"score under the kernel is **{_fmt(float(kernel_floor['score']))}** "
        f"({kernel_floor['title']}); under the incumbents it is "
        f"**{_fmt(float(incumbent_floor['score']))}** ({incumbent_floor['title']}). "
        "A constitution is judged by its floor before its average."
    )
    add("")

    add("## By rubric dimension")
    add("")
    add("| Dimension | Weight | Kernel mean | Incumbent mean | Δ mean | Kernel min | Incumbent min | K wins | I wins | Ties |")
    add("|---|---:|---:|---:|---:|---:|---:|---:|---:|---:|")
    for dim in DIMENSIONS:
        st = report.dimension_stats[dim]
        add(
            f"| {_DIMENSION_LABELS[dim]} | {w.get(dim, 0.0):.0%} | "
            f"{_fmt(st['kernel_mean'])} | {_fmt(st['incumbent_mean'])} | "
            f"{_fmt_signed(st['delta_mean'])} | {_fmt(st['kernel_min'])} | "
            f"{_fmt(st['incumbent_min'])} | {st['kernel_wins']:.0f} | "
            f"{st['incumbent_wins']:.0f} | {st['ties']:.0f} |"
        )
    add("")

    add("## By event category")
    add("")
    add("| Category | n | Kernel composite | Incumbent composite | Δ |")
    add("|---|---:|---:|---:|---:|")
    for cat in sorted(report.category_stats):
        st = report.category_stats[cat]
        add(
            f"| {_md_cell(cat)} | {st['n']:.0f} | {_fmt(st['kernel_mean'])} | "
            f"{_fmt(st['incumbent_mean'])} | {_fmt_signed(st['delta_mean'])} |"
        )
    add("")

    add("## Per-event results")
    add("")
    add("(Sorted by composite delta, kernel-favourable first.)")
    add("")
    add("| Event | Category | Year | Kernel | Incumbent | Δ composite | Δ worst-off |")
    add("|---|---|---|---:|---:|---:|---:|")
    for ev in sorted(report.events, key=lambda e: e.composite_delta(w), reverse=True):
        add(
            f"| {_md_cell(ev.title)} | {_md_cell(ev.category)} | {_md_cell(ev.year)} | "
            f"{_fmt(composite(ev.kernel, w))} | {_fmt(composite(ev.incumbent, w))} | "
            f"{_fmt_signed(ev.composite_delta(w))} | "
            f"{_fmt_signed(ev.delta('worst_off'))} |"
        )
    add("")

    add("---")
    add("")
    add(
        "*Interpretation caveat: these are text-only simulations of historical "
        "counterfactuals, scored by a deterministic rubric against structured "
        "dossiers. They measure what each constitutional text permits, forbids, "
        "and incentivises under the dossier's recorded pressures — not what any "
        "real population would actually have done. See `docs/METHODOLOGY.md` "
        "for the full validity discussion before quoting any number here.*"
    )
    add("")
    return "\n".join(lines)


def render_json(report: AggregateReport) -> str:
    """Render the aggregate report as machine-readable JSON."""
    payload: dict[str, Any] = {
        "benchmark": "incumbent-benchmark",
        "n_events": report.n_events,
        "weights": report.weights,
        "tie_epsilon": report.tie_epsilon,
        "composite": report.composite_stats,
        "dimensions": report.dimension_stats,
        "categories": report.category_stats,
        "floor": report.floor,
        "events": [
            {
                "id": ev.event_id,
                "title": ev.title,
                "category": ev.category,
                "country": ev.country,
                "year": ev.year,
                "kernel": ev.kernel,
                "incumbent": ev.incumbent,
                "kernel_composite": composite(ev.kernel, report.weights),
                "incumbent_composite": composite(ev.incumbent, report.weights),
                "composite_delta": ev.composite_delta(report.weights),
            }
            for ev in report.events
        ],
    }
    return json.dumps(payload, indent=2, sort_keys=False)


# ---------------------------------------------------------------------------
# CLI entry point
# ---------------------------------------------------------------------------


def main(argv: Sequence[str] | None = None) -> int:
    """Command-line entry point.

    Returns 0 on success and 2 when the inputs cannot be loaded or
    aggregated; in the latter case a one-line ``error: ...`` message is
    printed to stderr instead of a traceback.
    """
    parser = argparse.ArgumentParser(
        prog="python -m incumbent_benchmark.aggregate",
        description="Aggregate the Incumbent Benchmark into a side-by-side scorecard.",
    )
    source = parser.add_mutually_exclusive_group(required=True)
    source.add_argument(
        "--results-dir",
        type=Path,
        help="Directory of per-event scorecard JSON files to aggregate.",
    )
    source.add_argument(
        "--dossier-dir",
        type=Path,
        help="Directory of dossier YAML files to replay, score, and aggregate in-process.",
    )
    parser.add_argument(
        "-o",
        "--output",
        type=Path,
        default=None,
        help="Write the Markdown scorecard here (default: stdout).",
    )
    parser.add_argument(
        "--json",
        type=Path,
        default=None,
        dest="json_output",
        help="Also write the machine-readable aggregate JSON here.",
    )
    parser.add_argument(
        "--tie-epsilon",
        type=float,
        default=TIE_EPSILON,
        help=f"Composite deltas within this are ties (default {TIE_EPSILON}).",
    )
    args = parser.parse_args(argv)

    try:
        if args.results_dir is not None:
            events = collect_results(args.results_dir)
        else:
            events = run_pipeline(args.dossier_dir)
        report = aggregate(events, tie_epsilon=args.tie_epsilon)
    # OSError covers FileNotFoundError plus unreadable files and directories;
    # ImportError covers pipeline mode when harness / scorecard are absent.
    except (OSError, ImportError, ValueError, RuntimeError) as exc:
        print(f"error: {exc}", file=sys.stderr)
        return 2

    markdown = render_markdown(report)
    if args.output is not None:
        args.output.parent.mkdir(parents=True, exist_ok=True)
        args.output.write_text(markdown, encoding="utf-8")
        print(f"wrote {args.output}", file=sys.stderr)
    else:
        print(markdown)

    if args.json_output is not None:
        args.json_output.parent.mkdir(parents=True, exist_ok=True)
        args.json_output.write_text(render_json(report), encoding="utf-8")
        print(f"wrote {args.json_output}", file=sys.stderr)

    return 0


if __name__ == "__main__":  # pragma: no cover
    raise SystemExit(main())