"""Append-only, hash-chained audit ledger. Every governance event — citizens joining, proposals opening, each individual ballot, the tally, ratification or rejection, version releases, fork declarations — lands as one line of JSON in ``ledger/ledger.jsonl``. Each entry commits to its predecessor by hash, so rewriting history breaks the chain and is detectable by anyone with a copy of the file. This is Article 8 (audit ledger) as code. Entry format:: { "seq": 1, # 1-based, strictly increasing by 1 "timestamp": "...Z", # ISO-8601 UTC "kind": "proposal_created", # one of KINDS "payload": { ... }, # event-specific, canonical-JSON-able "prev_hash": "<64 hex>", # entry_hash of the previous entry, # ZERO_HASH for the genesis entry "entry_hash": "<64 hex>" # hash_obj of the entry minus this field } """ from __future__ import annotations import json from pathlib import Path from typing import Any, Dict, List, Optional, Union from .canonical import ZERO_HASH, hash_obj, is_hex_digest, parse_iso, utcnow_iso from .errors import LedgerError, LedgerIntegrityError DEFAULT_LEDGER_RELPATH = Path("ledger") / "ledger.jsonl" KINDS = frozenset({ "genesis", "citizen_added", "citizen_status_changed", "proposal_created", "proposal_opened", "ballot_cast", "proposal_closed", "tally_recorded", "proposal_ratified", "proposal_rejected", "proposal_withdrawn", "version_released", "fork_declared", "fork_upstream_pull", }) _ENTRY_FIELDS = ("seq", "timestamp", "kind", "payload", "prev_hash", "entry_hash") def ledger_path(repo_root: Union[str, Path]) -> Path: """The conventional ledger location inside a governance repository.""" return Path(repo_root) / DEFAULT_LEDGER_RELPATH def compute_entry_hash(entry: Dict[str, Any]) -> str: """Hash of an entry's content (everything except ``entry_hash``).""" body = {k: entry[k] for k in ("seq", "timestamp", "kind", "payload", "prev_hash")} return hash_obj(body) def read_entries(path: Union[str, Path]) -> List[Dict[str, Any]]: """Read all ledger entries. A missing file is an empty ledger.""" path = Path(path) if not path.exists(): return [] entries: List[Dict[str, Any]] = [] with path.open("r", encoding="utf-8") as fh: for lineno, line in enumerate(fh, start=1): line = line.strip() if not line: continue try: entry = json.loads(line) except json.JSONDecodeError as exc: raise LedgerError(f"{path}:{lineno}: invalid JSON: {exc}") from exc if not isinstance(entry, dict): raise LedgerError(f"{path}:{lineno}: entry is not an object") entries.append(entry) return entries def last_entry(path: Union[str, Path]) -> Optional[Dict[str, Any]]: """The most recent entry, or None for an empty ledger.""" entries = read_entries(path) return entries[-1] if entries else None def append_entry( path: Union[str, Path], kind: str, payload: Dict[str, Any], timestamp: Optional[str] = None, ) -> Dict[str, Any]: """Append a new entry to the ledger and return it. The chain head is re-read at append time so concurrent local edits cannot silently fork the chain; CI re-verifies the whole chain on every push anyway. """ if kind not in KINDS: raise LedgerError(f"unknown ledger entry kind {kind!r}; allowed: {sorted(KINDS)}") if not isinstance(payload, dict): raise LedgerError("ledger payload must be a JSON object") path = Path(path) prev = last_entry(path) if prev is None: seq = 1 prev_hash = ZERO_HASH else: seq = int(prev["seq"]) + 1 prev_hash = prev["entry_hash"] ts = timestamp if timestamp is not None else utcnow_iso() parse_iso(ts) # validate entry: Dict[str, Any] = { "seq": seq, "timestamp": ts, "kind": kind, "payload": payload, "prev_hash": prev_hash, } entry["entry_hash"] = compute_entry_hash(entry) path.parent.mkdir(parents=True, exist_ok=True) line = json.dumps(entry, sort_keys=True, separators=(",", ":"), ensure_ascii=False) with path.open("a", encoding="utf-8") as fh: fh.write(line + "\n") return entry def verify_chain(path: Union[str, Path]) -> int: """Verify the entire hash chain. Returns the number of entries. Checks per entry: required fields present, kind known, timestamps parseable and non-decreasing, ``seq`` strictly increments from 1, ``prev_hash`` matches the predecessor's ``entry_hash``, and ``entry_hash`` matches the recomputed content hash. Raises :class:`LedgerIntegrityError` with the offending sequence number on any failure. """ entries = read_entries(path) prev_hash = ZERO_HASH prev_ts = None for index, entry in enumerate(entries): where = f"ledger entry index {index}" for field in _ENTRY_FIELDS: if field not in entry: raise LedgerIntegrityError(f"{where}: missing field {field!r}") seq = entry["seq"] if seq != index + 1: raise LedgerIntegrityError(f"{where}: seq is {seq}, expected {index + 1}") if entry["kind"] not in KINDS: raise LedgerIntegrityError(f"seq {seq}: unknown kind {entry['kind']!r}") ts = parse_iso(entry["timestamp"]) if prev_ts is not None and ts < prev_ts: raise LedgerIntegrityError( f"seq {seq}: timestamp {entry['timestamp']} precedes previous entry" ) if entry["prev_hash"] != prev_hash: raise LedgerIntegrityError( f"seq {seq}: prev_hash {entry['prev_hash'][:12]}… does not match " f"chain head {prev_hash[:12]}…" ) if not is_hex_digest(entry["entry_hash"]): raise LedgerIntegrityError(f"seq {seq}: entry_hash is not a sha256 hex digest") recomputed = compute_entry_hash(entry) if recomputed != entry["entry_hash"]: raise LedgerIntegrityError( f"seq {seq}: entry_hash mismatch (recorded {entry['entry_hash'][:12]}…, " f"recomputed {recomputed[:12]}…) — content was altered after the fact" ) prev_hash = entry["entry_hash"] prev_ts = ts return len(entries) def entries_for_proposal(path: Union[str, Path], proposal_id: str) -> List[Dict[str, Any]]: """All ledger entries whose payload references *proposal_id*.""" return [ e for e in read_entries(path) if isinstance(e.get("payload"), dict) and e["payload"].get("proposal_id") == proposal_id ]