"""Citizen registry and voter eligibility. Article 2 (citizenship) and Article 5 (voting) in code: * One person, one vote. No capital weighting anywhere in this module — there is deliberately no field for stake, balance, or weight. * Only citizens with status ``active`` may vote. * The eligible electorate for a proposal is frozen at the moment voting opens: citizens who joined after a vote opened cannot vote on it. This closes the "mint sock-puppet citizens mid-vote" exploit. """ from __future__ import annotations from dataclasses import dataclass from datetime import datetime from pathlib import Path from typing import Any, Dict, Union from .canonical import load_yaml, parse_iso from .errors import EligibilityError, RegistryError from .keys import PUBLIC_KEY_HEX_LEN DEFAULT_REGISTRY_RELPATH = Path("citizens") / "registry.yaml" STATUS_ACTIVE = "active" STATUS_SUSPENDED = "suspended" STATUS_DEPARTED = "departed" VALID_STATUSES = (STATUS_ACTIVE, STATUS_SUSPENDED, STATUS_DEPARTED) @dataclass(frozen=True) class Citizen: """One citizen: an identity, a public key, a join date, a status.""" id: str name: str public_key: str joined_at: str # ISO-8601 UTC status: str = STATUS_ACTIVE def joined_at_dt(self) -> datetime: return parse_iso(self.joined_at) def to_dict(self) -> Dict[str, Any]: return { "id": self.id, "name": self.name, "public_key": self.public_key, "joined_at": self.joined_at, "status": self.status, } def registry_path(repo_root: Union[str, Path]) -> Path: return Path(repo_root) / DEFAULT_REGISTRY_RELPATH def _citizen_from_raw(raw: Any, source: str) -> Citizen: if not isinstance(raw, dict): raise RegistryError(f"{source}: citizen entry must be a mapping, got {type(raw).__name__}") cid = raw.get("id") if not isinstance(cid, str) or not cid: raise RegistryError(f"{source}: citizen entry missing string 'id'") public_key = raw.get("public_key") or raw.get("key") if not isinstance(public_key, str) or len(public_key.strip()) != PUBLIC_KEY_HEX_LEN: raise RegistryError( f"{source}: citizen {cid!r} must have a {PUBLIC_KEY_HEX_LEN}-hex-char 'public_key'" ) joined_at = raw.get("joined_at") if not isinstance(joined_at, str): raise RegistryError(f"{source}: citizen {cid!r} missing 'joined_at' timestamp") parse_iso(joined_at) # validate status = raw.get("status", STATUS_ACTIVE) if status not in VALID_STATUSES: raise RegistryError( f"{source}: citizen {cid!r} has invalid status {status!r}; " f"allowed: {VALID_STATUSES}" ) name = raw.get("name", cid) if not isinstance(name, str): raise RegistryError(f"{source}: citizen {cid!r} has non-string 'name'") return Citizen( id=cid, name=name, public_key=public_key.strip().lower(), joined_at=joined_at, status=status, ) def load_registry(path: Union[str, Path]) -> Dict[str, Citizen]: """Load the citizen registry into a dict keyed by citizen id. Accepts either ``{"citizens": [...]}`` or a bare top-level list, so forks with a hand-written registry still parse. Duplicate ids and duplicate public keys are hard errors: one person, one vote means one key, one identity. """ path = Path(path) if not path.exists(): raise RegistryError(f"citizen registry not found at {path}") data = load_yaml(path) if isinstance(data, dict): raw_list = data.get("citizens") if raw_list is None: raise RegistryError(f"{path}: expected a 'citizens' list") elif isinstance(data, list): raw_list = data else: raise RegistryError(f"{path}: registry must be a mapping or a list") if not isinstance(raw_list, list): raise RegistryError(f"{path}: 'citizens' must be a list") registry: Dict[str, Citizen] = {} seen_keys: Dict[str, str] = {} for raw in raw_list: citizen = _citizen_from_raw(raw, str(path)) if citizen.id in registry: raise RegistryError(f"{path}: duplicate citizen id {citizen.id!r}") if citizen.public_key in seen_keys: raise RegistryError( f"{path}: citizens {seen_keys[citizen.public_key]!r} and {citizen.id!r} " f"share a public key — one key, one identity" ) registry[citizen.id] = citizen seen_keys[citizen.public_key] = citizen.id return registry def load_registry_from_root(repo_root: Union[str, Path]) -> Dict[str, Citizen]: return load_registry(registry_path(repo_root)) def eligible_voters(registry: Dict[str, Citizen], as_of: datetime) -> Dict[str, Citizen]: """Citizens eligible to vote on a proposal whose voting opened at *as_of*. Eligible means: status ``active`` and joined at or before *as_of*. """ return { cid: c for cid, c in registry.items() if c.status == STATUS_ACTIVE and c.joined_at_dt() <= as_of } def check_voter_eligibility( citizen_id: str, registry: Dict[str, Citizen], as_of: datetime, ) -> Citizen: """Return the citizen if eligible; raise :class:`EligibilityError` with a precise, auditable reason otherwise.""" citizen = registry.get(citizen_id) if citizen is None: raise EligibilityError(f"{citizen_id!r} is not in the citizen registry") if citizen.status != STATUS_ACTIVE: raise EligibilityError( f"{citizen_id!r} has status {citizen.status!r}; only active citizens may vote" ) if citizen.joined_at_dt() > as_of: raise EligibilityError( f"{citizen_id!r} joined at {citizen.joined_at}, after voting opened — " f"the electorate is frozen when a vote opens" ) return citizen