"""FablePool ``govtool`` — the command surface for governance-as-code. This module wires the govtool library (ledger, ballots, tally, eligibility, classifier, gate, fork, proposal lifecycle) into a single CLI used by humans, by the demo, and by the GitHub Actions workflows. Command map ----------- govtool keygen generate an ed25519 keypair for a citizen govtool citizen add|list manage the citizen registry (citizens/registry.yaml) govtool proposal new|open|close|transition|show|list|detect amendment proposal lifecycle govtool vote cast|verify|list signed ballots bound to the audit ledger govtool tally convenience tally (the gate runs the authoritative one) govtool classify governance-semver classifier for changed paths govtool gate run the full vote gate for a proposal (CI entry point) govtool ratify gate + version bump + status flip + ledger entry govtool ledger verify|show audit-ledger integrity and inspection govtool fork init|status clone/parameterize a constitution, track upstream govtool version tool + constitution versions Exit codes ---------- 0 success / gate passed 1 gate failed, verification failed, or check not met 2 usage error or governance rule violation (GovtoolError) All state-changing commands append an entry to the public audit ledger (``ledger/ledger.jsonl``) per kernel Article 8: every vote and every amendment lands in the same ledger as the money. 
""" from __future__ import annotations import argparse import dataclasses import hashlib import json import os import re import subprocess import sys from datetime import datetime, timezone from pathlib import Path from typing import Any, Dict, Iterable, List, Optional, Sequence, Set, Tuple import yaml from govtool import ( ballots, classifier, eligibility, fork, gate, keys, ledger, proposal, tally, ) from govtool.errors import GovtoolError try: # version of the tooling itself from govtool import __version__ as GOVTOOL_VERSION except ImportError: # pragma: no cover - defensive GOVTOOL_VERSION = "0.0.0" # --------------------------------------------------------------------------- # Constants # --------------------------------------------------------------------------- CHOICES = ("yes", "no", "abstain") LEVELS = ("major", "minor", "patch") PROPOSAL_STATUSES = ("draft", "open", "closed", "ratified", "rejected", "withdrawn") #: Fallback voting rules, mirroring the kernel v0.1 defaults in #: constitution/kernel/article-05-voting.yaml. The authoritative numbers are #: always read from the constitution itself when present; these exist so the #: CLI degrades safely on a fork that strips the voting article down. 
DEFAULT_RULES: Dict[str, Dict[str, float]] = {
    "major": {"quorum": 0.5, "threshold": 2.0 / 3.0},
    "minor": {"quorum": 0.34, "threshold": 0.5},
    "patch": {"quorum": 0.25, "threshold": 0.5},
}

# ``proposals/<id>/...`` (directory layout) and ``proposals/<id>.yaml|.yml``
# (single-file layout); group 1 is the proposal id in both cases.
_PROPOSAL_PATH_RE = re.compile(r"^proposals/([^/]+)/")
_PROPOSAL_FILE_RE = re.compile(r"^proposals/([^/]+)\.ya?ml$")


# ---------------------------------------------------------------------------
# Generic helpers
# ---------------------------------------------------------------------------

def _now() -> str:
    """Return the current UTC time as an ISO-8601 string with second precision."""
    return datetime.now(timezone.utc).isoformat(timespec="seconds")


def to_plain(obj: Any) -> Any:
    """Convert dataclasses / NamedTuples / objects into plain JSON-able data.

    Scalars pass through, paths become strings, mappings get string keys,
    sequences become lists, and anything unrecognised falls back to ``str``.
    Sets are emitted as lists sorted by their canonical JSON form so that the
    result (and therefore ``_digest``) does not depend on hash ordering.
    """
    if obj is None or isinstance(obj, (str, int, float, bool)):
        return obj
    if isinstance(obj, Path):
        return str(obj)
    if isinstance(obj, dict):
        return {str(k): to_plain(v) for k, v in obj.items()}
    # A NamedTuple *is* a tuple, so it has to be recognised before the generic
    # sequence branch or its field names would be lost.
    if isinstance(obj, tuple) and callable(getattr(obj, "_asdict", None)):
        return to_plain(obj._asdict())
    if isinstance(obj, (list, tuple)):
        return [to_plain(v) for v in obj]
    if isinstance(obj, (set, frozenset)):
        members = [to_plain(v) for v in obj]
        return sorted(members, key=lambda m: json.dumps(m, sort_keys=True, default=str))
    if dataclasses.is_dataclass(obj) and not isinstance(obj, type):
        return to_plain(dataclasses.asdict(obj))
    if hasattr(obj, "_asdict"):  # NamedTuple-like object that is not a tuple
        return to_plain(obj._asdict())
    if hasattr(obj, "to_dict") and callable(getattr(obj, "to_dict")):
        return to_plain(obj.to_dict())
    if hasattr(obj, "__dict__"):
        return to_plain(vars(obj))
    return str(obj)


def _get(mapping: Any, *names: str, default: Any = None) -> Any:
    """Fetch the first present, non-None key from a plain dict.

    Returns *default* when *mapping* is not a dict or none of *names* holds a
    non-None value.
    """
    if not isinstance(mapping, dict):
        return default
    for name in names:
        if name in mapping and mapping[name] is not None:
            return mapping[name]
    return default


def _digest(obj: Any) -> str:
    """sha256 over a canonical JSON serialization of *obj* (sorted keys)."""
    blob = json.dumps(to_plain(obj), sort_keys=True, separators=(",", ":"))
    return hashlib.sha256(blob.encode("utf-8")).hexdigest()


def _fraction(value: Any, default: float) -> float:
    """Parse ``0.5`` / ``"0.5"`` / ``"2/3"`` into a float.

    Returns *default* when *value* is None.

    Raises:
        GovtoolError: the text is not a number or has a zero denominator, so
            a malformed constitution surfaces as a normal CLI error.
    """
    if value is None:
        return default
    if isinstance(value, (int, float)):
        return float(value)
    text = str(value).strip()
    try:
        if "/" in text:
            num, den = text.split("/", 1)
            return float(num) / float(den)
        return float(text)
    except (ValueError, ZeroDivisionError) as exc:
        raise GovtoolError(f"cannot parse fraction {value!r}") from exc


def _bump(version: str, level: str) -> str:
    """Return *version* bumped at *level* (``major`` / ``minor`` / anything else = patch).

    A leading ``v`` and any ``-prerelease`` / ``+build`` suffix are dropped and
    missing components are treated as ``0``.

    Raises:
        GovtoolError: a version component is not an integer.
    """
    core = re.split(r"[-+]", str(version).strip().lstrip("v"), maxsplit=1)[0]
    parts = core.split(".")
    while len(parts) < 3:
        parts.append("0")
    try:
        major, minor, patch = (int(p) for p in parts[:3])
    except ValueError as exc:
        raise GovtoolError(f"cannot parse semver version {version!r}") from exc
    if level == "major":
        return f"{major + 1}.0.0"
    if level == "minor":
        return f"{major}.{minor + 1}.0"
    return f"{major}.{minor}.{patch + 1}"


def _read_yaml(path: Path) -> Any:
    """Load a YAML document with ``yaml.safe_load``.

    Raises:
        GovtoolError: the file does not exist or is not valid YAML.
    """
    if not Path(path).exists():
        raise GovtoolError(f"file not found: {path}")
    with open(path, "r", encoding="utf-8") as handle:
        try:
            return yaml.safe_load(handle)
        except yaml.YAMLError as exc:
            raise GovtoolError(f"invalid YAML in {path}: {exc}") from exc


def _write_yaml(obj: Any, path: Path) -> None:
    """Write *obj* as block-style YAML, creating parent directories as needed."""
    path = Path(path)
    path.parent.mkdir(parents=True, exist_ok=True)
    with open(path, "w", encoding="utf-8") as handle:
        yaml.safe_dump(obj, handle, sort_keys=False, allow_unicode=True,
                       default_flow_style=False)


def _print_json(obj: Any) -> None:
    """Print *obj* to stdout as indented, key-sorted JSON."""
    print(json.dumps(to_plain(obj), indent=2, sort_keys=True))


# ---------------------------------------------------------------------------
# Repository layout
# ---------------------------------------------------------------------------

class RepoPaths:
    """Well-known paths inside a governance repository."""

    def __init__(self, root: str | Path):
        self.root = Path(root).resolve()
        self.constitution = self.root / "constitution"
        self.kernel = self.constitution / "kernel"
        self.version_file = self.constitution / "version.yaml"
        self.registry_file = self.root / "citizens" / "registry.yaml"
        self.proposals_dir = self.root / "proposals"
        self.votes_dir = self.root / "votes"
        self.ledger_file = self.root / "ledger" / "ledger.jsonl"


def _open_ledger(paths: RepoPaths) -> "ledger.Ledger":
    """Return a ``ledger.Ledger`` for the repository, creating its directory first."""
    paths.ledger_file.parent.mkdir(parents=True, exist_ok=True)
    return ledger.Ledger(paths.ledger_file)


def _proposal_file(paths: RepoPaths, proposal_id: str) -> Path:
    """Locate the on-disk proposal document, tolerating both layouts.

    ``.yaml`` candidates are tried before their ``.yml`` twins. When nothing
    exists yet the canonical ``proposals/<id>/proposal.yaml`` path is returned.
    """
    base = paths.proposals_dir
    candidates = [
        base / proposal_id / "proposal.yaml",
        base / f"{proposal_id}.yaml",
        base / proposal_id / f"{proposal_id}.yaml",
        # ``.yml`` is accepted by proposal detection and listing, so honour it here too.
        base / proposal_id / "proposal.yml",
        base / f"{proposal_id}.yml",
        base / proposal_id / f"{proposal_id}.yml",
    ]
    for candidate in candidates:
        if candidate.exists():
            return candidate
    return candidates[0]


def _ballot_files(paths: RepoPaths, proposal_id: str) -> List[Path]:
    """Return the sorted ``.yaml`` / ``.yml`` files under ``votes/<proposal_id>/``."""
    directory = paths.votes_dir / proposal_id
    if not directory.is_dir():
        return []
    return sorted(p for p in directory.iterdir()
                  if p.suffix in (".yaml", ".yml") and p.is_file())


# ---------------------------------------------------------------------------
# Registry helpers
# ---------------------------------------------------------------------------

def _load_registry(paths: RepoPaths, create: bool = False) -> Dict[str, Any]:
    """Load the citizen registry as a dict with a ``citizens`` list.

    A bare list document is wrapped, and an empty document or a null
    ``citizens`` key is normalised to an empty list.

    Raises:
        GovtoolError: the registry file is missing (unless *create* is true)
            or holds neither a mapping nor a list.
    """
    if not paths.registry_file.exists():
        if create:
            return {"citizens": []}
        raise GovtoolError(f"citizen registry not found at {paths.registry_file}")
    doc = _read_yaml(paths.registry_file)
    if doc is None:
        return {"citizens": []}
    if isinstance(doc, list):
        return {"citizens": doc}
    if isinstance(doc, dict):
        # ``citizens:`` with no value parses to None; setdefault would keep it.
        if doc.get("citizens") is None:
            doc["citizens"] = []
        return doc
    raise GovtoolError(f"unrecognized registry format in {paths.registry_file}")


def _citizens(registry: Dict[str, Any]) -> List[Dict[str, Any]]:
    """Return the dict-shaped entries of the registry's ``citizens`` list."""
    return [c for c in (registry.get("citizens") or []) if isinstance(c, dict)]


def _find_citizen(registry: Dict[str, Any], citizen_id: str) -> Optional[Dict[str, Any]]:
    """Return the citizen whose ``id`` / ``citizen_id`` equals *citizen_id*, else None."""
    wanted = str(citizen_id)
    for citizen in _citizens(registry):
        current = _get(citizen, "id", "citizen_id")
        # Entries without an id must never match (str(None) == "None").
        if current is not None and str(current) == wanted:
            return citizen
    return None


def _citizen_public_key(citizen: Dict[str, Any]) -> Optional[str]:
    """Return the citizen's public key (``public_key`` / ``pubkey`` / ``key``) or None."""
    value = _get(citizen, "public_key", "pubkey", "key")
    return str(value) if value else None


def _is_active(citizen: Dict[str, Any]) -> bool:
    """True when the citizen's status is ``active``; a missing status counts as active."""
    return str(citizen.get("status", "active")).lower() == "active"


# ---------------------------------------------------------------------------
# Keys, versions, voting rules
# ---------------------------------------------------------------------------

def _keypair_parts(keypair: Any) -> Tuple[str, str]:
    """Accept a KeyPair object, dict, or ``(private, public)`` pair.

    Returns ``(private, public)`` as strings.

    Raises:
        GovtoolError: either half could not be found.
    """
    priv = pub = None
    if isinstance(keypair, dict):
        priv = (keypair.get("private_key") or keypair.get("private")
                or keypair.get("secret_key"))
        pub = (keypair.get("public_key") or keypair.get("public")
               or keypair.get("verify_key"))
    else:
        priv = (getattr(keypair, "private_key", None)
                or getattr(keypair, "private", None)
                or getattr(keypair, "secret_key", None))
        pub = (getattr(keypair, "public_key", None)
               or getattr(keypair, "public", None)
               or getattr(keypair, "verify_key", None))
    # Fall back to positional unpacking for a plain two-element sequence.
    if (priv is None or pub is None) and isinstance(keypair, (tuple, list)) and len(keypair) == 2:
        priv, pub = keypair
    if not priv or not pub:
        raise GovtoolError("could not extract private/public parts from generated keypair")
    return str(priv), str(pub)


def _read_version(paths: RepoPaths) -> Tuple[Dict[str, Any], str, str]:
    """Return ``(document, version_key, version_string)`` from version.yaml.

    A bare-string document is wrapped as ``{"version": ...}``.

    Raises:
        GovtoolError: no ``version`` / ``constitution_version`` / ``semver`` field.
    """
    doc = _read_yaml(paths.version_file)
    if isinstance(doc, str):
        return {"version": doc}, "version", doc
    if isinstance(doc, dict):
        for key in ("version", "constitution_version", "semver"):
            if key in doc and doc[key] is not None:
                return doc, key, str(doc[key])
    raise GovtoolError(f"could not find a semver field in {paths.version_file}")


def _find_level_rules(node: Any, level: str) -> Optional[Dict[str, float]]:
    """Recursively locate per-level quorum/threshold inside the voting article.

    Depth-first search for a mapping that has *level* as a key whose value is
    a mapping containing both ``quorum`` and ``threshold``. Returns None when
    nothing matches.
    """
    if isinstance(node, dict):
        candidate = node.get(level)
        if isinstance(candidate, dict) and "quorum" in candidate and "threshold" in candidate:
            return {
                "quorum": _fraction(candidate["quorum"], DEFAULT_RULES[level]["quorum"]),
                "threshold": _fraction(candidate["threshold"],
                                       DEFAULT_RULES[level]["threshold"]),
            }
        for child in node.values():
            found = _find_level_rules(child, level)
            if found:
                return found
    elif isinstance(node, list):
        for child in node:
            found = _find_level_rules(child, level)
            if found:
                return found
    return None


def _voting_rules(paths: RepoPaths, level: str) -> Dict[str, float]:
    """Return quorum/threshold for *level* from the kernel, else ``DEFAULT_RULES``.

    Raises:
        GovtoolError: *level* is not one of ``LEVELS``.
    """
    if level not in LEVELS:
        raise GovtoolError(f"unknown change level {level!r} (expected one of {LEVELS})")
    article = paths.kernel / "article-05-voting.yaml"
    if article.exists():
        doc = _read_yaml(article)
        found = _find_level_rules(doc, level)
        if found:
            return found
    return dict(DEFAULT_RULES[level])


# ---------------------------------------------------------------------------
# Changed-path collection (classifier / proposal detection)
# ---------------------------------------------------------------------------

def _git_changed_paths(root: Path, base: str, head: str) -> List[str]:
    """Return the paths reported by ``git diff --name-only base...head`` in *root*.

    Raises:
        GovtoolError: a ref is empty or looks like an option, git is not
            installed, or the diff command fails.
    """
    for ref in (base, head):
        # A ref beginning with "-" would be parsed by git as a command-line option.
        if not ref or str(ref).startswith("-"):
            raise GovtoolError(f"invalid git ref {ref!r}")
    try:
        result = subprocess.run(
            ["git", "diff", "--name-only", f"{base}...{head}"],
            cwd=str(root),
            capture_output=True,
            text=True,
            check=True,
        )
    except FileNotFoundError as exc:
        raise GovtoolError("git is not available on PATH") from exc
    except subprocess.CalledProcessError as exc:
        raise GovtoolError(f"git diff failed: {exc.stderr.strip()}") from exc
    return [line.strip() for line in result.stdout.splitlines() if line.strip()]


def _collect_paths(args: argparse.Namespace, root: Path) -> List[str]:
    """Merge ``--paths``, ``--paths-file`` and ``--base/--head`` into one path list.

    Backslashes are normalised to ``/`` and duplicates are dropped while the
    first-seen order is preserved.

    Raises:
        GovtoolError: the paths file cannot be read, or the git diff fails.
    """
    collected: List[str] = []
    if getattr(args, "paths", None):
        collected.extend(args.paths)
    if getattr(args, "paths_file", None):
        try:
            text = Path(args.paths_file).read_text(encoding="utf-8")
        except OSError as exc:
            raise GovtoolError(f"cannot read paths file {args.paths_file}: {exc}") from exc
        collected.extend(line.strip() for line in text.splitlines() if line.strip())
    if getattr(args, "base", None):
        head = getattr(args, "head", None) or "HEAD"
        collected.extend(_git_changed_paths(root, args.base, head))
    # de-duplicate, preserve order
    seen: Set[str] = set()
    unique = []
    for path in collected:
        normalized = path.replace("\\", "/")
        if normalized not in seen:
            seen.add(normalized)
            unique.append(normalized)
    return unique


def _detect_proposal_ids(changed: Iterable[str]) -> List[str]:
    """Return the distinct proposal ids touched by *changed*, in first-seen order."""
    found: List[str] = []
    seen: Set[str] = set()
    for path in changed:
        normalized = path.replace("\\", "/")
        match = _PROPOSAL_PATH_RE.match(normalized) or _PROPOSAL_FILE_RE.match(normalized)
        if not match:
            continue
        pid = match.group(1)
        if pid not in seen:
            seen.add(pid)
            found.append(pid)
    return found


def _require_path_component(kind: str, value: str) -> str:
    """Return *value* if it is usable as a single path component.

    Identifiers are joined into on-disk paths, so separators and the special
    names ``.`` / ``..`` are rejected to keep writes inside the intended
    directory.

    Raises:
        GovtoolError: *value* is empty, ``.``, ``..`` or contains a separator.
    """
    text = str(value)
    if not text or text in (".", "..") or "/" in text or "\\" in text:
        raise GovtoolError(f"{kind} id {value!r} is not a valid path component")
    return text


# ---------------------------------------------------------------------------
# Commands: keys & citizens
# ---------------------------------------------------------------------------

def cmd_keygen(args: argparse.Namespace) -> int:
    """Generate a keypair and write it to ``--out`` as YAML; returns 0.

    The file is created with mode 0600 before the secret is written, so the
    private key is not exposed through default permissions.
    """
    keypair = keys.generate_keypair()
    priv, pub = _keypair_parts(keypair)
    out = Path(args.out)
    data = {
        "key_type": "ed25519",
        "created": _now(),
        "public_key": pub,
        "private_key": priv,
    }
    out.parent.mkdir(parents=True, exist_ok=True)
    fd = os.open(out, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600)
    with os.fdopen(fd, "w", encoding="utf-8") as handle:
        # The creation mode is ignored for a pre-existing file, so tighten it
        # explicitly while the file is still empty.
        try:
            os.chmod(out, 0o600)
        except OSError:  # pragma: no cover - e.g. some filesystems on Windows
            pass
        yaml.safe_dump(data, handle, sort_keys=False, allow_unicode=True,
                       default_flow_style=False)
    if args.json:
        _print_json({"path": str(out), "public_key": pub})
    else:
        print(f"wrote keypair to {out}")
        print(f"public key: {pub}")
    return 0


def cmd_citizen_add(args: argparse.Namespace) -> int:
    """Register a citizen, persist the registry and log ``citizen.added``; returns 0.

    Raises:
        GovtoolError: no public key was supplied, or the id or the public key
            is already registered.
    """
    paths = RepoPaths(args.repo)
    public_key = args.public_key
    if not public_key and args.key_file:
        keydoc = _read_yaml(Path(args.key_file))
        if not isinstance(keydoc, dict) or not keydoc.get("public_key"):
            raise GovtoolError(f"no public_key found in key file {args.key_file}")
        public_key = str(keydoc["public_key"])
    if not public_key:
        raise GovtoolError("provide --public-key or --key-file")

    registry = _load_registry(paths, create=True)
    if _find_citizen(registry, args.id):
        raise GovtoolError(f"citizen {args.id!r} already exists in the registry")
    for existing in _citizens(registry):
        if _citizen_public_key(existing) == public_key:
            raise GovtoolError(
                f"public key already registered to citizen "
                f"{_get(existing, 'id', 'citizen_id')!r} (one person, one vote)"
            )

    citizen = {
        "id": args.id,
        "name": args.name or args.id,
        "public_key": public_key,
        "status": "active",
        "joined": _now(),
    }
    registry["citizens"].append(citizen)
    _write_yaml(registry, paths.registry_file)
    _open_ledger(paths).append("citizen.added", {
        "citizen_id": args.id,
        "public_key": public_key,
    })
    if args.json:
        _print_json(citizen)
    else:
        print(f"added citizen {args.id!r} ({len(registry['citizens'])} citizens total)")
    return 0


def cmd_citizen_list(args: argparse.Namespace) -> int:
    """Print the registry as JSON or as an aligned text table; returns 0."""
    paths = RepoPaths(args.repo)
    registry = _load_registry(paths)
    rows = _citizens(registry)
    if args.json:
        _print_json(rows)
        return 0
    if not rows:
        print("(no citizens registered)")
        return 0
    width = max(len(str(_get(c, "id", "citizen_id", default=""))) for c in rows)
    for citizen in rows:
        cid = str(_get(citizen, "id", "citizen_id", default="?"))
        status = str(citizen.get("status", "active"))
        name = str(citizen.get("name", ""))
        print(f" {cid:<{width}} {status:<9} {name}")
    print(f"{len(rows)} citizen(s); "
          f"{sum(1 for c in rows if _is_active(c))} active")
    return 0


# ---------------------------------------------------------------------------
# Commands: proposal lifecycle
# ---------------------------------------------------------------------------

def cmd_proposal_new(args: argparse.Namespace) -> int:
    """Create a draft proposal and log ``proposal.created``; returns 0.

    Raises:
        GovtoolError: the proposal id contains characters outside
            ``[A-Za-z0-9._-]`` or does not start with an alphanumeric.
    """
    paths = RepoPaths(args.repo)
    if not re.fullmatch(r"[A-Za-z0-9][A-Za-z0-9._-]*", args.id):
        raise GovtoolError(
            f"proposal id {args.id!r} must be alphanumeric with ._- separators")
    proposal.create_proposal(
        paths.root,
        args.id,
        title=args.title,
        author=args.author,
        level=args.level,
        summary=args.summary,
    )
    if args.change:
        # Record the declared change set directly on the proposal document so
        # the classifier and the gate can label it without a live git diff.
        document_path = _proposal_file(paths, args.id)
        document = _read_yaml(document_path) or {}
        if isinstance(document, dict):
            document["changes"] = list(args.change)
            _write_yaml(document, document_path)
    _open_ledger(paths).append("proposal.created", {
        "proposal_id": args.id,
        "title": args.title,
        "author": args.author,
        "level": args.level,
    })
    print(f"created proposal {args.id!r} (status: draft)")
    return 0


def _transition(args: argparse.Namespace, status: str) -> int:
    """Move proposal ``args.id`` to *status* and log ``proposal.<status>``; returns 0.

    Raises:
        GovtoolError: *status* is not one of ``PROPOSAL_STATUSES``.
    """
    if status not in PROPOSAL_STATUSES:
        raise GovtoolError(f"unknown proposal status {status!r}")
    paths = RepoPaths(args.repo)
    proposal.set_status(paths.root, args.id, status)
    _open_ledger(paths).append(f"proposal.{status}", {"proposal_id": args.id})
    print(f"proposal {args.id!r} -> {status}")
    return 0


def cmd_proposal_open(args: argparse.Namespace) -> int:
    """Transition the proposal to ``open``."""
    return _transition(args, "open")


def cmd_proposal_close(args: argparse.Namespace) -> int:
    """Transition the proposal to ``closed``."""
    return _transition(args, "closed")


def cmd_proposal_transition(args: argparse.Namespace) -> int:
    """Transition the proposal to the status given by ``--to``."""
    return _transition(args, args.to)


def cmd_proposal_show(args: argparse.Namespace) -> int:
    """Print a proposal as JSON, or just ``--field`` when given; returns 0.

    A missing field prints an empty line; dict/list fields print as compact JSON.
    """
    paths = RepoPaths(args.repo)
    document = to_plain(proposal.load_proposal(paths.root, args.id))
    if args.field:
        value = document.get(args.field, "") if isinstance(document, dict) else ""
        if isinstance(value, (dict, list)):
            print(json.dumps(value, sort_keys=True))
        else:
            print(value)
        return 0
    _print_json(document)
    return 0


def cmd_proposal_list(args: argparse.Namespace) -> int:
    """List proposals found under ``proposals/`` in either layout; returns 0."""
    paths = RepoPaths(args.repo)
    if not paths.proposals_dir.is_dir():
        print("(no proposals)")
        return 0
    entries = []
    for child in sorted(paths.proposals_dir.iterdir()):
        if child.is_dir():
            document_path = _proposal_file(paths, child.name)
            pid = child.name
        elif child.suffix in (".yaml", ".yml"):
            document_path = child
            pid = child.stem
        else:
            continue
        if not document_path.exists():
            continue
        document = _read_yaml(document_path) or {}
        entries.append({
            "id": pid,
            "status": _get(document, "status", "state", default="?"),
            "level": _get(document, "level", "classification", "kind", default="?"),
            "title": _get(document, "title", default=""),
        })
    if args.json:
        _print_json(entries)
        return 0
    if not entries:
        print("(no proposals)")
        return 0
    for entry in entries:
        # str() because YAML values may be lists/dicts, which reject width specs.
        print(f" {str(entry['id']):<40} {str(entry['status']):<10} "
              f"{str(entry['level']):<7} {entry['title']}")
    return 0


def cmd_proposal_detect(args: argparse.Namespace) -> int:
    """Print, one per line, the proposal ids touched by the change set; returns 0."""
    paths = RepoPaths(args.repo)
    changed = _collect_paths(args, paths.root)
    for pid in _detect_proposal_ids(changed):
        print(pid)
    return 0


# ---------------------------------------------------------------------------
# Commands: votes
# ---------------------------------------------------------------------------

def cmd_vote_cast(args: argparse.Namespace) -> int:
    """Sign a ballot, write it under ``votes/<proposal>/`` and log ``vote.cast``; returns 0.

    Raises:
        GovtoolError: invalid choice or identifier, key file without a private
            key, citizen unknown or not active, or the key file's public key
            disagreeing with the registry.
    """
    paths = RepoPaths(args.repo)
    choice = args.choice.lower()
    if choice not in CHOICES:
        raise GovtoolError(f"choice must be one of {CHOICES}, got {args.choice!r}")
    # Both identifiers become path components of the ballot file.
    _require_path_component("proposal", args.proposal)
    _require_path_component("citizen", args.citizen)

    keydoc = _read_yaml(Path(args.key))
    if not isinstance(keydoc, dict) or not keydoc.get("private_key"):
        raise GovtoolError(f"no private_key found in key file {args.key}")

    registry = _load_registry(paths)
    citizen = _find_citizen(registry, args.citizen)
    if citizen is None:
        raise GovtoolError(f"citizen {args.citizen!r} is not in the registry")
    if not _is_active(citizen):
        raise GovtoolError(
            f"citizen {args.citizen!r} has status "
            f"{citizen.get('status')!r} and may not vote")
    registered = _citizen_public_key(citizen)
    # The comparison is skipped when either side has no public key recorded.
    if registered and keydoc.get("public_key") and registered != str(keydoc["public_key"]):
        raise GovtoolError(
            f"key file public key does not match the registry entry for {args.citizen!r}")

    ballot = ballots.create_ballot(
        proposal_id=args.proposal,
        citizen_id=args.citizen,
        choice=choice,
        private_key=str(keydoc["private_key"]),
    )
    plain_ballot = to_plain(ballot)
    ballot_file = paths.votes_dir / args.proposal / f"{args.citizen}.yaml"
    _write_yaml(plain_ballot, ballot_file)
    _open_ledger(paths).append("vote.cast", {
        "proposal_id": args.proposal,
        "citizen_id": args.citizen,
        "choice": choice,
        "ballot_hash": _digest(plain_ballot),
    })
    print(f"ballot recorded: {args.citizen} -> {choice} on {args.proposal}")
    return 0


def cmd_vote_verify(args: argparse.Namespace) -> int:
    """Verify every ballot signature for a proposal.

    Returns 1 when any ballot fails, otherwise 0 (including when there are no
    ballots at all).
    """
    paths = RepoPaths(args.repo)
    registry = _load_registry(paths)
    files = _ballot_files(paths, args.proposal)
    if not files:
        print(f"no ballots found for proposal {args.proposal!r}")
        return 0
    failures = 0
    for ballot_file in files:
        ballot = _read_yaml(ballot_file)
        cid = str(_get(ballot, "citizen_id", "citizen", default=ballot_file.stem))
        citizen = _find_citizen(registry, cid)
        if citizen is None:
            print(f" [FAIL] {ballot_file.name}: signer {cid!r} not in registry")
            failures += 1
            continue
        public_key = _citizen_public_key(citizen)
        if not public_key:
            print(f" [FAIL] {ballot_file.name}: no public key on file for {cid!r}")
            failures += 1
            continue
        try:
            ok = ballots.verify_ballot(ballot, public_key)
        except GovtoolError as exc:
            print(f" [FAIL] {ballot_file.name}: {exc}")
            failures += 1
            continue
        # Only an explicit False counts as a failure; any other return value
        # (including None) is reported as valid.
        if ok is False:
            print(f" [FAIL] {ballot_file.name}: signature invalid")
            failures += 1
        else:
            print(f" [ OK ] {ballot_file.name}: signature valid for {cid}")
    print(f"{len(files)} ballot(s) checked, {failures} failure(s)")
    return 1 if failures else 0


def cmd_vote_list(args: argparse.Namespace) -> int:
    """List the ballots on file for a proposal as JSON or text; returns 0."""
    paths = RepoPaths(args.repo)
    files = _ballot_files(paths, args.proposal)
    rows = []
    for ballot_file in files:
        ballot = _read_yaml(ballot_file)
        rows.append({
            "citizen_id": _get(ballot, "citizen_id", "citizen", default=ballot_file.stem),
            "choice": _get(ballot, "choice", "vote", default="?"),
            "cast_at": _get(ballot, "timestamp", "cast_at", "created", default=""),
        })
    if args.json:
        _print_json(rows)
        return 0
    if not rows:
        print(f"no ballots for {args.proposal!r}")
        return 0
    for row in rows:
        # str() because YAML values may be non-strings, which reject width specs.
        print(f" {str(row['citizen_id']):<24} {str(row['choice']):<8} {row['cast_at']}")
    return 0


# ---------------------------------------------------------------------------
# Commands: tally / classify / gate / ratify
# ---------------------------------------------------------------------------

def _normalize_level(value: Any, fallback: str = "minor",
                     allowed: Optional[Sequence[str]] = None) -> str:
    """Return *value* as a known change level, else *fallback*.

    Matching ignores case and surrounding whitespace, so ``"Major"`` is not
    silently treated as the fallback level. *allowed* defaults to ``LEVELS``.
    """
    levels = LEVELS if allowed is None else allowed
    if isinstance(value, str):
        candidate = value.strip().lower()
        if candidate in levels:
            return candidate
    return fallback


def cmd_tally(args: argparse.Namespace) -> int:
    """Tally the ballots on file for a proposal and print the result.

    The level comes from ``--level`` or the proposal document and falls back
    to ``minor``. Returns 1 only when ``--check`` is set and the tally did not
    pass; otherwise 0.
    """
    paths = RepoPaths(args.repo)
    document = to_plain(proposal.load_proposal(paths.root, args.proposal))
    level = _normalize_level(
        args.level or _get(document, "level", "classification", "kind", default="minor"))

    registry = _load_registry(paths)
    eligible_ids = set()
    for voter in eligibility.eligible_voters(registry):
        voter_id = _get(to_plain(voter), "id", "citizen_id")
        if voter_id is not None:
            eligible_ids.add(str(voter_id))
    ballot_docs = [_read_yaml(f) for f in _ballot_files(paths, args.proposal)]
    rules = _voting_rules(paths, level)
    result = tally.tally_ballots(ballot_docs, eligible_ids, rules)
    plain = to_plain(result)

    # One verdict feeds both the printed line and the exit code.
    passed = bool(_get(plain, "passed", "accepted", "ok", default=False))
    if args.json:
        _print_json(plain)
    else:
        print(f"proposal: {args.proposal} (level: {level})")
        print(f"eligible: {len(eligible_ids)}")
        print(f"yes: {_get(plain, 'yes', 'yes_votes', default=0)}")
        print(f"no: {_get(plain, 'no', 'no_votes', default=0)}")
        print(f"abstain: {_get(plain, 'abstain', 'abstentions', default=0)}")
        print(f"quorum: required {rules['quorum']:.4f}, "
              f"met: {bool(_get(plain, 'quorum_met', default=False))}")
        print(f"threshold: required {rules['threshold']:.4f}, "
              f"met: {bool(_get(plain, 'threshold_met', default=False))}")
        print(f"passed: {passed}")
    if args.check and not passed:
        return 1
    return 0


def cmd_classify(args: argparse.Namespace) -> int:
    """Classify a change set and print its governance-semver level; returns 0.

    An empty change set is reported as ``patch`` without calling the classifier.
    """
    paths = RepoPaths(args.repo)
    changed = _collect_paths(args, paths.root)
    if not changed:
        result: Dict[str, Any] = {"level": "patch", "reasons": ["no changed files"]}
    else:
        result = to_plain(classifier.classify_paths(changed))
        if not isinstance(result, dict):
            result = {"level": str(result), "reasons": []}
    level = str(_get(result, "level", "classification", default="patch"))
    if args.level_only:
        print(level)
        return 0
    if args.json:
        payload = dict(result)
        payload.setdefault("level", level)
        payload["label"] = f"governance:{level}"
        _print_json(payload)
        return 0
    print(f"classification: {level}")
    for reason in result.get("reasons", []) or []:
        print(f" - {reason}")
    print(f"suggested label: governance:{level}")
    return 0


def _render_gate(plain: Dict[str, Any]) -> bool:
    """Print one PASS/FAIL line per gate check and return the overall verdict.

    When the result carries no non-empty ``checks`` list the whole result is
    dumped as JSON instead.
    """
    checks = plain.get("checks") if isinstance(plain, dict) else None
    if isinstance(checks, list) and checks:
        for check in checks:
            check = to_plain(check)
            name = _get(check, "name", "id", "check", default="check")
            ok = bool(_get(check, "passed", "ok", "success", default=False))
            detail = _get(check, "detail", "message", "reason", default="")
            print(f" [{'PASS' if ok else 'FAIL'}] {name}: {detail}")
    else:
        # Unknown result shape: dump it so nothing is hidden from the record.
        _print_json(plain)
    return bool(_get(plain, "passed", "ok", "success", default=False))


def cmd_gate(args: argparse.Namespace) -> int:
    """Run the vote gate for a proposal; returns 0 if it passed, else 1.

    With ``--record`` a ``gate.result`` entry is appended to the ledger.
    """
    paths = RepoPaths(args.repo)
    result = gate.run_gate(paths.root, args.proposal)
    plain = to_plain(result)
    print(f"vote gate: proposal {args.proposal}")
    passed = _render_gate(plain)
    print(f"verdict: {'PASSED' if passed else 'FAILED'}")
    if args.record:
        recorded = plain.get("checks") if isinstance(plain, dict) else None
        summary_checks = []
        for check in recorded or []:
            check = to_plain(check)
            summary_checks.append({
                "name": _get(check, "name", "id", "check", default="check"),
                "passed": bool(_get(check, "passed", "ok", "success", default=False)),
            })
        _open_ledger(paths).append("gate.result", {
            "proposal_id": args.proposal,
            "passed": passed,
            "checks": summary_checks,
            "result_hash": _digest(plain),
        })
    return 0 if passed else 1


def cmd_ratify(args: argparse.Namespace) -> int:
    """Ratify a proposal that passes the gate.

    Bumps the constitution version at the proposal's level, marks the proposal
    ``ratified`` and appends ``amendment.ratified`` to the ledger. Returns 1
    when the gate fails, else 0.

    Raises:
        GovtoolError: the proposal is already ratified (a second run would
            bump the version again), or version.yaml cannot be parsed.
    """
    paths = RepoPaths(args.repo)
    result = gate.run_gate(paths.root, args.proposal)
    plain = to_plain(result)
    passed = bool(_get(plain, "passed", "ok", "success", default=False))
    if not passed:
        print("vote gate FAILED — refusing to ratify:", file=sys.stderr)
        _render_gate(plain)
        return 1

    document = to_plain(proposal.load_proposal(paths.root, args.proposal))
    status = str(_get(document, "status", "state", default="")).strip().lower()
    if status == "ratified":
        raise GovtoolError(f"proposal {args.proposal!r} is already ratified")
    level = _normalize_level(
        _get(document, "level", "classification", "kind", default="minor"))

    version_doc, version_key, old_version = _read_version(paths)
    new_version = _bump(old_version, level)
    stamp = _now()  # one timestamp for every field written below
    version_doc[version_key] = new_version
    # Only timestamp fields already present in the document are refreshed.
    for timestamp_key in ("updated", "released", "last_amended"):
        if timestamp_key in version_doc:
            version_doc[timestamp_key] = stamp
    history = version_doc.get("history")
    if isinstance(history, list):
        history.append({
            "version": new_version,
            "proposal": args.proposal,
            "level": level,
            "date": stamp,
        })
    _write_yaml(version_doc, paths.version_file)

    proposal.set_status(paths.root, args.proposal, "ratified")
    _open_ledger(paths).append("amendment.ratified", {
        "proposal_id": args.proposal,
        "level": level,
        "from_version": old_version,
        "to_version": new_version,
        "gate_result_hash": _digest(plain),
    })
    print(f"ratified {args.proposal}: constitution {old_version} -> {new_version} ({level})")
    return 0


# ---------------------------------------------------------------------------
# Commands: ledger
# ---------------------------------------------------------------------------

def cmd_ledger_verify(args: argparse.Namespace) -> int:
    """Verify the audit ledger; returns 1 on failure, 0 when OK or absent."""
    paths = RepoPaths(args.repo)
    if not paths.ledger_file.exists():
        print(f"no ledger at {paths.ledger_file} (nothing to verify)")
        return 0
    led = _open_ledger(paths)
    try:
        ok = led.verify()
    except GovtoolError as exc:
        print(f"ledger verification FAILED: {exc}", file=sys.stderr)
        return 1
    # Only an explicit False is a failure; any other return value passes.
    if ok is False:
        print("ledger verification FAILED: hash chain broken", file=sys.stderr)
        return 1
    print(f"ledger OK: {paths.ledger_file}")
    return 0


def cmd_ledger_show(args: argparse.Namespace) -> int:
    """Print ledger entries as JSON lines, optionally only the last N; returns 0.

    Raises:
        GovtoolError: ``--last`` is negative.
    """
    paths = RepoPaths(args.repo)
    if not paths.ledger_file.exists():
        print(f"no ledger at {paths.ledger_file}")
        return 0
    if args.last and args.last < 0:
        # A negative count would slice from the wrong end of the ledger.
        raise GovtoolError(f"--last must be non-negative, got {args.last}")
    led = _open_ledger(paths)
    entries = [to_plain(entry) for entry in led.entries()]
    if args.last:
        entries = entries[-args.last:]
    for entry in entries:
        print(json.dumps(entry, sort_keys=True))
    return 0
--------------------------------------------------------------------------- # Commands: fork / version # --------------------------------------------------------------------------- def _parse_params(items: Optional[List[str]]) -> Dict[str, Any]: params: Dict[str, Any] = {} for item in items or []: key, sep, value = item.partition("=") if not sep or not key: raise GovtoolError(f"--param must be KEY=VALUE, got {item!r}") params[key.strip()] = yaml.safe_load(value) return params def cmd_fork_init(args: argparse.Namespace) -> int: paths = RepoPaths(args.repo) source = Path(args.source).resolve() if args.source else paths.root dest = Path(args.dest).resolve() params = _parse_params(args.param) manifest = fork.init_fork(source, dest, args.name, params) if args.json: _print_json(manifest) else: print(f"forked constitution {source} -> {dest} as {args.name!r}") _print_json(manifest) return 0 def cmd_fork_status(args: argparse.Namespace) -> int: paths = RepoPaths(args.repo) upstream = Path(args.upstream).resolve() if args.upstream else None status = fork.fork_status(paths.root, upstream) _print_json(status) return 0 def cmd_version(args: argparse.Namespace) -> int: paths = RepoPaths(args.repo) constitution_version = None if paths.version_file.exists(): try: _, _, constitution_version = _read_version(paths) except GovtoolError: constitution_version = None if args.json: _print_json({ "govtool": GOVTOOL_VERSION, "constitution": constitution_version, }) else: print(f"govtool {GOVTOOL_VERSION}") print(f"constitution {constitution_version or '(not found)'}") return 0 # --------------------------------------------------------------------------- # Parser # --------------------------------------------------------------------------- def build_parser() -> argparse.ArgumentParser: common = argparse.ArgumentParser(add_help=False) common.add_argument("--repo", default=".", help="governance repository root (default: current directory)") common.add_argument("--json", action="store_true", 
help="emit machine-readable JSON output") parser = argparse.ArgumentParser( prog="govtool", description="FablePool governance-as-code tooling: amendments as pull " "requests, ratification as a vote gate in CI.") parser.add_argument("--version", action="version", version=f"govtool {GOVTOOL_VERSION}") sub = parser.add_subparsers(dest="command", required=True) # keygen keygen = sub.add_parser("keygen", parents=[common], help="generate an ed25519 keypair for a citizen") keygen.add_argument("--out", required=True, help="output key file (YAML)") keygen.set_defaults(func=cmd_keygen) # citizen citizen = sub.add_parser("citizen", help="manage the citizen registry") citizen_sub = citizen.add_subparsers(dest="citizen_command", required=True) citizen_add = citizen_sub.add_parser("add", parents=[common], help="register a citizen") citizen_add.add_argument("--id", required=True) citizen_add.add_argument("--name") citizen_add.add_argument("--public-key", dest="public_key") citizen_add.add_argument("--key-file", dest="key_file", help="read the public key from a govtool keygen file") citizen_add.set_defaults(func=cmd_citizen_add) citizen_list = citizen_sub.add_parser("list", parents=[common], help="list registered citizens") citizen_list.set_defaults(func=cmd_citizen_list) # proposal prop = sub.add_parser("proposal", help="amendment proposal lifecycle") prop_sub = prop.add_subparsers(dest="proposal_command", required=True) prop_new = prop_sub.add_parser("new", parents=[common], help="create a proposal") prop_new.add_argument("--id", required=True) prop_new.add_argument("--title", required=True) prop_new.add_argument("--author", required=True) prop_new.add_argument("--level", required=True, choices=LEVELS) prop_new.add_argument("--summary", required=True) prop_new.add_argument("--change", action="append", help="path the proposal changes (repeatable)") prop_new.set_defaults(func=cmd_proposal_new) for name, handler, help_text in ( ("open", cmd_proposal_open, "open the voting window"), 
("close", cmd_proposal_close, "close the voting window"), ): leaf = prop_sub.add_parser(name, parents=[common], help=help_text) leaf.add_argument("--id", required=True) leaf.set_defaults(func=handler) prop_transition = prop_sub.add_parser("transition", parents=[common], help="move a proposal to an explicit status") prop_transition.add_argument("--id", required=True) prop_transition.add_argument("--to", required=True, choices=PROPOSAL_STATUSES) prop_transition.set_defaults(func=cmd_proposal_transition) prop_show = prop_sub.add_parser("show", parents=[common], help="show a proposal") prop_show.add_argument("--id", required=True) prop_show.add_argument("--field", help="print a single field (e.g. status)") prop_show.set_defaults(func=cmd_proposal_show) prop_list = prop_sub.add_parser("list", parents=[common], help="list proposals") prop_list.set_defaults(func=cmd_proposal_list) prop_detect = prop_sub.add_parser( "detect", parents=[common], help="print proposal ids touched by a change set (used by CI)") prop_detect.add_argument("--paths", nargs="*") prop_detect.add_argument("--paths-file", dest="paths_file") prop_detect.add_argument("--base", help="git base ref (uses git diff base...head)") prop_detect.add_argument("--head", default="HEAD") prop_detect.set_defaults(func=cmd_proposal_detect) # vote vote = sub.add_parser("vote", help="cast and verify signed ballots") vote_sub = vote.add_subparsers(dest="vote_command", required=True) vote_cast = vote_sub.add_parser("cast", parents=[common], help="cast a ballot") vote_cast.add_argument("--proposal", required=True) vote_cast.add_argument("--citizen", required=True) vote_cast.add_argument("--choice", required=True) vote_cast.add_argument("--key", required=True, help="citizen key file from keygen") vote_cast.set_defaults(func=cmd_vote_cast) vote_verify = vote_sub.add_parser("verify", parents=[common], help="verify all ballot signatures") vote_verify.add_argument("--proposal", required=True) 
vote_verify.set_defaults(func=cmd_vote_verify) vote_list = vote_sub.add_parser("list", parents=[common], help="list ballots") vote_list.add_argument("--proposal", required=True) vote_list.set_defaults(func=cmd_vote_list) # tally tally_cmd = sub.add_parser("tally", parents=[common], help="tally ballots for a proposal") tally_cmd.add_argument("--proposal", required=True) tally_cmd.add_argument("--level", choices=LEVELS, help="override the change level used for rules") tally_cmd.add_argument("--check", action="store_true", help="exit nonzero unless the tally passes") tally_cmd.set_defaults(func=cmd_tally) # classify classify = sub.add_parser("classify", parents=[common], help="governance-semver classification of a change set") classify.add_argument("--paths", nargs="*") classify.add_argument("--paths-file", dest="paths_file") classify.add_argument("--base", help="git base ref (uses git diff base...head)") classify.add_argument("--head", default="HEAD") classify.add_argument("--level-only", dest="level_only", action="store_true", help="print only the level (for shell scripting)") classify.set_defaults(func=cmd_classify) # gate gate_cmd = sub.add_parser("gate", parents=[common], help="run the full vote gate (CI entry point)") gate_cmd.add_argument("--proposal", required=True) gate_cmd.add_argument("--record", action="store_true", help="append the gate result to the audit ledger") gate_cmd.set_defaults(func=cmd_gate) # ratify ratify = sub.add_parser("ratify", parents=[common], help="ratify a proposal that passed the gate") ratify.add_argument("--proposal", required=True) ratify.set_defaults(func=cmd_ratify) # ledger ledger_cmd = sub.add_parser("ledger", help="audit ledger operations") ledger_sub = ledger_cmd.add_subparsers(dest="ledger_command", required=True) ledger_verify = ledger_sub.add_parser("verify", parents=[common], help="verify the hash chain") ledger_verify.set_defaults(func=cmd_ledger_verify) ledger_show = ledger_sub.add_parser("show", parents=[common], 
help="print ledger entries as JSON lines")
    # NOTE(review): the default of 0 presumably means "no limit" -- confirm
    # against cmd_ledger_show.
    ledger_show.add_argument("--last", type=int, default=0,
                             help="only print the last N entries")
    ledger_show.set_defaults(func=cmd_ledger_show)

    # fork
    # Group parser only: like the other command groups it is created without
    # parents=[common]; the shared options are attached to each leaf command.
    fork_cmd = sub.add_parser("fork", help="fork and parameterize a constitution")
    fork_sub = fork_cmd.add_subparsers(dest="fork_command", required=True)
    fork_init = fork_sub.add_parser("init", parents=[common], help="create a fork")
    fork_init.add_argument("--dest", required=True, help="destination directory")
    fork_init.add_argument("--name", required=True, help="name of the forked polity")
    fork_init.add_argument("--source", help="source repo (default: --repo)")
    # action="append" collects repeated flags into a list; argparse leaves the
    # attribute as None when --param is never given.
    fork_init.add_argument("--param", action="append",
                           help="userland parameter override KEY=VALUE (repeatable)")
    fork_init.set_defaults(func=cmd_fork_init)
    fork_st = fork_sub.add_parser("status", parents=[common],
                                  help="compare a fork against its upstream")
    fork_st.add_argument("--upstream", help="path to the upstream repository")
    fork_st.set_defaults(func=cmd_fork_status)

    # version
    version_cmd = sub.add_parser("version", parents=[common],
                                 help="print tool and constitution versions")
    version_cmd.set_defaults(func=cmd_version)

    return parser


def main(argv: Optional[Sequence[str]] = None) -> int:
    """Run the govtool command line and return a process exit status.

    ``argv`` is the list of argument strings without the program name. It is
    handed unchanged to ``ArgumentParser.parse_args``, so ``None`` makes
    argparse read ``sys.argv[1:]``.

    The return value is the selected handler's result converted with
    ``int()``; a falsy result such as ``None`` counts as ``0``. A
    ``GovtoolError`` raised by the handler is reported on stderr and mapped
    to ``2``; ``KeyboardInterrupt`` is mapped to ``130``.

    argparse raises ``SystemExit`` on its own for ``--help``, ``--version``
    and usage errors; that is not intercepted here.
    """
    parser = build_parser()
    args = parser.parse_args(argv)
    try:
        # Leaf subparsers register their handler with set_defaults(func=...),
        # so dispatch is a single attribute lookup on the parsed namespace.
        return int(args.func(args) or 0)
    except GovtoolError as exc:
        # Exit status 2 is the documented code for governance rule violations.
        print(f"govtool: error: {exc}", file=sys.stderr)
        return 2
    except KeyboardInterrupt:  # pragma: no cover
        # 128 + SIGINT (2): the conventional shell status for Ctrl-C.
        return 130


# Script entry point: main()'s return value becomes the process exit status.
if __name__ == "__main__":  # pragma: no cover
    sys.exit(main())