"""Ballots: signed, hash-bound votes. A ballot commits to five things and signs all of them with the citizen's registered Ed25519 key: * ``proposal_id`` — which amendment, * ``voter`` — who, * ``choice`` — yes / no / abstain, * ``cast_at`` — when, * ``change_hash`` — the exact text being voted on. Binding the ballot to ``change_hash`` means nobody can swap the amendment text after votes start landing: a ballot for the old text simply fails verification against the new one. Re-voting is allowed while the window is open; the tally counts only each voter's latest valid ballot (changing your mind is a feature, not an exploit). """ from __future__ import annotations import re from dataclasses import dataclass from datetime import datetime from pathlib import Path from typing import Any, Dict, List, Optional, Tuple, Union from . import keys from .canonical import canonical_json, dump_yaml, iso, load_yaml, parse_iso, utcnow from .eligibility import Citizen, check_voter_eligibility from .errors import BallotError, EligibilityError, ProposalStateError from .proposal import OPEN, Proposal, ballots_dir CHOICE_YES = "yes" CHOICE_NO = "no" CHOICE_ABSTAIN = "abstain" VALID_CHOICES = (CHOICE_YES, CHOICE_NO, CHOICE_ABSTAIN) _SIGNED_FIELDS = ("proposal_id", "voter", "choice", "cast_at", "change_hash") @dataclass(frozen=True) class Ballot: proposal_id: str voter: str choice: str cast_at: str # ISO-8601 UTC change_hash: str # binds the vote to the exact amendment text public_key: str # voter's registered public key (convenience copy) signature: str # hex Ed25519 signature over signing_payload() def to_dict(self) -> Dict[str, Any]: return { "proposal_id": self.proposal_id, "voter": self.voter, "choice": self.choice, "cast_at": self.cast_at, "change_hash": self.change_hash, "public_key": self.public_key, "signature": self.signature, } @classmethod def from_dict(cls, raw: Dict[str, Any], source: str = "") -> "Ballot": if not isinstance(raw, dict): raise BallotError(f"{source}: ballot must be a mapping") missing = [f for f in (*_SIGNED_FIELDS, "public_key", "signature") if f not in raw] if missing: raise BallotError(f"{source}: ballot missing fields {missing}") return cls( proposal_id=str(raw["proposal_id"]), voter=str(raw["voter"]), choice=str(raw["choice"]), cast_at=str(raw["cast_at"]), change_hash=str(raw["change_hash"]), public_key=str(raw["public_key"]), signature=str(raw["signature"]), ) def signing_payload(ballot_fields: Dict[str, Any]) -> bytes: """Canonical bytes a ballot signature covers (signed fields only).""" body = {k: ballot_fields[k] for k in _SIGNED_FIELDS} return canonical_json(body).encode("utf-8") def verify_ballot( ballot: Ballot, proposal: Proposal, registry: Dict[str, Citizen], ) -> Tuple[bool, str]: """Full verification of one ballot against a proposal and the registry. Returns ``(ok, reason)``; ``reason`` is ``"ok"`` on success and a precise, audit-ready explanation on failure. Never raises for an invalid ballot — the tally records the reason instead. """ if ballot.proposal_id != proposal.id: return False, f"ballot targets proposal {ballot.proposal_id!r}, not {proposal.id!r}" if ballot.choice not in VALID_CHOICES: return False, f"invalid choice {ballot.choice!r}" if ballot.change_hash != proposal.change_hash: return False, ( "change_hash mismatch: ballot was cast against different amendment text" ) try: cast_at = parse_iso(ballot.cast_at) except Exception as exc: # ValidationError return False, f"unparseable cast_at: {exc}" try: opens, closes = proposal.voting_window() except ProposalStateError: return False, "proposal has no voting window" if cast_at < opens: return False, f"cast at {ballot.cast_at}, before voting opened" if cast_at > closes: return False, f"cast at {ballot.cast_at}, after voting closed" try: citizen = check_voter_eligibility(ballot.voter, registry, opens) except EligibilityError as exc: return False, str(exc) if ballot.public_key.strip().lower() != citizen.public_key: return False, ( f"ballot public key does not match {ballot.voter!r}'s registered key" ) payload = signing_payload(ballot.to_dict()) if not keys.verify(citizen.public_key, payload, ballot.signature): return False, "signature verification failed against the registered key" return True, "ok" # --------------------------------------------------------------------------- # Casting and storage # --------------------------------------------------------------------------- def _ballot_filename(voter: str, cast_at: str) -> str: stamp = re.sub(r"[^0-9TZ]", "", cast_at) return f"{voter}-{stamp}.yaml" def cast_ballot( repo_root: Union[str, Path], proposal: Proposal, voter: str, choice: str, private_key_hex: str, registry: Dict[str, Citizen], now: Optional[datetime] = None, ) -> Ballot: """Construct, sign, verify, and persist a ballot. Performs the same checks the tally will: a ballot that would not count is refused at cast time with the reason, rather than silently accepted and discarded later. """ now = now or utcnow() if proposal.status != OPEN: raise BallotError( f"proposal {proposal.id} is {proposal.status!r}; ballots are only " f"accepted while voting is open" ) if choice not in VALID_CHOICES: raise BallotError(f"invalid choice {choice!r}; allowed: {VALID_CHOICES}") opens, _ = proposal.voting_window() citizen = check_voter_eligibility(voter, registry, opens) derived_public = keys.public_key_from_private(private_key_hex) if derived_public != citizen.public_key: raise BallotError( f"the provided private key does not correspond to {voter!r}'s " f"registered public key" ) fields = { "proposal_id": proposal.id, "voter": voter, "choice": choice, "cast_at": iso(now), "change_hash": proposal.change_hash, } signature = keys.sign(private_key_hex, signing_payload(fields)) ballot = Ballot(public_key=derived_public, signature=signature, **fields) ok, reason = verify_ballot(ballot, proposal, registry) if not ok: raise BallotError(f"refusing to cast invalid ballot: {reason}") path = ballots_dir(repo_root, proposal.id) / _ballot_filename(voter, ballot.cast_at) dump_yaml(path, ballot.to_dict()) return ballot def load_ballots(repo_root: Union[str, Path], proposal_id: str) -> List[Ballot]: """Load all ballots filed for a proposal, sorted by (voter, cast_at). Malformed ballot files raise — a ballot directory that cannot be parsed must fail loudly, not be quietly skipped. """ directory = ballots_dir(repo_root, proposal_id) if not directory.is_dir(): return [] ballots: List[Ballot] = [] for path in sorted(directory.glob("*.yaml")): ballots.append(Ballot.from_dict(load_yaml(path), source=str(path))) ballots.sort(key=lambda b: (b.voter, b.cast_at)) return ballots