"""The constitution as machine-parseable source. The constitution lives in a source tree:: constitution/ version.yaml # current governance semver invariants.yaml # entrenched invariants kernel/*.yaml # one article per file, layer "kernel" userland/*.yaml # one module per file, layer "userland" This module loads that tree into typed objects, computes normative hashes, applies fork parameter overrides, and diffs two constitutions into a :class:`ChangeSet` that the semver classifier consumes. Normative vs non-normative content (Kernel Article 1, clause K1.2): * Normative: clause identifiers, normalized clause text, parameter types, values, forkability flags and bounds, invariant statements and their bound test identifiers, article/clause membership and layer. * Non-normative: titles, notes, rationale, file layout, YAML formatting, comments. Changing only these is a PATCH-class change. """ from __future__ import annotations import enum from dataclasses import dataclass, field from fractions import Fraction from pathlib import Path from typing import Any, Iterator, Optional import yaml from .canonical import content_hash, normalize_text, parse_ratio from .errors import ParameterError, SourceValidationError KERNEL_LAYER = "kernel" USERLAND_LAYER = "userland" INVARIANTS_LAYER = "invariants" PARAM_TYPES = ("ratio", "integer", "boolean", "string") # --------------------------------------------------------------------------- # Parameters # --------------------------------------------------------------------------- @dataclass(frozen=True) class Parameter: """A named, typed quantity attached to a clause. ``forkable`` parameters may be overridden by forks within ``[min, max]`` (inclusive) without diverging from upstream. Changing a parameter's *value* in the upstream constitution is a normative change; changing its bounds or forkability is also normative (it changes what forks may legally do). """ name: str type: str value: Any forkable: bool = False min: Any = None max: Any = None description: str = "" def __post_init__(self) -> None: if self.type not in PARAM_TYPES: raise ParameterError( f"parameter {self.name!r}: unknown type {self.type!r}", detail={"allowed": list(PARAM_TYPES)}, ) # Validate the declared value (and bounds) parse for the type. self.parsed_value() if self.min is not None: self._parse_typed(self.min, "min") if self.max is not None: self._parse_typed(self.max, "max") if self.forkable and self.type in ("ratio", "integer"): if self.min is None or self.max is None: raise ParameterError( f"forkable parameter {self.name!r} must declare min and max bounds" ) lo, hi = self._parse_typed(self.min, "min"), self._parse_typed(self.max, "max") val = self.parsed_value() if not (lo <= val <= hi): raise ParameterError( f"parameter {self.name!r}: default value {self.value!r} " f"is outside its own bounds [{self.min}, {self.max}]" ) def _parse_typed(self, raw: Any, label: str) -> Any: try: if self.type == "ratio": return parse_ratio(raw) if self.type == "integer": if isinstance(raw, bool) or not isinstance(raw, int): raise ValueError(f"expected integer, got {raw!r}") return raw if self.type == "boolean": if not isinstance(raw, bool): raise ValueError(f"expected boolean, got {raw!r}") return raw if self.type == "string": if not isinstance(raw, str): raise ValueError(f"expected string, got {raw!r}") return raw except ValueError as exc: raise ParameterError( f"parameter {self.name!r}: invalid {label} {raw!r}: {exc}" ) from exc raise ParameterError(f"parameter {self.name!r}: unknown type {self.type!r}") def parsed_value(self) -> Any: return self._parse_typed(self.value, "value") def validate_override(self, raw: Any) -> Any: """Validate a fork override value; return the parsed value.""" if not self.forkable: raise ParameterError( f"parameter {self.name!r} is not forkable; overriding it is a divergence, " "not a parameterization (see Kernel Article 7)" ) parsed = self._parse_typed(raw, "override") if self.type in ("ratio", "integer"): lo = self._parse_typed(self.min, "min") hi = self._parse_typed(self.max, "max") if not (lo <= parsed <= hi): raise ParameterError( f"override for {self.name!r} = {raw!r} is outside the kernel " f"bounds [{self.min}, {self.max}]" ) return parsed def normative(self) -> dict: out: dict[str, Any] = { "type": self.type, "value": self._canon(self.value), "forkable": self.forkable, } if self.min is not None: out["min"] = self._canon(self.min) if self.max is not None: out["max"] = self._canon(self.max) return out def _canon(self, raw: Any) -> Any: if self.type == "ratio": frac = parse_ratio(raw) return f"{frac.numerator}/{frac.denominator}" return raw # --------------------------------------------------------------------------- # Clauses, articles, invariants # --------------------------------------------------------------------------- @dataclass(frozen=True) class Clause: id: str text: str title: str = "" notes: str = "" parameters: dict[str, Parameter] = field(default_factory=dict) def normative(self) -> dict: return { "id": self.id, "text": normalize_text(self.text), "parameters": {name: p.normative() for name, p in sorted(self.parameters.items())}, } def raw(self) -> dict: out = self.normative() out["title"] = self.title out["notes"] = self.notes return out @dataclass(frozen=True) class Article: id: str number: int title: str layer: str clauses: tuple[Clause, ...] module: str = "" # userland module name; empty for kernel notes: str = "" def clause_map(self) -> dict[str, Clause]: return {c.id: c for c in self.clauses} def normative(self) -> dict: return { "id": self.id, "number": self.number, "layer": self.layer, "module": self.module, "clauses": {c.id: c.normative() for c in self.clauses}, } def raw(self) -> dict: out = self.normative() out["title"] = self.title out["notes"] = self.notes out["clauses"] = {c.id: c.raw() for c in self.clauses} return out @dataclass(frozen=True) class Invariant: id: str title: str statement: str tests: tuple[str, ...] notes: str = "" def normative(self) -> dict: return { "id": self.id, "statement": normalize_text(self.statement), "tests": list(self.tests), } def raw(self) -> dict: out = self.normative() out["title"] = self.title out["notes"] = self.notes return out # --------------------------------------------------------------------------- # Loading # --------------------------------------------------------------------------- def _load_yaml(path: Path) -> Any: try: with path.open("r", encoding="utf-8") as handle: return yaml.safe_load(handle) except yaml.YAMLError as exc: raise SourceValidationError(f"{path}: invalid YAML: {exc}") from exc except OSError as exc: raise SourceValidationError(f"{path}: cannot read: {exc}") from exc def _require(mapping: Any, key: str, where: str) -> Any: if not isinstance(mapping, dict) or key not in mapping: raise SourceValidationError(f"{where}: missing required field {key!r}") return mapping[key] def _parse_parameter(name: str, raw: Any, where: str) -> Parameter: if not isinstance(raw, dict): raise SourceValidationError(f"{where}: parameter {name!r} must be a mapping") try: return Parameter( name=name, type=_require(raw, "type", f"{where}.{name}"), value=_require(raw, "value", f"{where}.{name}"), forkable=bool(raw.get("forkable", False)), min=raw.get("min"), max=raw.get("max"), description=str(raw.get("description", "")), ) except ParameterError as exc: raise SourceValidationError(f"{where}: {exc.message}") from exc def _parse_clause(raw: Any, where: str) -> Clause: cid = _require(raw, "id", where) text = _require(raw, "text", f"{where}[{cid}]") if not isinstance(text, str) or not normalize_text(text): raise SourceValidationError(f"{where}[{cid}]: clause text must be non-empty") params_raw = raw.get("parameters") or {} if not isinstance(params_raw, dict): raise SourceValidationError(f"{where}[{cid}]: parameters must be a mapping") params = { name: _parse_parameter(name, spec, f"{where}[{cid}]") for name, spec in params_raw.items() } return Clause( id=str(cid), text=text, title=str(raw.get("title", "")), notes=str(raw.get("notes", "")), parameters=params, ) def _parse_article(raw: Any, layer: str, module: str, where: str) -> Article: aid = str(_require(raw, "id", where)) number = _require(raw, "number", f"{where}[{aid}]") if isinstance(number, bool) or not isinstance(number, int): raise SourceValidationError(f"{where}[{aid}]: article number must be an integer") clauses_raw = _require(raw, "clauses", f"{where}[{aid}]") if not isinstance(clauses_raw, list) or not clauses_raw: raise SourceValidationError(f"{where}[{aid}]: clauses must be a non-empty list") clauses = tuple(_parse_clause(c, f"{where}[{aid}].clauses") for c in clauses_raw) seen: set[str] = set() for clause in clauses: if clause.id in seen: raise SourceValidationError(f"{where}[{aid}]: duplicate clause id {clause.id!r}") seen.add(clause.id) declared_layer = str(raw.get("layer", layer)) if declared_layer != layer: raise SourceValidationError( f"{where}[{aid}]: declares layer {declared_layer!r} but lives in {layer!r}" ) return Article( id=aid, number=number, title=str(raw.get("title", "")), layer=layer, clauses=clauses, module=module, notes=str(raw.get("notes", "")), ) @dataclass(frozen=True) class Constitution: version: str kernel: dict[str, Article] userland: dict[str, Article] invariants: dict[str, Invariant] source_dir: Optional[Path] = None overrides: dict[str, Any] = field(default_factory=dict) # -- loading ----------------------------------------------------------- @classmethod def load(cls, root: Path | str, overrides: dict[str, Any] | None = None) -> "Constitution": root = Path(root) if not root.is_dir(): raise SourceValidationError(f"constitution directory not found: {root}") version_doc = _load_yaml(root / "version.yaml") version = str(_require(version_doc, "version", f"{root / 'version.yaml'}")) _validate_semver_string(version, where=str(root / "version.yaml")) kernel: dict[str, Article] = {} kernel_dir = root / "kernel" if not kernel_dir.is_dir(): raise SourceValidationError(f"missing kernel directory: {kernel_dir}") for path in sorted(kernel_dir.glob("*.yaml")): doc = _load_yaml(path) article = _parse_article(doc, KERNEL_LAYER, module="", where=str(path)) if article.id in kernel: raise SourceValidationError(f"{path}: duplicate article id {article.id!r}") kernel[article.id] = article userland: dict[str, Article] = {} userland_dir = root / "userland" if userland_dir.is_dir(): for path in sorted(userland_dir.glob("*.yaml")): doc = _load_yaml(path) module = str(_require(doc, "module", str(path))) articles_raw = _require(doc, "articles", str(path)) if not isinstance(articles_raw, list) or not articles_raw: raise SourceValidationError(f"{path}: articles must be a non-empty list") for raw in articles_raw: article = _parse_article(raw, USERLAND_LAYER, module=module, where=str(path)) if article.id in userland or article.id in kernel: raise SourceValidationError( f"{path}: duplicate article id {article.id!r}" ) userland[article.id] = article invariants: dict[str, Invariant] = {} inv_path = root / "invariants.yaml" if inv_path.is_file(): inv_doc = _load_yaml(inv_path) for raw in _require(inv_doc, "invariants", str(inv_path)): inv = Invariant( id=str(_require(raw, "id", str(inv_path))), title=str(raw.get("title", "")), statement=str(_require(raw, "statement", str(inv_path))), tests=tuple(str(t) for t in (raw.get("tests") or [])), notes=str(raw.get("notes", "")), ) if inv.id in invariants: raise SourceValidationError(f"{inv_path}: duplicate invariant {inv.id!r}") invariants[inv.id] = inv constitution = cls( version=version, kernel=kernel, userland=userland, invariants=invariants, source_dir=root, overrides=dict(overrides or {}), ) constitution._validate_overrides() constitution._validate_clause_uniqueness() return constitution def _validate_clause_uniqueness(self) -> None: seen: set[str] = set() for article in self.all_articles(): for clause in article.clauses: if clause.id in seen: raise SourceValidationError( f"clause id {clause.id!r} appears in more than one article" ) seen.add(clause.id) def _validate_overrides(self) -> None: for key, raw in self.overrides.items(): clause_id, _, param_name = key.rpartition(".") if not clause_id or not param_name: raise ParameterError( f"override key {key!r} must look like '.'" ) param = self._find_parameter(clause_id, param_name) param.validate_override(raw) # -- access ------------------------------------------------------------ def all_articles(self) -> Iterator[Article]: for article in sorted(self.kernel.values(), key=lambda a: a.number): yield article for article in sorted(self.userland.values(), key=lambda a: (a.module, a.number)): yield article def clause(self, clause_id: str) -> Clause: for article in self.all_articles(): found = article.clause_map().get(clause_id) if found is not None: return found raise SourceValidationError(f"no such clause: {clause_id!r}") def _find_parameter(self, clause_id: str, name: str) -> Parameter: clause = self.clause(clause_id) if name not in clause.parameters: raise ParameterError(f"clause {clause_id!r} has no parameter {name!r}") return clause.parameters[name] def parameter(self, clause_id: str, name: str) -> Any: """The effective (override-aware) parsed value of a parameter.""" param = self._find_parameter(clause_id, name) key = f"{clause_id}.{name}" if key in self.overrides: return param.validate_override(self.overrides[key]) return param.parsed_value() def ratio(self, clause_id: str, name: str) -> Fraction: value = self.parameter(clause_id, name) if not isinstance(value, Fraction): raise ParameterError(f"parameter {clause_id}.{name} is not a ratio") return value def integer(self, clause_id: str, name: str) -> int: value = self.parameter(clause_id, name) if isinstance(value, bool) or not isinstance(value, int): raise ParameterError(f"parameter {clause_id}.{name} is not an integer") return value # -- hashing ----------------------------------------------------------- def normative(self) -> dict: return { "kernel": {aid: a.normative() for aid, a in sorted(self.kernel.items())}, "userland": {aid: a.normative() for aid, a in sorted(self.userland.items())}, "invariants": {iid: i.normative() for iid, i in sorted(self.invariants.items())}, } def normative_hash(self) -> str: return content_hash(self.normative()) def raw_form(self) -> dict: return { "kernel": {aid: a.raw() for aid, a in sorted(self.kernel.items())}, "userland": {aid: a.raw() for aid, a in sorted(self.userland.items())}, "invariants": {iid: i.raw() for iid, i in sorted(self.invariants.items())}, } def _validate_semver_string(version: str, where: str) -> tuple[int, int, int]: parts = version.split(".") if len(parts) != 3 or not all(p.isdigit() for p in parts): raise SourceValidationError( f"{where}: version {version!r} is not MAJOR.MINOR.PATCH" ) return int(parts[0]), int(parts[1]), int(parts[2]) def parse_version(version: str) -> tuple[int, int, int]: return _validate_semver_string(version, where="version") # --------------------------------------------------------------------------- # Diffing # --------------------------------------------------------------------------- class ChangeKind(str, enum.Enum): ARTICLE_ADDED = "article_added" ARTICLE_REMOVED = "article_removed" CLAUSE_ADDED = "clause_added" CLAUSE_REMOVED = "clause_removed" CLAUSE_TEXT_CHANGED = "clause_text_changed" PARAM_ADDED = "param_added" PARAM_REMOVED = "param_removed" PARAM_VALUE_CHANGED = "param_value_changed" PARAM_META_CHANGED = "param_meta_changed" INVARIANT_ADDED = "invariant_added" INVARIANT_REMOVED = "invariant_removed" INVARIANT_CHANGED = "invariant_changed" NON_NORMATIVE_CHANGED = "non_normative_changed" @dataclass(frozen=True) class Change: kind: ChangeKind layer: str # "kernel" | "userland" | "invariants" ref: str # article id, clause id, "clause.param", or invariant id before: Any = None after: Any = None def to_dict(self) -> dict: return { "kind": self.kind.value, "layer": self.layer, "ref": self.ref, "before": self.before, "after": self.after, } @property def normative_change(self) -> bool: return self.kind is not ChangeKind.NON_NORMATIVE_CHANGED def _diff_clause(base: Clause, head: Clause, layer: str, changes: list[Change]) -> None: if normalize_text(base.text) != normalize_text(head.text): changes.append( Change( ChangeKind.CLAUSE_TEXT_CHANGED, layer, base.id, before=normalize_text(base.text), after=normalize_text(head.text), ) ) base_params, head_params = base.parameters, head.parameters for name in sorted(set(base_params) | set(head_params)): ref = f"{base.id}.{name}" if name not in head_params: changes.append( Change(ChangeKind.PARAM_REMOVED, layer, ref, before=base_params[name].normative()) ) elif name not in base_params: changes.append( Change(ChangeKind.PARAM_ADDED, layer, ref, after=head_params[name].normative()) ) else: b, h = base_params[name].normative(), head_params[name].normative() if b["value"] != h["value"]: changes.append( Change( ChangeKind.PARAM_VALUE_CHANGED, layer, ref, before=b["value"], after=h["value"], ) ) meta_b = {k: v for k, v in b.items() if k != "value"} meta_h = {k: v for k, v in h.items() if k != "value"} if meta_b != meta_h: changes.append( Change( ChangeKind.PARAM_META_CHANGED, layer, ref, before=meta_b, after=meta_h ) ) def _diff_articles( base: dict[str, Article], head: dict[str, Article], layer: str, changes: list[Change] ) -> None: for aid in sorted(set(base) | set(head)): if aid not in head: changes.append( Change(ChangeKind.ARTICLE_REMOVED, layer, aid, before=base[aid].normative()) ) continue if aid not in base: changes.append( Change(ChangeKind.ARTICLE_ADDED, layer, aid, after=head[aid].normative()) ) continue base_clauses, head_clauses = base[aid].clause_map(), head[aid].clause_map() for cid in sorted(set(base_clauses) | set(head_clauses)): if cid not in head_clauses: changes.append( Change( ChangeKind.CLAUSE_REMOVED, layer, cid, before=base_clauses[cid].normative(), ) ) elif cid not in base_clauses: changes.append( Change( ChangeKind.CLAUSE_ADDED, layer, cid, after=head_clauses[cid].normative() ) ) else: _diff_clause(base_clauses[cid], head_clauses[cid], layer, changes) def diff_constitutions(base: Constitution, head: Constitution) -> list[Change]: """All changes from ``base`` to ``head``, normative first, then non-normative.""" changes: list[Change] = [] _diff_articles(base.kernel, head.kernel, KERNEL_LAYER, changes) _diff_articles(base.userland, head.userland, USERLAND_LAYER, changes) for iid in sorted(set(base.invariants) | set(head.invariants)): if iid not in head.invariants: changes.append( Change( ChangeKind.INVARIANT_REMOVED, INVARIANTS_LAYER, iid, before=base.invariants[iid].normative(), ) ) elif iid not in base.invariants: changes.append( Change( ChangeKind.INVARIANT_ADDED, INVARIANTS_LAYER, iid, after=head.invariants[iid].normative(), ) ) else: b, h = base.invariants[iid].normative(), head.invariants[iid].normative() if b != h: changes.append( Change(ChangeKind.INVARIANT_CHANGED, INVARIANTS_LAYER, iid, before=b, after=h) ) # Non-normative drift: raw form differs although nothing normative did, # per article / invariant, so titles & notes edits are still visible. if not changes: if content_hash(base.raw_form()) != content_hash(head.raw_form()): changes.append( Change(ChangeKind.NON_NORMATIVE_CHANGED, KERNEL_LAYER, "(text)") ) else: normative_refs = {c.ref for c in changes} for aid in set(base.kernel) & set(head.kernel): if aid in normative_refs: continue if ( content_hash(base.kernel[aid].raw()) != content_hash(head.kernel[aid].raw()) and content_hash(base.kernel[aid].normative()) == content_hash(head.kernel[aid].normative()) ): changes.append(Change(ChangeKind.NON_NORMATIVE_CHANGED, KERNEL_LAYER, aid)) for aid in set(base.userland) & set(head.userland): if aid in normative_refs: continue if ( content_hash(base.userland[aid].raw()) != content_hash(head.userland[aid].raw()) and content_hash(base.userland[aid].normative()) == content_hash(head.userland[aid].normative()) ): changes.append(Change(ChangeKind.NON_NORMATIVE_CHANGED, USERLAND_LAYER, aid)) return changes