"""Proposal model and lifecycle.

A proposal is an amendment expressed as a machine-applicable change set: a
list of constitution files to add, modify, or delete, each carrying the
complete new file content. Full-content changes (rather than unified diffs)
make the proposal self-contained, trivially hashable, and impossible to
misapply against a drifted base.

Lifecycle::

    draft ──► open ──► closed ──► ratified
      │         │         └────► rejected
      └─────────┴───────────────► withdrawn

Every ballot binds to ``change_hash`` — the hash of the exact change set — so
the text cannot be swapped after votes are cast. Every ratification checks
``base_version`` against the live constitution so a stale proposal cannot
silently merge over newer amendments.
"""

from __future__ import annotations

import re
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from pathlib import Path
from typing import Any, Dict, List, Optional, Union

from .canonical import (
    dump_yaml,
    hash_obj,
    iso,
    load_yaml,
    parse_iso,
    tree_hash,
    utcnow,
)
from .errors import ProposalError, ProposalStateError

PROPOSALS_DIRNAME = "proposals"
CONSTITUTION_DIRNAME = "constitution"

# Lifecycle states
DRAFT = "draft"
OPEN = "open"
CLOSED = "closed"
RATIFIED = "ratified"
REJECTED = "rejected"
WITHDRAWN = "withdrawn"

ALL_STATUSES = (DRAFT, OPEN, CLOSED, RATIFIED, REJECTED, WITHDRAWN)

# For each status, the statuses it may move to. Empty tuple == terminal state.
ALLOWED_TRANSITIONS: Dict[str, tuple] = {
    DRAFT: (OPEN, WITHDRAWN),
    OPEN: (CLOSED, WITHDRAWN),
    CLOSED: (RATIFIED, REJECTED),
    RATIFIED: (),
    REJECTED: (),
    WITHDRAWN: (),
}

ACTION_ADD = "add"
ACTION_MODIFY = "modify"
ACTION_DELETE = "delete"
VALID_ACTIONS = (ACTION_ADD, ACTION_MODIFY, ACTION_DELETE)

_ID_RE = re.compile(r"^[0-9]{4}-[a-z0-9][a-z0-9-]*$")

DEFAULT_VOTING_PERIOD = timedelta(days=7)


# ---------------------------------------------------------------------------
# Change set
# ---------------------------------------------------------------------------


@dataclass(frozen=True)
class FileChange:
    """One file-level change inside an amendment."""

    path: str  # POSIX path relative to repo root, under constitution/
    action: str  # add | modify | delete
    content: Optional[str] = None  # full new file text; None only for delete

    def validate(self) -> None:
        """Check that this change is well-formed.

        Raises:
            ProposalError: if the action is unknown, the path is not a
                canonical relative POSIX path under ``constitution/``, a
                delete carries content, or an add/modify carries no
                (non-blank) content.
        """
        if self.action not in VALID_ACTIONS:
            raise ProposalError(f"change to {self.path!r}: invalid action {self.action!r}")
        p = self.path
        if not isinstance(p, str) or not p:
            raise ProposalError("change path must be a non-empty string")
        # Reject every non-canonical spelling, not just "..": an empty segment
        # (leading "/", trailing "/", or "//") or a "." segment would let two
        # spellings of the same file slip past the duplicate-path check in
        # Proposal.validate, which compares path strings exactly.
        segments = p.split("/")
        if "\\" in p or any(seg in ("", ".", "..") for seg in segments):
            raise ProposalError(f"change path {p!r} must be a safe relative POSIX path")
        if not p.startswith(CONSTITUTION_DIRNAME + "/"):
            raise ProposalError(
                f"change path {p!r}: amendments may only touch files under "
                f"'{CONSTITUTION_DIRNAME}/'"
            )
        if self.action == ACTION_DELETE:
            if self.content is not None:
                raise ProposalError(f"delete of {p!r} must not carry content")
        else:
            if not isinstance(self.content, str) or not self.content.strip():
                raise ProposalError(
                    f"{self.action} of {p!r} must carry the complete new file content"
                )

    def to_dict(self) -> Dict[str, Any]:
        """Return a plain mapping; ``content`` is omitted when it is None."""
        d: Dict[str, Any] = {"path": self.path, "action": self.action}
        if self.content is not None:
            d["content"] = self.content
        return d

    @classmethod
    def from_dict(cls, raw: Dict[str, Any]) -> "FileChange":
        """Build and validate a change from a mapping.

        Raises:
            ProposalError: if *raw* is not a mapping or the change is invalid.
        """
        if not isinstance(raw, dict):
            raise ProposalError("each change must be a mapping")
        change = cls(
            path=raw.get("path", ""),
            action=raw.get("action", ""),
            content=raw.get("content"),
        )
        change.validate()
        return change


def changes_hash(changes: List[FileChange]) -> str:
    """Deterministic hash of a change set (order-independent)."""
    # Sorting by (path, action) makes the hash independent of list order.
    canon = sorted((c.to_dict() for c in changes), key=lambda d: (d["path"], d["action"]))
    return hash_obj(canon)


# ---------------------------------------------------------------------------
# Proposal
# ---------------------------------------------------------------------------


@dataclass
class Proposal:
    """An amendment proposal: metadata, lifecycle status and its change set."""

    id: str
    title: str
    summary: str
    author: str
    created_at: str
    base_version: str
    base_constitution_hash: str
    changes: List[FileChange] = field(default_factory=list)
    status: str = DRAFT
    voting_opens_at: Optional[str] = None
    voting_closes_at: Optional[str] = None
    acknowledges_invariant_change: bool = False
    classification: Optional[Dict[str, Any]] = None

    # -- derived ------------------------------------------------------------

    @property
    def change_hash(self) -> str:
        """Hash of the current change set, recomputed on every access."""
        return changes_hash(self.changes)

    def voting_window(self) -> tuple[datetime, datetime]:
        """Return ``(opens, closes)`` parsed from the stored timestamps.

        Raises:
            ProposalStateError: if either end of the window is unset.
            ProposalError: if the window does not close strictly after it opens.
        """
        if not self.voting_opens_at or not self.voting_closes_at:
            raise ProposalStateError(f"proposal {self.id}: voting window is not set")
        opens = parse_iso(self.voting_opens_at)
        closes = parse_iso(self.voting_closes_at)
        if closes <= opens:
            raise ProposalError(f"proposal {self.id}: voting window closes before it opens")
        return opens, closes

    # -- validation -----------------------------------------------------------

    def validate(self) -> None:
        """Check the proposal's fields, change set and (if set) voting window.

        Raises:
            ProposalError: on any malformed field, empty change set, invalid
                change, or duplicate change path.
            ProposalStateError: if only one end of the voting window is set.
        """
        # fullmatch, not match: with match() the pattern's "$" also accepts a
        # trailing newline, and the id is used as a directory name. The
        # isinstance guard turns a non-string id (e.g. a bare YAML integer)
        # into a ProposalError instead of a TypeError from the regex engine.
        if not isinstance(self.id, str) or not _ID_RE.fullmatch(self.id):
            raise ProposalError(
                f"proposal id {self.id!r} must match NNNN-kebab-slug (e.g. 0001-fix-quorum)"
            )
        for fname, value in (
            ("title", self.title),
            ("summary", self.summary),
            ("author", self.author),
        ):
            if not isinstance(value, str) or not value.strip():
                raise ProposalError(f"proposal {self.id}: {fname!r} must be a non-empty string")
        # Called for its validation side effect only; the parsed value is unused.
        parse_iso(self.created_at)
        if self.status not in ALL_STATUSES:
            raise ProposalError(f"proposal {self.id}: invalid status {self.status!r}")
        if not self.changes:
            raise ProposalError(f"proposal {self.id}: change set is empty")
        seen_paths = set()
        for change in self.changes:
            change.validate()
            if change.path in seen_paths:
                raise ProposalError(
                    f"proposal {self.id}: duplicate change for path {change.path!r}"
                )
            seen_paths.add(change.path)
        if self.voting_opens_at or self.voting_closes_at:
            self.voting_window()

    # -- serialization --------------------------------------------------------

    def to_dict(self) -> Dict[str, Any]:
        """Return a plain mapping, including the derived ``change_hash``.

        ``classification`` is included only when it is not None.
        """
        d: Dict[str, Any] = {
            "id": self.id,
            "title": self.title,
            "summary": self.summary,
            "author": self.author,
            "created_at": self.created_at,
            "status": self.status,
            "base_version": self.base_version,
            "base_constitution_hash": self.base_constitution_hash,
            "acknowledges_invariant_change": self.acknowledges_invariant_change,
            "voting_opens_at": self.voting_opens_at,
            "voting_closes_at": self.voting_closes_at,
            "changes": [c.to_dict() for c in self.changes],
            "change_hash": self.change_hash,
        }
        if self.classification is not None:
            d["classification"] = self.classification
        return d

    @classmethod
    def from_dict(cls, raw: Dict[str, Any], source: str = "") -> "Proposal":
        """Build and validate a proposal from a mapping.

        Args:
            raw: the proposal document.
            source: label (e.g. a file path) prefixed to error messages.

        Raises:
            ProposalError: if *raw* is not a mapping, ``changes`` is not a
                list, a required field is missing, validation fails, or a
                recorded ``change_hash`` disagrees with the change set.
        """
        if not isinstance(raw, dict):
            raise ProposalError(f"{source}: proposal document must be a mapping")
        # A missing or null "changes" becomes an empty list, which validate()
        # then rejects as an empty change set; any other non-list is reported
        # here instead of surfacing as a TypeError while iterating.
        raw_changes = raw.get("changes")
        if raw_changes is None:
            raw_changes = []
        elif not isinstance(raw_changes, list):
            raise ProposalError(f"{source}: 'changes' must be a list of change mappings")
        changes = [FileChange.from_dict(c) for c in raw_changes]
        try:
            proposal = cls(
                id=raw["id"],
                title=raw["title"],
                summary=raw["summary"],
                author=raw["author"],
                created_at=raw["created_at"],
                base_version=raw["base_version"],
                base_constitution_hash=raw["base_constitution_hash"],
                changes=changes,
                status=raw.get("status", DRAFT),
                voting_opens_at=raw.get("voting_opens_at"),
                voting_closes_at=raw.get("voting_closes_at"),
                acknowledges_invariant_change=bool(
                    raw.get("acknowledges_invariant_change", False)
                ),
                classification=raw.get("classification"),
            )
        except KeyError as exc:
            raise ProposalError(f"{source}: proposal missing required field {exc}") from exc
        proposal.validate()
        # The stored hash is optional; when present it must match the hash
        # recomputed from the change set that was just loaded.
        recorded = raw.get("change_hash")
        if recorded is not None and recorded != proposal.change_hash:
            raise ProposalError(
                f"{source}: recorded change_hash does not match the change set — "
                f"the proposal text was altered after hashing"
            )
        return proposal


# ---------------------------------------------------------------------------
# Repository I/O
# ---------------------------------------------------------------------------


def proposal_dir(repo_root: Union[str, Path], proposal_id: str) -> Path:
    """Return ``<repo_root>/proposals/<proposal_id>``."""
    return Path(repo_root) / PROPOSALS_DIRNAME / proposal_id


def proposal_file(repo_root: Union[str, Path], proposal_id: str) -> Path:
    """Return the path of the proposal's ``proposal.yaml``."""
    return proposal_dir(repo_root, proposal_id) / "proposal.yaml"


def ballots_dir(repo_root: Union[str, Path], proposal_id: str) -> Path:
    """Return the path of the proposal's ``ballots`` directory."""
    return proposal_dir(repo_root, proposal_id) / "ballots"


def save_proposal(repo_root: Union[str, Path], proposal: Proposal) -> Path:
    """Validate *proposal*, write it to its ``proposal.yaml``, return the path."""
    proposal.validate()
    path = proposal_file(repo_root, proposal.id)
    dump_yaml(path, proposal.to_dict())
    return path


def load_proposal(repo_root: Union[str, Path], proposal_id: str) -> Proposal:
    """Load and validate the proposal stored under *proposal_id*.

    Raises:
        ProposalError: if the file does not exist or the document is invalid.
    """
    path = proposal_file(repo_root, proposal_id)
    if not path.exists():
        raise ProposalError(f"proposal {proposal_id!r} not found at {path}")
    return Proposal.from_dict(load_yaml(path), source=str(path))


def list_proposals(repo_root: Union[str, Path]) -> List[str]:
    """Return the sorted names of proposal directories holding a ``proposal.yaml``."""
    root = Path(repo_root) / PROPOSALS_DIRNAME
    if not root.is_dir():
        return []
    return sorted(
        d.name for d in root.iterdir() if d.is_dir() and (d / "proposal.yaml").exists()
    )


# ---------------------------------------------------------------------------
# Lifecycle operations
# ---------------------------------------------------------------------------


def _read_constitution_version(repo_root: Path) -> str:
    """Return the ``version`` recorded in ``constitution/version.yaml``.

    Falls back to ``"0.0.0"`` when the document has no ``version`` key.

    Raises:
        ProposalError: if the document is not a mapping.
    """
    version_doc = load_yaml(repo_root / CONSTITUTION_DIRNAME / "version.yaml")
    if not isinstance(version_doc, dict):
        raise ProposalError(f"{CONSTITUTION_DIRNAME}/version.yaml must contain a mapping")
    return str(version_doc.get("version", "0.0.0"))


def new_proposal(
    repo_root: Union[str, Path],
    proposal_id: str,
    title: str,
    summary: str,
    author: str,
    changes: List[FileChange],
    acknowledges_invariant_change: bool = False,
    now: Optional[datetime] = None,
) -> Proposal:
    """Create a draft proposal pinned to the current constitution state.

    The proposal is returned, not written to disk.

    Args:
        now: creation timestamp; defaults to ``utcnow()``.

    Raises:
        ProposalError: if a proposal with this id already exists, the
            proposal fails validation, or a change cannot be applied to the
            working tree as it currently stands.
    """
    repo_root = Path(repo_root)
    if proposal_file(repo_root, proposal_id).exists():
        raise ProposalError(f"proposal {proposal_id!r} already exists")

    base_version = _read_constitution_version(repo_root)
    now = now or utcnow()
    proposal = Proposal(
        id=proposal_id,
        title=title,
        summary=summary,
        author=author,
        created_at=iso(now),
        base_version=base_version,
        base_constitution_hash=tree_hash(repo_root / CONSTITUTION_DIRNAME),
        changes=list(changes),  # copy so later caller mutations don't leak in
        acknowledges_invariant_change=acknowledges_invariant_change,
    )
    proposal.validate()
    _check_changes_apply(repo_root, proposal)
    return proposal


def transition(proposal: Proposal, new_status: str) -> None:
    """Move a proposal to *new_status*, enforcing the lifecycle graph.

    Raises:
        ProposalStateError: if *new_status* is unknown, the proposal's current
            status is not part of the lifecycle graph, or the move is not an
            allowed edge.
    """
    if new_status not in ALL_STATUSES:
        raise ProposalStateError(f"unknown status {new_status!r}")
    # Proposal is a mutable dataclass, so its status may hold anything; report
    # that as a state error rather than letting the lookup raise KeyError.
    allowed = ALLOWED_TRANSITIONS.get(proposal.status)
    if allowed is None:
        raise ProposalStateError(
            f"proposal {proposal.id}: current status {proposal.status!r} is not a "
            f"known lifecycle state"
        )
    if new_status not in allowed:
        raise ProposalStateError(
            f"proposal {proposal.id}: illegal transition {proposal.status} → {new_status}; "
            f"allowed from {proposal.status}: {list(allowed) or 'none (terminal state)'}"
        )
    proposal.status = new_status


def open_voting(
    proposal: Proposal,
    now: Optional[datetime] = None,
    voting_period: timedelta = DEFAULT_VOTING_PERIOD,
    closes_at: Optional[datetime] = None,
) -> None:
    """Open the voting window and move the proposal from draft to open.

    The window opens at *now* (default ``utcnow()``) and closes at
    *closes_at* if given, otherwise at ``now + voting_period``. The original
    design note says the electorate freezes at this instant; this function
    only records the window and changes the status.

    Raises:
        ProposalStateError: if the proposal is not a draft.
        ProposalError: if the window would not close strictly after it opens.
    """
    now = now or utcnow()
    if proposal.status != DRAFT:
        raise ProposalStateError(
            f"proposal {proposal.id}: can only open voting from draft, "
            f"current status is {proposal.status!r}"
        )
    opens = now
    closes = closes_at if closes_at is not None else opens + voting_period
    if closes <= opens:
        raise ProposalError(f"proposal {proposal.id}: voting must close after it opens")
    proposal.voting_opens_at = iso(opens)
    proposal.voting_closes_at = iso(closes)
    transition(proposal, OPEN)


def close_voting(proposal: Proposal, now: Optional[datetime] = None) -> None:
    """Close the voting window. Refuses to close early.

    Raises:
        ProposalStateError: if the proposal is not open, its window is unset,
            or *now* (default ``utcnow()``) is before the recorded closing time.
    """
    now = now or utcnow()
    if proposal.status != OPEN:
        raise ProposalStateError(
            f"proposal {proposal.id}: cannot close voting from status {proposal.status!r}"
        )
    _, closes = proposal.voting_window()
    if now < closes:
        raise ProposalStateError(
            f"proposal {proposal.id}: voting closes at {proposal.voting_closes_at}; "
            f"refusing to close early — early closure is a classic capture move"
        )
    transition(proposal, CLOSED)


# ---------------------------------------------------------------------------
# Applying changes (ratification merge)
# ---------------------------------------------------------------------------


def _check_changes_apply(repo_root: Path, proposal: Proposal) -> None:
    """Verify every change is applicable against the working tree.

    Raises:
        ProposalError: if an add targets an existing path, or a modify/delete
            targets a path that is missing or is not a regular file.
    """
    for change in proposal.changes:
        target = Path(repo_root) / change.path
        if change.action == ACTION_ADD:
            if target.exists():
                raise ProposalError(
                    f"proposal {proposal.id}: cannot add {change.path!r}, file already exists"
                )
        elif not target.exists():
            raise ProposalError(
                f"proposal {proposal.id}: cannot {change.action} {change.path!r}, "
                f"file does not exist"
            )
        elif not target.is_file():
            # A directory would pass a bare exists() check and then make
            # unlink()/write_text() fail halfway through apply_changes.
            raise ProposalError(
                f"proposal {proposal.id}: cannot {change.action} {change.path!r}, "
                f"target is not a regular file"
            )


def apply_changes(repo_root: Union[str, Path], proposal: Proposal) -> List[str]:
    """Apply the proposal's change set to the working tree.

    Refuses to apply if the live constitution version no longer matches the
    proposal's ``base_version`` — a stale amendment must be rebased and
    re-voted, never silently merged over newer law.

    The proposal is re-validated and every change is checked against the tree
    before the first write. The writes themselves are sequential with no
    rollback, so an I/O failure partway through leaves earlier changes applied.

    Returns:
        The list of paths written or deleted, in change-set order.

    Raises:
        ProposalError: if the proposal is stale, invalid, or not applicable.
    """
    repo_root = Path(repo_root)
    live_version = _read_constitution_version(repo_root)
    if live_version != proposal.base_version:
        raise ProposalError(
            f"proposal {proposal.id} was drafted against constitution "
            f"{proposal.base_version} but the live version is {live_version}; "
            f"rebase the proposal and re-open the vote"
        )
    # Do not trust "validated upstream": this is where files get written, so
    # enforce the path and content rules here too.
    proposal.validate()
    _check_changes_apply(repo_root, proposal)

    touched: List[str] = []
    for change in proposal.changes:
        target = repo_root / change.path
        if change.action == ACTION_DELETE:
            target.unlink()
        else:
            if change.content is None:
                # Explicit raise instead of assert, which is stripped under -O.
                raise ProposalError(
                    f"proposal {proposal.id}: {change.action} of {change.path!r} "
                    f"has no content"
                )
            target.parent.mkdir(parents=True, exist_ok=True)
            target.write_text(change.content, encoding="utf-8")
        touched.append(change.path)
    return touched