"""Fork tooling: clone, parameterize, and track upstream.

The right to fork is a kernel article; this module makes it cheap.

A fork is a new repository (typically created with ``git clone`` or a GitHub
fork) whose constitution descends from an upstream constitution. This module
manages the *constitutional* relationship — which is about content, not git
history:

* :func:`init_fork` copies the upstream constitution into a fresh tree,
  records the normative hash of every kernel file in a ``fork.yaml``
  manifest, blanks the citizen registry (a fork starts with its own
  citizens, never inherits an electorate), and creates empty ``proposals/``,
  ``ballots/`` and ``ledger/`` directories.
* :func:`apply_parameters` rewrites userland parameter values, enforcing each
  parameter's declared bounds/allowed set — so a parameterized fork is by
  construction a PATCH-class divergence, never a silent rewrite.
* :func:`upstream_status` compares three hashes per kernel file (the hash
  recorded at fork time, the local hash, and the current upstream hash) and
  reports each file as unchanged, locally modified, updated upstream,
  diverged, or converged.
* :func:`pull_upstream` fast-forwards kernel files the fork has not touched,
  refuses (by default) to overwrite local kernel changes, and updates the
  manifest so the next status check starts from the new base.

Hashes are *normative* hashes (see :mod:`govtool.classifier`): a fork that
only rewrote commentary still tracks upstream cleanly.
"""

from __future__ import annotations

import datetime as _dt
import shutil
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any

from govtool.classifier import (
    INVARIANTS_PATH,
    KERNEL_PREFIX,
    check_bounds,
    normative_hash,
    param_meta,
    param_value,
)
from govtool.errors import GovtoolError
from govtool.yamlio import dump_yaml, load_yaml


class ForkError(GovtoolError):
    """Raised when a fork operation cannot be performed safely."""


# Paths are relative to the root of a fork / upstream checkout.
FORK_MANIFEST = "fork.yaml"
REGISTRY_PATH = "citizens/registry.yaml"
VERSION_PATH = "constitution/version.yaml"


def _now_iso(now: _dt.datetime | None = None) -> str:
    """Return ``now`` (default: the current time) as a UTC ISO-8601 string."""
    now = now or _dt.datetime.now(_dt.timezone.utc)
    return now.astimezone(_dt.timezone.utc).isoformat()


def kernel_files(root: Path | str) -> dict[str, Path]:
    """Repo-relative path -> absolute path for kernel-zone files.

    Collects every ``*.yaml`` file directly inside ``root / KERNEL_PREFIX``
    (non-recursive, in sorted order) plus ``root / INVARIANTS_PATH`` when that
    file exists. Missing directories/files are simply skipped.
    """
    root = Path(root)
    found: dict[str, Path] = {}
    kernel_dir = root / KERNEL_PREFIX
    if kernel_dir.is_dir():
        for path in sorted(kernel_dir.glob("*.yaml")):
            found[path.relative_to(root).as_posix()] = path
    invariants = root / INVARIANTS_PATH
    if invariants.is_file():
        found[INVARIANTS_PATH] = invariants
    return found


def kernel_hashes(root: Path | str) -> dict[str, str]:
    """Normative hash of every kernel-zone file in a tree."""
    return {
        rel: normative_hash(load_yaml(path))
        for rel, path in kernel_files(root).items()
    }


# ---------------------------------------------------------------------------
# Manifest
# ---------------------------------------------------------------------------


def _manifest_list(data: dict, key: str) -> list:
    """Return the optional list stored under ``key``; absent or null means empty."""
    value = data.get(key)
    if value is None:
        # An empty ``key:`` in YAML loads as None, not as a missing key.
        return []
    if not isinstance(value, list):
        raise ForkError(f"fork manifest '{key}' is not a list")
    return list(value)


@dataclass
class ForkManifest:
    """In-memory form of a fork's ``fork.yaml`` manifest."""

    # Name given to the fork when it was created.
    name: str
    # Creation timestamp string (``init_fork`` writes a UTC ISO-8601 value).
    forked_at: str
    # Upstream descriptor; ``init_fork`` writes "url" and "version", and
    # ``pull_upstream`` additionally records "last_pulled_at".
    upstream: dict
    # Repo-relative kernel file path -> normative hash at the current base.
    kernel_hashes: dict[str, str]
    # Log of parameter override batches appended by ``apply_parameters``.
    overrides: list[dict] = field(default_factory=list)
    # Ancestor forks (oldest first) when upstream was itself a fork.
    lineage: list[dict] = field(default_factory=list)

    def to_dict(self) -> dict:
        """Return the manifest as a plain mapping suitable for YAML dumping."""
        return {
            "name": self.name,
            "forked_at": self.forked_at,
            "upstream": self.upstream,
            "kernel_hashes": self.kernel_hashes,
            "overrides": self.overrides,
            "lineage": self.lineage,
        }

    @classmethod
    def from_dict(cls, data: dict) -> "ForkManifest":
        """Build a manifest from loaded YAML data.

        Raises:
            ForkError: if ``data`` is not a mapping, lacks ``name``,
                ``upstream`` or ``kernel_hashes``, or holds a value of the
                wrong container type for one of the structured fields.
        """
        if not isinstance(data, dict):
            raise ForkError("fork manifest is not a mapping")
        for required in ("name", "upstream", "kernel_hashes"):
            if required not in data:
                raise ForkError(f"fork manifest missing '{required}'")
        # Report malformed structure as ForkError rather than letting
        # dict()/list() raise a bare TypeError on e.g. a YAML null.
        for mapping_key in ("upstream", "kernel_hashes"):
            if not isinstance(data[mapping_key], dict):
                raise ForkError(f"fork manifest '{mapping_key}' is not a mapping")
        forked_at = data.get("forked_at")
        return cls(
            name=str(data["name"]),
            forked_at="" if forked_at is None else str(forked_at),
            upstream=dict(data["upstream"]),
            kernel_hashes=dict(data["kernel_hashes"]),
            overrides=_manifest_list(data, "overrides"),
            lineage=_manifest_list(data, "lineage"),
        )

    @classmethod
    def load(cls, fork_root: Path | str) -> "ForkManifest":
        """Load ``fork.yaml`` from the root of a fork."""
        return cls.from_dict(load_yaml(Path(fork_root) / FORK_MANIFEST))

    def save(self, fork_root: Path | str) -> None:
        """Write this manifest to ``fork.yaml`` at the root of a fork."""
        dump_yaml(self.to_dict(), Path(fork_root) / FORK_MANIFEST)


# ---------------------------------------------------------------------------
# init
# ---------------------------------------------------------------------------


def _blank_registry(upstream_root: Path) -> dict[str, Any]:
    """Return the upstream registry's top-level shape with ``citizens`` emptied."""
    shape: dict[str, Any] = {"citizens": []}
    upstream_registry = upstream_root / REGISTRY_PATH
    if upstream_registry.is_file():
        upstream_data = load_yaml(upstream_registry)
        if isinstance(upstream_data, dict):
            shape = {
                key: ([] if key == "citizens" else value)
                for key, value in upstream_data.items()
            }
            shape.setdefault("citizens", [])
    return shape


def _upstream_version(upstream_root: Path) -> str:
    """Return the ``version`` recorded in upstream's version file, or ``""``."""
    version_file = upstream_root / VERSION_PATH
    if version_file.is_file():
        version_data = load_yaml(version_file)
        if isinstance(version_data, dict):
            return str(version_data.get("version", ""))
    return ""


def _extended_lineage(upstream_root: Path) -> list[dict]:
    """Return upstream's lineage plus upstream itself, or ``[]`` if it is not a fork."""
    if not (upstream_root / FORK_MANIFEST).is_file():
        return []
    upstream_manifest = ForkManifest.load(upstream_root)
    return [
        *upstream_manifest.lineage,
        {
            "name": upstream_manifest.name,
            "forked_at": upstream_manifest.forked_at,
            "upstream": upstream_manifest.upstream,
        },
    ]


def init_fork(
    upstream_root: Path | str,
    dest_root: Path | str,
    *,
    name: str,
    upstream_url: str = "",
    now: _dt.datetime | None = None,
) -> ForkManifest:
    """Create a new fork tree from an upstream checkout.

    Copies ``constitution/``, snapshots kernel hashes, blanks the citizen
    registry, creates empty ``proposals/``, ``ballots/`` and ``ledger/``
    directories (each holding a ``.gitkeep``), and writes ``fork.yaml``.
    Refuses to overwrite an existing constitution or fork manifest at the
    destination.

    Args:
        upstream_root: Root of the upstream checkout.
        dest_root: Root of the fork to create (created if missing).
        name: Name recorded for the new fork.
        upstream_url: Optional upstream location recorded in the manifest.
        now: Timestamp to record as ``forked_at``; defaults to the current time.

    Returns:
        The manifest that was written to ``dest_root``.

    Raises:
        ForkError: if upstream has no constitution, the destination already
            holds a constitution or manifest, or upstream's own ``fork.yaml``
            is malformed.
    """
    upstream_root, dest_root = Path(upstream_root), Path(dest_root)
    if not (upstream_root / "constitution").is_dir():
        raise ForkError(f"no constitution found at upstream root {upstream_root}")
    if (dest_root / "constitution").exists():
        raise ForkError(f"destination already contains a constitution: {dest_root}")
    if (dest_root / FORK_MANIFEST).exists():
        raise ForkError(f"destination already contains a fork manifest: {dest_root}")

    # Read everything we need from upstream *before* touching the destination,
    # so a malformed upstream file cannot leave a half-built fork behind.
    registry_shape = _blank_registry(upstream_root)
    upstream_version = _upstream_version(upstream_root)
    lineage = _extended_lineage(upstream_root)
    base_hashes = kernel_hashes(upstream_root)

    dest_root.mkdir(parents=True, exist_ok=True)
    shutil.copytree(upstream_root / "constitution", dest_root / "constitution")

    # Fresh electorate: keep the registry's top-level shape, blank citizens.
    # Only constitution/ has been copied so far, so the registry's directory
    # must be created explicitly (dump_yaml is not assumed to do it).
    registry_path = dest_root / REGISTRY_PATH
    registry_path.parent.mkdir(parents=True, exist_ok=True)
    dump_yaml(registry_shape, registry_path)

    for empty_dir in ("proposals", "ballots", "ledger"):
        directory = dest_root / empty_dir
        directory.mkdir(parents=True, exist_ok=True)
        # Placeholder so the otherwise-empty directory survives a git commit.
        (directory / ".gitkeep").write_text("", encoding="utf-8")

    manifest = ForkManifest(
        name=name,
        forked_at=_now_iso(now),
        upstream={"url": upstream_url, "version": upstream_version},
        kernel_hashes=base_hashes,
        overrides=[],
        lineage=lineage,
    )
    manifest.save(dest_root)
    return manifest


# ---------------------------------------------------------------------------
# parameterize
# ---------------------------------------------------------------------------


@dataclass
class ParameterChange:
    """One applied parameter override: which module/parameter, old and new value."""

    module: str
    name: str
    old: Any
    new: Any

    def to_dict(self) -> dict:
        """Return the change as a plain mapping for the manifest's override log."""
        return {"module": self.module, "name": self.name, "old": self.old, "new": self.new}


def apply_parameters(
    fork_root: Path | str,
    module: str,
    overrides: dict[str, Any],
    *,
    now: _dt.datetime | None = None,
) -> list[ParameterChange]:
    """Set userland parameter values in a fork, enforcing declared bounds.

    ``module`` is the file stem under ``constitution/userland/`` (with or
    without the ``.yaml`` suffix). Each override must name an existing
    parameter and satisfy its bounds/allowed constraints; violations raise
    :class:`ForkError` and nothing is written. Applied changes are appended to
    the manifest's ``overrides`` log with a timestamp (when the fork has a
    manifest). An empty ``overrides`` mapping is validated against the module
    but writes nothing and logs nothing.

    Returns:
        The list of changes that were applied, in ``overrides`` order.

    Raises:
        ForkError: if the module file is missing or malformed, an override
            names an unknown parameter or violates its constraints, or the
            fork's manifest cannot be loaded.
    """
    fork_root = Path(fork_root)
    stem = module[:-5] if module.endswith(".yaml") else module
    module_path = fork_root / "constitution" / "userland" / f"{stem}.yaml"
    if not module_path.is_file():
        raise ForkError(f"userland module not found: {module_path}")
    data = load_yaml(module_path)
    if not isinstance(data, dict):
        raise ForkError(f"module {stem} is not a mapping")
    parameters = data.get("parameters")
    if not isinstance(parameters, dict):
        raise ForkError(f"module {stem} has no 'parameters' mapping")

    # Validate everything before writing anything (atomic intent).
    planned: list[ParameterChange] = []
    for parameter_name, new_value in overrides.items():
        if parameter_name not in parameters:
            raise ForkError(f"module {stem} has no parameter '{parameter_name}'")
        entry = parameters[parameter_name]
        ok, why = check_bounds(new_value, param_meta(entry))
        if not ok:
            raise ForkError(f"module {stem}, parameter '{parameter_name}': {why}")
        planned.append(
            ParameterChange(
                module=stem,
                name=parameter_name,
                old=param_value(entry),
                new=new_value,
            )
        )

    if not planned:
        # Nothing to apply: do not rewrite the module or log an empty batch.
        return planned

    # Load the manifest up front as well: if it is malformed we want to fail
    # while the module file is still untouched.
    manifest = (
        ForkManifest.load(fork_root)
        if (fork_root / FORK_MANIFEST).is_file()
        else None
    )

    for change in planned:
        entry = parameters[change.name]
        # Parameters are either ``{value: ..., <metadata>}`` mappings or bare values.
        if isinstance(entry, dict) and "value" in entry:
            entry["value"] = change.new
        else:
            parameters[change.name] = change.new
    dump_yaml(data, module_path)

    if manifest is not None:
        manifest.overrides.append(
            {
                "applied_at": _now_iso(now),
                "module": stem,
                "changes": [change.to_dict() for change in planned],
            }
        )
        manifest.save(fork_root)
    return planned


# ---------------------------------------------------------------------------
# upstream tracking
# ---------------------------------------------------------------------------

# Possible values of ``FileStatus.state``.
STATE_UNCHANGED = "unchanged"
STATE_LOCAL_MODIFIED = "local-modified"
STATE_UPSTREAM_UPDATED = "upstream-updated"
STATE_DIVERGED = "diverged"
STATE_CONVERGED = "converged"
STATE_ADDED_UPSTREAM = "added-upstream"
STATE_REMOVED_UPSTREAM = "removed-upstream"
STATE_REMOVED_LOCAL = "removed-local"


@dataclass
class FileStatus:
    """Drift state of one kernel file, with the three hashes it was derived from."""

    # Repo-relative path of the kernel file.
    path: str
    # One of the ``STATE_*`` constants.
    state: str
    # Hash recorded in the fork manifest (None if the manifest has no entry).
    base_hash: str | None
    # Hash of the fork's current file (None if the file is absent locally).
    local_hash: str | None
    # Hash of upstream's current file (None if the file is absent upstream).
    upstream_hash: str | None

    def to_dict(self) -> dict:
        """Return the status as a plain mapping."""
        return {
            "path": self.path,
            "state": self.state,
            "base_hash": self.base_hash,
            "local_hash": self.local_hash,
            "upstream_hash": self.upstream_hash,
        }


def _file_state(
    base: str | None, local: str | None, upstream: str | None
) -> str:
    """Classify one file from its base, local and upstream hashes.

    ``None`` means "no such file" (or, for ``base``, no manifest entry).
    """
    if local is None:
        if upstream is None:
            return STATE_REMOVED_UPSTREAM  # existed only in the manifest snapshot
        # Upstream has the file and the fork does not: it is new upstream if
        # the fork never recorded it, otherwise the fork deleted it.
        return STATE_ADDED_UPSTREAM if base is None else STATE_REMOVED_LOCAL
    if upstream is None:
        # NOTE(review): this also covers base=None (a file that exists only in
        # the fork); there is no dedicated "added locally" state.
        return STATE_REMOVED_UPSTREAM
    if local == upstream:
        return STATE_UNCHANGED if local == base else STATE_CONVERGED
    if upstream == base:
        return STATE_LOCAL_MODIFIED
    if local == base:
        return STATE_UPSTREAM_UPDATED
    # Both sides differ from the base (or there is no base) and from each other.
    return STATE_DIVERGED


def upstream_status(
    fork_root: Path | str, upstream_root: Path | str
) -> list[FileStatus]:
    """Per-kernel-file drift report between a fork and its upstream.

    Covers every path known to the fork manifest, the fork tree, or the
    upstream tree, in sorted path order.
    """
    fork_root, upstream_root = Path(fork_root), Path(upstream_root)
    manifest = ForkManifest.load(fork_root)
    local = kernel_hashes(fork_root)
    upstream = kernel_hashes(upstream_root)

    statuses: list[FileStatus] = []
    for rel in sorted(set(manifest.kernel_hashes) | set(local) | set(upstream)):
        base_hash = manifest.kernel_hashes.get(rel)
        local_hash = local.get(rel)
        upstream_hash = upstream.get(rel)
        state = _file_state(base_hash, local_hash, upstream_hash)
        statuses.append(FileStatus(rel, state, base_hash, local_hash, upstream_hash))
    return statuses


@dataclass
class PullReport:
    """Outcome of :func:`pull_upstream`: paths grouped by what happened to them."""

    # Files copied in from upstream.
    applied: list[str] = field(default_factory=list)
    # Diverged files left untouched because ``take_diverged`` was not set.
    conflicts: list[str] = field(default_factory=list)
    # Files that needed no copy or were deliberately left alone.
    skipped: list[str] = field(default_factory=list)

    @property
    def clean(self) -> bool:
        """True when the pull produced no conflicts."""
        return not self.conflicts

    def to_dict(self) -> dict:
        """Return the report, including ``clean``, as a plain mapping."""
        return {
            "applied": self.applied,
            "conflicts": self.conflicts,
            "skipped": self.skipped,
            "clean": self.clean,
        }


def pull_upstream(
    fork_root: Path | str,
    upstream_root: Path | str,
    *,
    take_diverged: bool = False,
    now: _dt.datetime | None = None,
) -> PullReport:
    """Fast-forward kernel files the fork has not modified.

    * ``upstream-updated`` and ``added-upstream`` files are copied in.
    * ``diverged`` files are reported as conflicts unless
      ``take_diverged=True`` (explicitly adopt upstream, discarding local
      kernel edits).
    * ``local-modified`` files are left alone (the fork's choice stands).
    * The manifest's base hashes and recorded upstream version are updated,
      so subsequent status checks measure from the new base.

    Every other state (including the ``removed-*`` states) is reported as
    skipped and leaves both the file and its manifest entry untouched.

    Args:
        fork_root: Root of the fork (must contain ``fork.yaml``).
        upstream_root: Root of the upstream checkout to pull from.
        take_diverged: Overwrite diverged files with upstream's copy.
        now: Timestamp to record as ``last_pulled_at``; defaults to now.

    Returns:
        A :class:`PullReport` listing applied, conflicting and skipped paths.
    """
    fork_root, upstream_root = Path(fork_root), Path(upstream_root)
    manifest = ForkManifest.load(fork_root)
    statuses = upstream_status(fork_root, upstream_root)
    upstream_paths = kernel_files(upstream_root)
    report = PullReport()

    for status in statuses:
        take = status.state in (STATE_UPSTREAM_UPDATED, STATE_ADDED_UPSTREAM) or (
            take_diverged and status.state == STATE_DIVERGED
        )
        if take:
            source = upstream_paths[status.path]
            destination = fork_root / status.path
            destination.parent.mkdir(parents=True, exist_ok=True)
            shutil.copyfile(source, destination)
            manifest.kernel_hashes[status.path] = status.upstream_hash or ""
            report.applied.append(status.path)
        elif status.state == STATE_DIVERGED:
            report.conflicts.append(status.path)
        else:
            if (
                status.state in (STATE_UNCHANGED, STATE_CONVERGED)
                and status.local_hash is not None
            ):
                # Local already equals upstream: rebase the manifest onto it.
                manifest.kernel_hashes[status.path] = status.local_hash
            report.skipped.append(status.path)

    version_file = upstream_root / VERSION_PATH
    if version_file.is_file():
        version_data = load_yaml(version_file)
        if isinstance(version_data, dict):
            manifest.upstream["version"] = str(version_data.get("version", ""))
    manifest.upstream["last_pulled_at"] = _now_iso(now)
    manifest.save(fork_root)
    return report