"""The vote gate: proposal lifecycle enforcement, tally, and ratification. This is the orchestration layer that CI calls on every amendment PR. It runs a fixed pipeline of stages against a checkout and produces a :class:`GateReport` that is (a) rendered as a PR comment, (b) hashed and bound into the audit ledger, and (c) used to decide whether the PR may merge and what version the constitution becomes. Stages (in order) ================= 1. **schema** — every governance artifact validates against its JSON Schema (see :mod:`govtool.schemas`). 2. **proposal-state** — the proposal is in an open, votable state. 3. **classification** — the semver class computed from the actual diff matches the class the proposal declares, and the PR does not hand-edit ``version.yaml`` (the tooling owns version bumps). 4. **review-period** — the class-specific minimum review window has elapsed since the proposal opened. 5. **ballots** — every ballot is signature-verified against the registry's key for the voter, bound to the proposal's content hash, and deduplicated (latest valid ballot per voter counts). Informational: invalid ballots are excluded and reported, they do not block the gate. 6. **quorum** — turnout among voters eligible *at proposal opening* meets the class threshold (snapshot eligibility prevents packing the electorate mid-vote). 7. **approval** — yes/(yes+no) meets the class threshold; abstentions count toward quorum, not approval. Integration contract ==================== This module deliberately stays thin over the already-shipped primitives. It expects the following signatures from its siblings (if any drift, this header is the single place to reconcile): * ``govtool.ballots.load_ballots(directory: Path) -> list[Ballot]`` where each ``Ballot`` exposes ``proposal_id``, ``voter_id``, ``choice`` ("yes"/"no"/"abstain"), ``cast_at``. * ``govtool.ballots.verify_ballot(ballot, *, proposal_hash: str, public_key: str) -> None`` raising ``GovtoolError`` on any failure. * ``govtool.eligibility.load_registry(path: Path) -> list[Citizen]`` and ``govtool.eligibility.eligible_voters(citizens, as_of) -> list[Citizen]`` where each ``Citizen`` exposes ``id`` and ``public_key``. * ``govtool.tally.run_tally(ballots, eligible_count: int, approval: float, quorum: float) -> TallyResult`` exposing ``yes``, ``no``, ``abstain``, ``quorum_met``, ``approval_met``, ``passed``. """ from __future__ import annotations import datetime as _dt from dataclasses import dataclass, field from fractions import Fraction from pathlib import Path from typing import Any from govtool import schemas from govtool.ballots import load_ballots, verify_ballot from govtool.canonical import content_hash from govtool.classifier import ( BUMP_ORDER, Classification, classify_trees, next_version, ) from govtool.eligibility import eligible_voters, load_registry from govtool.errors import GovtoolError from govtool.tally import run_tally from govtool.yamlio import load_yaml, dump_yaml, to_jsonable class GateError(GovtoolError): """Raised on misuse of the gate API (missing proposal, bad ratify call).""" # Repository layout the gate operates on. PROPOSALS_DIR = "proposals" BALLOTS_DIR = "ballots" REGISTRY_PATH = "citizens/registry.yaml" VERSION_PATH = "constitution/version.yaml" CONSTITUTION_DIR = "constitution" #: Proposal states in which voting is live and the gate may evaluate. OPEN_STATES = ("open", "voting") #: Proposal fields that legitimately mutate after ballots are cast. The #: ballot-binding hash is computed over everything *except* these, so a #: ballot stays valid when the proposal is later marked ratified/rejected #: but is invalidated by any substantive edit to the proposal text. MUTABLE_PROPOSAL_FIELDS = frozenset( { "status", "updated_at", "ratified_at", "ratified_version", "rejected_at", "gate_report", } ) # --------------------------------------------------------------------------- # Voting rules (read from the kernel, with documented fallbacks) # --------------------------------------------------------------------------- @dataclass class ClassRules: """Thresholds applicable to one change class.""" approval: float quorum: float review_hours: int def to_dict(self) -> dict: return { "approval": self.approval, "quorum": self.quorum, "review_hours": self.review_hours, } #: Constitutional defaults, used only when the kernel text cannot be #: machine-read. These mirror the kernel v0.1 ratification article: #: kernel (major) changes need a 2/3 supermajority at 50% quorum after a #: 7-day review; minor changes a simple majority at 1/3 quorum after #: 3 days; patches a simple majority at 20% quorum after 1 day. DEFAULT_RULES: dict[str, ClassRules] = { "major": ClassRules(approval=2 / 3, quorum=0.5, review_hours=168), "minor": ClassRules(approval=0.5, quorum=1 / 3, review_hours=72), "patch": ClassRules(approval=0.5, quorum=0.2, review_hours=24), "none": ClassRules(approval=0.5, quorum=0.2, review_hours=24), } _APPROVAL_ALIASES = ("approval", "threshold", "supermajority", "yes_fraction") _QUORUM_ALIASES = ("quorum", "turnout", "participation") _REVIEW_ALIASES = ("review_period_hours", "review_hours", "review_period", "review") @dataclass class VotingRules: rules: dict[str, ClassRules] source: str def for_class(self, bump: str) -> ClassRules: if bump not in self.rules: raise GateError(f"no voting rules for change class {bump!r}") return self.rules[bump] def to_dict(self) -> dict: return { "source": self.source, "rules": {k: v.to_dict() for k, v in self.rules.items()}, } def _as_fraction(value: Any) -> float | None: """Parse a threshold: 0.667, 66.7 (percent), '2/3', '66.7%'.""" if isinstance(value, bool) or value is None: return None if isinstance(value, (int, float)): number = float(value) return number / 100.0 if number > 1.0 else number if isinstance(value, str): text = value.strip().rstrip("%").strip() try: if "/" in text: return float(Fraction(text)) number = float(text) return number / 100.0 if number > 1.0 else number except (ValueError, ZeroDivisionError): return None return None def _as_hours(value: Any) -> int | None: """Parse a review period: 168, '168h', '7d'.""" if isinstance(value, bool) or value is None: return None if isinstance(value, (int, float)): return int(value) if isinstance(value, str): text = value.strip().lower() try: if text.endswith("d"): return int(float(text[:-1]) * 24) if text.endswith("h"): return int(float(text[:-1])) return int(float(text)) except ValueError: return None return None def _find_class_table(node: Any) -> dict | None: """Recursively locate a mapping keyed by all three change classes.""" if isinstance(node, dict): if {"major", "minor", "patch"} <= set(node.keys()) and all( isinstance(node[key], dict) for key in ("major", "minor", "patch") ): return node for value in node.values(): found = _find_class_table(value) if found is not None: return found elif isinstance(node, list): for item in node: found = _find_class_table(item) if found is not None: return found return None def load_voting_rules(constitution_dir: Path | str) -> VotingRules: """Read class thresholds from the kernel articles. Scans every kernel article for a table keyed by change class (the ratification article carries one); merges whatever fields it finds over the constitutional defaults so partial tables still work. The rules used are always reported in the gate output, so a mismatch between text and tooling is visible, not silent. """ constitution_dir = Path(constitution_dir) kernel_dir = constitution_dir / "kernel" table: dict | None = None source = "defaults (kernel table not machine-readable)" if kernel_dir.is_dir(): for article_path in sorted(kernel_dir.glob("*.yaml")): try: data = load_yaml(article_path) except GovtoolError: continue found = _find_class_table(data) if found is not None: table = found source = article_path.name break rules: dict[str, ClassRules] = {} for bump, default in DEFAULT_RULES.items(): approval, quorum, review = default.approval, default.quorum, default.review_hours if table is not None and isinstance(table.get(bump), dict): entry = table[bump] for alias in _APPROVAL_ALIASES: parsed = _as_fraction(entry.get(alias)) if parsed is not None: approval = parsed break for alias in _QUORUM_ALIASES: parsed = _as_fraction(entry.get(alias)) if parsed is not None: quorum = parsed break for alias in _REVIEW_ALIASES: parsed = _as_hours(entry.get(alias)) if parsed is not None: review = parsed break rules[bump] = ClassRules(approval=approval, quorum=quorum, review_hours=review) return VotingRules(rules=rules, source=source) # --------------------------------------------------------------------------- # Helpers # --------------------------------------------------------------------------- def _now_utc() -> _dt.datetime: return _dt.datetime.now(_dt.timezone.utc) def _parse_dt(value: Any) -> _dt.datetime | None: if value is None: return None if isinstance(value, _dt.datetime): return value if value.tzinfo else value.replace(tzinfo=_dt.timezone.utc) if isinstance(value, _dt.date): return _dt.datetime(value.year, value.month, value.day, tzinfo=_dt.timezone.utc) if isinstance(value, str): text = value.strip().replace("Z", "+00:00") try: parsed = _dt.datetime.fromisoformat(text) except ValueError: return None return parsed if parsed.tzinfo else parsed.replace(tzinfo=_dt.timezone.utc) return None def _iso(value: _dt.datetime | None) -> str | None: return value.astimezone(_dt.timezone.utc).isoformat() if value else None def proposal_binding_hash(raw_proposal: dict) -> str: """Hash a proposal's immutable content; ballots commit to this hash.""" stable = { key: value for key, value in raw_proposal.items() if key not in MUTABLE_PROPOSAL_FIELDS } return content_hash(to_jsonable(stable)) def _tally_dict(result: Any) -> dict: """Serialise a TallyResult resiliently.""" if hasattr(result, "to_dict"): return to_jsonable(result.to_dict()) try: import dataclasses if dataclasses.is_dataclass(result): return to_jsonable(dataclasses.asdict(result)) except Exception: # pragma: no cover - defensive pass return to_jsonable( {k: v for k, v in vars(result).items() if not k.startswith("_")} ) def _read_current_version(repo_root: Path) -> str: data = load_yaml(repo_root / VERSION_PATH) if not isinstance(data, dict) or "version" not in data: raise GateError(f"{VERSION_PATH} missing 'version' field") return str(data["version"]) def _file_text_equal(a: Path, b: Path) -> bool: a_exists, b_exists = a.is_file(), b.is_file() if not a_exists and not b_exists: return True if a_exists != b_exists: return False return a.read_bytes() == b.read_bytes() # --------------------------------------------------------------------------- # Report structures # --------------------------------------------------------------------------- @dataclass class Stage: name: str passed: bool fatal: bool = True message: str = "" details: dict = field(default_factory=dict) def to_dict(self) -> dict: return { "name": self.name, "passed": self.passed, "fatal": self.fatal, "message": self.message, "details": to_jsonable(self.details), } @dataclass class GateReport: proposal_id: str generated_at: str current_version: str bump: str next_version: str passed: bool rules: dict stages: list[Stage] = field(default_factory=list) tally: dict | None = None accepted_ballots: list[dict] = field(default_factory=list) rejected_ballots: list[dict] = field(default_factory=list) classification: dict | None = None def to_dict(self) -> dict: return { "proposal_id": self.proposal_id, "generated_at": self.generated_at, "current_version": self.current_version, "change_class": self.bump, "next_version": self.next_version, "passed": self.passed, "rules": self.rules, "stages": [stage.to_dict() for stage in self.stages], "tally": self.tally, "accepted_ballots": self.accepted_ballots, "rejected_ballots": self.rejected_ballots, "classification": self.classification, } @property def report_hash(self) -> str: """Canonical hash binding this report into the audit ledger.""" return content_hash(to_jsonable(self.to_dict())) def audit_events(self) -> list[dict]: """Audit-ledger events describing this gate run.""" return [ { "type": "gate.completed", "payload": { "proposal_id": self.proposal_id, "passed": self.passed, "change_class": self.bump, "current_version": self.current_version, "next_version": self.next_version, "tally": self.tally, "report_hash": self.report_hash, "generated_at": self.generated_at, }, } ] def to_markdown(self) -> str: verdict = "✅ **GATE PASSED**" if self.passed else "❌ **GATE FAILED**" lines = [ f"## Vote gate — proposal `{self.proposal_id}`", "", verdict, "", f"- Change class: `{self.bump}`", f"- Constitution: `{self.current_version}` → " + (f"`{self.next_version}`" if self.passed else "_(unchanged)_"), f"- Report hash: `{self.report_hash}`", "", "| Stage | Result | Detail |", "|---|---|---|", ] for stage in self.stages: mark = "✅" if stage.passed else ("❌" if stage.fatal else "⚠️") lines.append(f"| {stage.name} | {mark} | {stage.message} |") if self.tally: lines += [ "", "### Tally", "", "| Yes | No | Abstain | Eligible | Quorum | Approval |", "|---|---|---|---|---|---|", "| {yes} | {no} | {abstain} | {eligible} | {quorum} | {approval} |".format( yes=self.tally.get("yes", "?"), no=self.tally.get("no", "?"), abstain=self.tally.get("abstain", "?"), eligible=self.tally.get("eligible", self.tally.get("eligible_count", "?")), quorum="met" if self.tally.get("quorum_met") else "NOT met", approval="met" if self.tally.get("approval_met") else "NOT met", ), ] if self.rejected_ballots: lines += ["", "### Rejected ballots", ""] for rejected in self.rejected_ballots: lines.append( f"- `{rejected.get('voter', '?')}`: {rejected.get('reason', '?')}" ) return "\n".join(lines) # --------------------------------------------------------------------------- # The gate itself # --------------------------------------------------------------------------- def run_gate( repo_root: Path | str, proposal_id: str, base_root: Path | str | None = None, now: _dt.datetime | None = None, ) -> GateReport: """Run the full vote-gate pipeline for one proposal. ``base_root`` should be a checkout of the merge base (current law); when omitted (local dry runs), the classification stage trusts the proposal's declared ``change_class`` and is marked non-fatal. """ repo_root = Path(repo_root) now = now or _now_utc() stages: list[Stage] = [] proposal_path = repo_root / PROPOSALS_DIR / f"{proposal_id}.yaml" raw = load_yaml(proposal_path) if not isinstance(raw, dict): raise GateError(f"proposal {proposal_id} is not a mapping: {proposal_path}") current_version = _read_current_version(repo_root) voting_rules = load_voting_rules(repo_root / CONSTITUTION_DIR) # -- stage 1: schema ----------------------------------------------------- schema_report = schemas.validate_repo(repo_root) stages.append( Stage( name="schema", passed=schema_report.ok, message=( f"{len(schema_report.checked)} file(s) validated" if schema_report.ok else f"{len(schema_report.errors)} schema error(s)" ), details={"errors": [issue.to_dict() for issue in schema_report.errors[:50]]}, ) ) # -- stage 2: proposal state ---------------------------------------------- status = str(raw.get("status", "draft")) stages.append( Stage( name="proposal-state", passed=status in OPEN_STATES, message=f"status is '{status}'" + ("" if status in OPEN_STATES else f"; must be one of {list(OPEN_STATES)}"), ) ) # -- stage 3: classification ---------------------------------------------- declared = raw.get("change_class") classification: Classification | None = None if base_root is not None: base_root = Path(base_root) classification = classify_trees(base_root, repo_root) bump = classification.bump class_ok = declared is None or declared == bump version_untouched = _file_text_equal( base_root / VERSION_PATH, repo_root / VERSION_PATH ) message_parts = [f"computed class '{bump}'"] if declared is not None: message_parts.append( f"declared '{declared}' ({'match' if class_ok else 'MISMATCH'})" ) if not version_untouched: message_parts.append("version.yaml was hand-edited (forbidden)") stages.append( Stage( name="classification", passed=class_ok and version_untouched, message="; ".join(message_parts), details=classification.to_dict(), ) ) else: bump = str(declared) if declared in BUMP_ORDER else "minor" stages.append( Stage( name="classification", passed=True, fatal=False, message=f"no base tree provided; trusting declared change_class '{bump}'", ) ) class_rules = voting_rules.for_class(bump) # -- stage 4: review period ------------------------------------------------- opened = _parse_dt(raw.get("opened_at")) or _parse_dt(raw.get("created_at")) if opened is None: stages.append( Stage( name="review-period", passed=False, message="proposal has no parseable 'opened_at'/'created_at' timestamp", ) ) else: elapsed_hours = (now - opened).total_seconds() / 3600.0 ok = elapsed_hours >= class_rules.review_hours stages.append( Stage( name="review-period", passed=ok, message=( f"{elapsed_hours:.1f}h elapsed of required " f"{class_rules.review_hours}h for class '{bump}'" ), details={"opened_at": _iso(opened), "now": _iso(now)}, ) ) # -- stage 5: ballots --------------------------------------------------------- registry = load_registry(repo_root / REGISTRY_PATH) snapshot = opened or now eligible = list(eligible_voters(registry, snapshot)) eligible_by_id = {citizen.id: citizen for citizen in eligible} binding_hash = proposal_binding_hash(raw) ballots_dir = repo_root / BALLOTS_DIR / proposal_id loaded = load_ballots(ballots_dir) if ballots_dir.is_dir() else [] accepted_by_voter: dict[str, Any] = {} accepted_meta: dict[str, dict] = {} rejected: list[dict] = [] superseded = 0 def _reject(ballot: Any, reason: str) -> None: rejected.append( { "voter": getattr(ballot, "voter_id", "?"), "choice": getattr(ballot, "choice", "?"), "cast_at": _iso(_parse_dt(getattr(ballot, "cast_at", None))), "reason": reason, } ) ordered = sorted( loaded, key=lambda ballot: _parse_dt(getattr(ballot, "cast_at", None)) or _dt.datetime.min.replace(tzinfo=_dt.timezone.utc), ) for ballot in ordered: voter_id = getattr(ballot, "voter_id", None) if getattr(ballot, "proposal_id", None) != proposal_id: _reject(ballot, "ballot references a different proposal") continue if voter_id not in eligible_by_id: _reject(ballot, "voter not in eligibility snapshot at proposal opening") continue try: verify_ballot( ballot, proposal_hash=binding_hash, public_key=eligible_by_id[voter_id].public_key, ) except GovtoolError as exc: _reject(ballot, f"verification failed: {exc}") continue if voter_id in accepted_by_voter: superseded += 1 accepted_by_voter[voter_id] = ballot accepted_meta[voter_id] = { "voter": voter_id, "choice": getattr(ballot, "choice", "?"), "cast_at": _iso(_parse_dt(getattr(ballot, "cast_at", None))), } accepted = list(accepted_by_voter.values()) stages.append( Stage( name="ballots", passed=True, fatal=False, message=( f"{len(accepted)} accepted, {len(rejected)} rejected, " f"{superseded} superseded duplicate(s); " f"electorate snapshot at {_iso(snapshot)} ({len(eligible)} eligible)" ), details={"binding_hash": binding_hash}, ) ) # -- stages 6 & 7: quorum and approval (delegated to tally) -------------------- result = run_tally( accepted, eligible_count=len(eligible), approval=class_rules.approval, quorum=class_rules.quorum, ) tally = _tally_dict(result) tally.setdefault("eligible", len(eligible)) stages.append( Stage( name="quorum", passed=bool(getattr(result, "quorum_met")), message=( f"{len(accepted)}/{len(eligible)} voted; " f"required quorum {class_rules.quorum:.0%} for class '{bump}'" ), ) ) yes = int(getattr(result, "yes", 0)) no = int(getattr(result, "no", 0)) decided = yes + no approval_share = (yes / decided) if decided else 0.0 stages.append( Stage( name="approval", passed=bool(getattr(result, "approval_met")), message=( f"yes={yes}, no={no} ({approval_share:.1%} of decided votes); " f"required > {class_rules.approval:.1%} for class '{bump}'" ), ) ) passed = all(stage.passed for stage in stages if stage.fatal) report = GateReport( proposal_id=proposal_id, generated_at=_iso(now) or "", current_version=current_version, bump=bump, next_version=next_version(current_version, bump), passed=passed, rules={"class": bump, **class_rules.to_dict(), "source": voting_rules.source}, stages=stages, tally=tally, accepted_ballots=[accepted_meta[v] for v in sorted(accepted_meta)], rejected_ballots=rejected, classification=classification.to_dict() if classification else None, ) return report # --------------------------------------------------------------------------- # Ratification / rejection (applied on merge) # --------------------------------------------------------------------------- def ratify( repo_root: Path | str, report: GateReport, now: _dt.datetime | None = None, ) -> dict: """Apply a passing gate report: bump the version, close the proposal. Returns the audit-ledger event describing the ratification. Raises :class:`GateError` if the report did not pass — ratification is mechanically impossible without a passing gate. """ if not report.passed: raise GateError( f"refusing to ratify {report.proposal_id}: gate report did not pass" ) repo_root = Path(repo_root) now = now or _now_utc() version_path = repo_root / VERSION_PATH data = load_yaml(version_path) if not isinstance(data, dict): raise GateError(f"{VERSION_PATH} is not a mapping") old_version = str(data.get("version", report.current_version)) if old_version != report.current_version: raise GateError( f"version drift: report was computed against {report.current_version} " f"but tree is at {old_version}; re-run the gate" ) data["version"] = report.next_version history = data.setdefault("history", []) if not isinstance(history, list): raise GateError(f"{VERSION_PATH} 'history' is not a list") history.append( { "version": report.next_version, "proposal": report.proposal_id, "ratified_at": _iso(now), "change_class": report.bump, "gate_report": report.report_hash, } ) dump_yaml(data, version_path) proposal_path = repo_root / PROPOSALS_DIR / f"{report.proposal_id}.yaml" proposal = load_yaml(proposal_path) proposal["status"] = "ratified" proposal["ratified_at"] = _iso(now) proposal["ratified_version"] = report.next_version proposal["gate_report"] = report.report_hash dump_yaml(proposal, proposal_path) return { "type": "amendment.ratified", "payload": { "proposal_id": report.proposal_id, "old_version": old_version, "new_version": report.next_version, "change_class": report.bump, "tally": report.tally, "gate_report": report.report_hash, "ratified_at": _iso(now), }, } def reject( repo_root: Path | str, report: GateReport, now: _dt.datetime | None = None, ) -> dict: """Record a failed gate outcome: close the proposal as rejected.""" if report.passed: raise GateError( f"refusing to reject {report.proposal_id}: gate report passed" ) repo_root = Path(repo_root) now = now or _now_utc() proposal_path = repo_root / PROPOSALS_DIR / f"{report.proposal_id}.yaml" proposal = load_yaml(proposal_path) proposal["status"] = "rejected" proposal["rejected_at"] = _iso(now) proposal["gate_report"] = report.report_hash dump_yaml(proposal, proposal_path) failed = [stage.name for stage in report.stages if stage.fatal and not stage.passed] return { "type": "amendment.rejected", "payload": { "proposal_id": report.proposal_id, "failed_stages": failed, "tally": report.tally, "gate_report": report.report_hash, "rejected_at": _iso(now), }, }