"""Kernel loading, parameter access, and amendment classification. The kernel is data: a YAML document of articles (human-readable law) and ``params`` (the machine-readable knobs the legality engine enforces). One legality engine runs *any* kernel version; behavioural differences between v0.1 and v0.2 come entirely from params. This is what lets the exploit-to-test pipeline replay a single scripted attack against both versions and assert opposite outcomes. """ from __future__ import annotations import copy from dataclasses import dataclass from pathlib import Path from typing import Any, Mapping import yaml __all__ = ["Article", "Kernel", "load_kernel", "locate_kernel", "KernelError"] class KernelError(RuntimeError): """Raised when a kernel document is missing or malformed.""" @dataclass(frozen=True) class Article: id: str title: str text: str @dataclass(frozen=True) class Kernel: """An immutable snapshot of the constitution. Amendments do not mutate a kernel; they produce a new one via :meth:`with_changes`. The environment holds the *current* kernel and swaps it when an amendment executes — so an episode's legality really does shift under the agents' feet, exactly as it would for citizens. """ name: str version: str articles: tuple[Article, ...] params: Mapping[str, Any] invariants: tuple[Mapping[str, Any], ...] source_path: str | None = None # ----------------------------------------------------------- access def get(self, path: str, default: Any = None) -> Any: """Dotted-path access into ``params``. ``kernel.get("thresholds.kernel")`` -> ``0.667``. Returns ``default`` if any segment is missing. """ node: Any = self.params for part in path.split("."): if isinstance(node, Mapping) and part in node: node = node[part] else: return default return node def article(self, article_id: str) -> Article | None: for art in self.articles: if art.id == article_id: return art return None # --------------------------------------------------- classification def classify_change(self, path: str) -> str: """Classify a single parameter path as ``"kernel"`` or ``"ordinary"``. v0.1 semantics: exact-match against ``amendment.kernel_class_params`` only. The list itself is NOT in the list — that hole is EXP-006. v0.2 adds ``amendment.kernel_class_prefixes`` so whole subsystems (including the classifier) are kernel-class. """ exact = self.get("amendment.kernel_class_params") or [] if path in exact: return "kernel" prefixes = self.get("amendment.kernel_class_prefixes") or [] for prefix in prefixes: if path == prefix.rstrip(".") or path.startswith(prefix): return "kernel" return "ordinary" def classify_changes(self, changes: Mapping[str, Any]) -> str: """A change-set is kernel-class if any touched path is kernel-class.""" for path in changes: if self.classify_change(path) == "kernel": return "kernel" return "ordinary" def floor_violation(self, path: str, value: Any) -> str | None: """Return a human-readable violation if ``value`` breaches a constitutional floor for ``path``, else ``None``. v0.1 declares no floors; v0.2 uses them to make thresholds un-lowerable below safe minima even by supermajority. """ floors = self.get("amendment.param_floors") or {} if path in floors and isinstance(value, (int, float)) and not isinstance(value, bool): if value < floors[path]: return ( f"{path}={value!r} is below the constitutional floor " f"{floors[path]!r}" ) return None # --------------------------------------------------------- amending def with_changes(self, changes: Mapping[str, Any]) -> "Kernel": """Return a new kernel with dotted-path param changes applied.""" params = copy.deepcopy(_as_plain_dict(self.params)) for path, value in changes.items(): _set_dotted(params, path, copy.deepcopy(value)) return Kernel( name=self.name, version=self.version, articles=self.articles, params=params, invariants=self.invariants, source_path=self.source_path, ) # ------------------------------------------------------------------ io def load_kernel(path: str | Path) -> Kernel: """Load a kernel YAML document from ``path``.""" p = Path(path) if not p.exists(): raise KernelError(f"kernel file not found: {p}") with p.open("r", encoding="utf-8") as fh: doc = yaml.safe_load(fh) if not isinstance(doc, dict): raise KernelError(f"kernel file is not a mapping: {p}") head = doc.get("kernel") or {} articles = tuple( Article(id=str(a["id"]), title=str(a.get("title", "")), text=str(a.get("text", ""))) for a in (doc.get("articles") or []) ) params = doc.get("params") or {} if not isinstance(params, dict): raise KernelError(f"kernel params must be a mapping: {p}") invariants = tuple(dict(i) for i in (doc.get("invariants") or [])) return Kernel( name=str(head.get("name", "unnamed kernel")), version=str(head.get("version", "0.0.0")), articles=articles, params=params, invariants=invariants, source_path=str(p), ) def locate_kernel(version: str, start: str | Path | None = None) -> Path: """Find ``kernel/kernel-v{version}.yaml`` by walking up from ``start``. Searches (a) ``start`` (default: cwd) and its ancestors, and (b) the package's repository checkout when running from a src layout. """ filename = f"kernel-v{version}.yaml" candidates: list[Path] = [] base = Path(start) if start is not None else Path.cwd() for anchor in [base, *base.resolve().parents]: candidates.append(anchor / "kernel" / filename) here = Path(__file__).resolve() for anchor in here.parents: candidates.append(anchor / "kernel" / filename) for cand in candidates: if cand.exists(): return cand raise KernelError( f"could not locate {filename}; searched upward from {base} and the package checkout" ) # ------------------------------------------------------------- helpers def _as_plain_dict(node: Any) -> Any: if isinstance(node, Mapping): return {k: _as_plain_dict(v) for k, v in node.items()} if isinstance(node, (list, tuple)): return [_as_plain_dict(v) for v in node] return node def _set_dotted(d: dict, path: str, value: Any) -> None: parts = path.split(".") node = d for part in parts[:-1]: nxt = node.get(part) if not isinstance(nxt, dict): nxt = {} node[part] = nxt node = nxt node[parts[-1]] = value