"""Adapter layer between the test suite and the govtool implementation. Why this exists --------------- The tests assert behaviour: "a signed ballot verifies", "a tampered ledger fails verification", "a kernel change is classified major", "a proposal without quorum is blocked". They deliberately do NOT hard-code exact keyword spellings of every internal function. Instead every cross-module call is routed through :func:`call_adaptive`, which binds test values to the callee's *actual* parameter names via the alias tables below. If an implementation parameter is required and unmapped, the test fails loudly with a message pointing here, so reconciling a rename is a one-line fix in one file. This file is the single integration point between the suite and the implementation. Keep it boring. """ from __future__ import annotations import dataclasses import importlib import inspect import json import os import re import shutil import subprocess import sys from pathlib import Path import yaml PROJECT_ROOT = Path(__file__).resolve().parents[1] SRC_DIR = PROJECT_ROOT / "src" SEMVER_RE = re.compile(r"^\d+\.\d+\.\d+(?:[-+][0-9A-Za-z.\-]+)?$") HEX64_RE = re.compile(r"\b[0-9a-f]{64}\b") _MISSING = object() # --------------------------------------------------------------------------- # Alias tables: semantic value -> the parameter names it may bind to. # --------------------------------------------------------------------------- A_REPO = { "repo", "repo_path", "repo_dir", "root", "root_dir", "base_path", "workdir", "polity", "polity_path", "path", "directory", } A_PROPOSAL_ID = {"proposal_id", "prop_id", "pid", "proposal", "proposal_ref"} A_CITIZEN = {"citizen_id", "citizen", "voter", "voter_id", "member", "member_id"} A_CHOICE = {"choice", "vote", "value", "decision", "position"} A_PRIVKEY = { "private_key", "private_key_hex", "priv", "privkey", "signing_key", "secret_key", "secret", "sk", } A_PUBKEY = {"public_key", "public_key_hex", "pub", "pubkey", "verify_key", "vk"} A_MESSAGE = {"message", "msg", "data", "payload", "content"} A_SIGNATURE = {"signature", "sig", "signature_hex"} A_TITLE = {"title", "name", "subject"} A_RATIONALE = {"rationale", "motivation", "summary", "description", "body", "reason"} A_CHANGES = {"changes", "files", "changed_files", "file_changes", "patch", "diff", "contents"} A_PROPOSER = {"proposer", "proposer_id", "author", "author_id", "sponsor"} A_BALLOTS = {"ballots", "votes", "cast_ballots"} A_ELIGIBLE = { "eligible", "eligible_ids", "eligible_voters", "voters", "electorate", "eligible_citizens", "citizens", } A_ELIGIBLE_COUNT = {"eligible_count", "n_eligible", "total_eligible", "electorate_size"} A_QUORUM = {"quorum", "quorum_fraction", "quorum_ratio", "min_turnout"} A_THRESHOLD = {"threshold", "approval_threshold", "threshold_fraction", "majority", "required"} A_EVENT_TYPE = {"event_type", "type", "kind", "event", "event_name"} A_PAYLOAD = {"payload", "data", "body", "details"} A_ACTOR = {"actor", "author", "agent", "by", "signer", "source"} A_BASE = {"base", "base_dir", "old", "before", "old_dir", "previous"} A_HEAD = {"head", "head_dir", "new", "after", "proposed", "new_dir"} A_PATHS = {"changed_paths", "paths", "files_changed", "file_list"} A_UPSTREAM = {"upstream", "upstream_path", "src", "source", "origin", "origin_path", "from_path"} A_DEST = {"dest", "destination", "target", "out", "output", "dst", "fork_path", "dest_path", "to_path"} A_FORK_NAME = {"name", "fork_name", "group", "group_name", "label"} A_PARAMS = {"params", "parameters", "overrides", "config", "configuration"} A_REGISTRY = {"registry", "reg"} A_REGISTRY_PATH = {"registry_path", "citizens_path", "registry_file"} A_BALLOT = {"ballot", "vote_record", "cast"} # --------------------------------------------------------------------------- # Core adaptive machinery # --------------------------------------------------------------------------- def resolve_fn(module, *names): """Return the first callable attribute of *module* among *names*.""" for name in names: fn = getattr(module, name, None) if callable(fn): return fn raise AssertionError( f"module {module.__name__!r} exposes none of {names!r}; " f"add the real name to the alias call site in tests/helpers.py" ) def call_adaptive(fn, spec): """Call *fn*, binding values to parameters by name. *spec* is a list of ``(alias_set, value)`` pairs. Every parameter of *fn* whose name appears in an alias set receives that value. Optional parameters with no mapping are left to their defaults. A *required* parameter with no mapping is a loud failure pointing the maintainer at this file. """ sig = inspect.signature(fn) args, kwargs = [], {} for pname, param in sig.parameters.items(): if param.kind in (inspect.Parameter.VAR_POSITIONAL, inspect.Parameter.VAR_KEYWORD): continue matched = False value = None for aliases, val in spec: if pname in aliases: value, matched = val, True break if not matched: if param.default is inspect.Parameter.empty: raise AssertionError( f"{getattr(fn, '__qualname__', fn)!r} requires parameter " f"{pname!r}, which has no alias mapping in tests/helpers.py. " f"Add {pname!r} to the appropriate alias table." ) continue if param.kind is inspect.Parameter.POSITIONAL_ONLY: args.append(value) else: kwargs[pname] = value return fn(*args, **kwargs) def verification_ok(thunk): """Normalise 'verification' conventions: raising == False, else truthy/None == OK.""" try: result = thunk() except Exception: return False return result is not False # --------------------------------------------------------------------------- # Generic field access on dicts / dataclasses / plain objects # --------------------------------------------------------------------------- def get_field(obj, *names, default=_MISSING): for name in names: if isinstance(obj, dict): if name in obj: return obj[name] elif hasattr(obj, name): return getattr(obj, name) if default is not _MISSING: return default raise AssertionError( f"object of type {type(obj).__name__!r} has none of the fields {names!r}; " f"add the real field name in tests/helpers.py" ) def set_field(obj, name, value): """Return an object equal to *obj* with field *name* set to *value*.""" if isinstance(obj, dict): out = dict(obj) out[name] = value return out if dataclasses.is_dataclass(obj) and not isinstance(obj, type): try: return dataclasses.replace(obj, **{name: value}) except TypeError: pass try: setattr(obj, name, value) return obj except Exception as exc: # frozen object without dataclasses.replace support raise AssertionError(f"cannot mutate field {name!r} on {type(obj).__name__}: {exc}") def entry_dict(entry): if isinstance(entry, dict): return entry if dataclasses.is_dataclass(entry) and not isinstance(entry, type): return dataclasses.asdict(entry) if hasattr(entry, "__dict__"): return dict(vars(entry)) return {"repr": repr(entry)} def citizen_id(citizen): if isinstance(citizen, str): return citizen return get_field(citizen, "id", "citizen_id", "cid") # --------------------------------------------------------------------------- # Keys # --------------------------------------------------------------------------- def normalize_keypair(kp): """Return ``(private, public)`` strings from whatever generate_keypair returns.""" if isinstance(kp, (tuple, list)) and len(kp) == 2: return str(kp[0]), str(kp[1]) if isinstance(kp, dict): for priv_name in ("private_key", "private_key_hex", "signing_key", "secret_key", "private"): for pub_name in ("public_key", "public_key_hex", "verify_key", "public"): if priv_name in kp and pub_name in kp: return str(kp[priv_name]), str(kp[pub_name]) for priv_name in ("private_key", "private_key_hex", "signing_key", "secret_key", "private"): for pub_name in ("public_key", "public_key_hex", "verify_key", "public"): if hasattr(kp, priv_name) and hasattr(kp, pub_name): return str(getattr(kp, priv_name)), str(getattr(kp, pub_name)) raise AssertionError( f"cannot extract (private, public) from keypair object {type(kp).__name__!r}; " f"extend normalize_keypair in tests/helpers.py" ) def generate_keypair(): keys = importlib.import_module("govtool.keys") fn = resolve_fn(keys, "generate_keypair", "generate", "new_keypair", "keygen") return normalize_keypair(fn()) def sign_message(private_key, message): keys = importlib.import_module("govtool.keys") fn = resolve_fn(keys, "sign", "sign_message", "sign_bytes") return call_adaptive(fn, [(A_PRIVKEY, private_key), (A_MESSAGE, message)]) def signature_ok(public_key, message, signature): keys = importlib.import_module("govtool.keys") fn = resolve_fn(keys, "verify", "verify_signature", "check_signature") return verification_ok( lambda: call_adaptive( fn, [(A_PUBKEY, public_key), (A_MESSAGE, message), (A_SIGNATURE, signature)], ) ) # --------------------------------------------------------------------------- # Registry # --------------------------------------------------------------------------- def write_registry(path: Path, citizens): doc = { "citizens": [ { "id": c["id"], "name": c["name"], "public_key": c["public_key"], "status": c["status"], "joined": c.get("joined", "2025-01-01"), } for c in citizens ] } path.parent.mkdir(parents=True, exist_ok=True) path.write_text(yaml.safe_dump(doc, sort_keys=False), encoding="utf-8") return path def load_registry(path: Path): eligibility = importlib.import_module("govtool.eligibility") fn = resolve_fn(eligibility, "load_registry", "read_registry", "load_citizens", "load") return call_adaptive(fn, [(A_REGISTRY_PATH | {"path"}, Path(path))]) def eligible_voters(registry_obj, registry_path: Path): eligibility = importlib.import_module("govtool.eligibility") fn = resolve_fn( eligibility, "eligible_voters", "get_eligible", "eligible", "active_citizens" ) return call_adaptive( fn, [ (A_REGISTRY, registry_obj), (A_REGISTRY_PATH | {"path"}, Path(registry_path)), ], ) # --------------------------------------------------------------------------- # Ledger # --------------------------------------------------------------------------- def open_ledger(path: Path): mod = importlib.import_module("govtool.ledger") cls = None for name in ("Ledger", "AuditLedger", "AuditLog", "LedgerFile"): cls = getattr(mod, name, None) if cls is not None: break assert cls is not None, "no Ledger class found in govtool.ledger" try: return cls(Path(path)) except TypeError: pass for factory in ("open", "load", "at", "create"): f = getattr(cls, factory, None) if callable(f): return f(Path(path)) raise AssertionError("could not construct a Ledger; extend open_ledger in tests/helpers.py") def ledger_append(lgr, event_type, payload): fn = None for name in ("append", "add", "record", "write", "log"): candidate = getattr(lgr, name, None) if callable(candidate): fn = candidate break assert fn is not None, "no append-like method on the ledger object" return call_adaptive( fn, [(A_EVENT_TYPE, event_type), (A_PAYLOAD, payload), (A_ACTOR, "test-suite")], ) def ledger_entries(lgr): for name in ("entries", "all", "read", "items", "events"): attr = getattr(lgr, name, None) if attr is None: continue return list(attr()) if callable(attr) else list(attr) raise AssertionError("no entries-like accessor on the ledger object") def ledger_verify(lgr): """Return True/False if a verify-like method exists, else None.""" for name in ("verify", "verify_chain", "validate", "check", "audit"): method = getattr(lgr, name, None) if callable(method): return verification_ok(method) return None # --------------------------------------------------------------------------- # Ballots and tally # --------------------------------------------------------------------------- def cast_ballot(citizen, proposal_id, choice, repo=None): mod = importlib.import_module("govtool.ballots") fn = resolve_fn(mod, "cast_ballot", "make_ballot", "create_ballot", "sign_ballot", "cast") spec = [ (A_PROPOSAL_ID, proposal_id), (A_CITIZEN, citizen["id"]), (A_CHOICE, choice), (A_PRIVKEY, citizen["private_key"]), (A_PUBKEY, citizen["public_key"]), ] if repo is not None: spec.insert(0, (A_REPO, Path(repo))) return call_adaptive(fn, spec) def ballot_ok(ballot, public_key, proposal_id=None): mod = importlib.import_module("govtool.ballots") fn = resolve_fn(mod, "verify_ballot", "check_ballot", "ballot_valid", "verify") return verification_ok( lambda: call_adaptive( fn, [(A_BALLOT, ballot), (A_PUBKEY, public_key), (A_PROPOSAL_ID, proposal_id)], ) ) def run_tally(ballots, eligible_ids, quorum, threshold): mod = importlib.import_module("govtool.tally") fn = resolve_fn(mod, "tally_ballots", "tally", "count_ballots", "run_tally", "compute") return call_adaptive( fn, [ (A_BALLOTS, list(ballots)), (A_ELIGIBLE, list(eligible_ids)), (A_ELIGIBLE_COUNT, len(list(eligible_ids))), (A_QUORUM, quorum), (A_THRESHOLD, threshold), ], ) def yes_count(result): return int(get_field(result, "yes", "yes_count", "ayes", "approve", "for_count")) def no_count(result): return int(get_field(result, "no", "no_count", "nays", "against", "against_count")) def quorum_met(result): return bool(get_field(result, "quorum_met", "has_quorum", "quorum_ok", "quorum_reached")) def passed_of(result): if isinstance(result, bool): return result return bool(get_field(result, "passed", "ok", "success", "approved", "accepted", "ratifiable")) # --------------------------------------------------------------------------- # Classifier # --------------------------------------------------------------------------- def changed_paths(base: Path, head: Path): base, head = Path(base), Path(head) base_files = {p.relative_to(base) for p in base.rglob("*") if p.is_file()} head_files = {p.relative_to(head) for p in head.rglob("*") if p.is_file()} out = set() for rel in base_files | head_files: b, h = base / rel, head / rel if not (b.exists() and h.exists()) or b.read_bytes() != h.read_bytes(): out.add(str(rel).replace(os.sep, "/")) return sorted(out) def classify(base, head): mod = importlib.import_module("govtool.classifier") fn = resolve_fn(mod, "classify_dirs", "classify_change", "classify", "classify_trees", "label") return call_adaptive( fn, [ (A_BASE, Path(base)), (A_HEAD, Path(head)), (A_PATHS, changed_paths(base, head)), ], ) def impact_of(result): if isinstance(result, str): value = result else: value = get_field(result, "impact", "level", "label", "bump", "classification", "semver") text = str(getattr(value, "value", value)).lower() return text.rsplit(".", 1)[-1] def tweak_first_numeric(node): """Return ``(new_node, changed)`` with the first non-bool numeric leaf bumped by 1.""" if isinstance(node, bool): return node, False if isinstance(node, (int, float)): return node + 1, True if isinstance(node, list): out, changed = [], False for item in node: if not changed: item, changed = tweak_first_numeric(item) out.append(item) return out, changed if isinstance(node, dict): out, changed = {}, False for key, val in node.items(): if not changed: val, changed = tweak_first_numeric(val) out[key] = val return out, changed return node, False # --------------------------------------------------------------------------- # Proposals and the gate # --------------------------------------------------------------------------- def create_proposal(repo, proposer_id, title, changes, rationale="test amendment"): mod = importlib.import_module("govtool.proposal") fn = resolve_fn( mod, "create_proposal", "create", "new_proposal", "propose", "open_proposal" ) return call_adaptive( fn, [ (A_REPO, Path(repo)), (A_PROPOSER, proposer_id), (A_TITLE, title), (A_RATIONALE, rationale), (A_CHANGES, changes), ], ) def proposal_id_of(prop): if isinstance(prop, str): return prop return str(get_field(prop, "id", "proposal_id", "pid")) def run_gate(repo, proposal_id, ballots=None): mod = importlib.import_module("govtool.gate") fn = resolve_fn(mod, "run_gate", "evaluate_proposal", "evaluate", "gate", "check_proposal", "run") spec = [ (A_REPO, Path(repo)), (A_PROPOSAL_ID, proposal_id), (A_REGISTRY_PATH, Path(repo) / "citizens" / "registry.yaml"), ] if ballots is not None: spec.append((A_BALLOTS, list(ballots))) return call_adaptive(fn, spec) def gate_passed(result): return passed_of(result) # --------------------------------------------------------------------------- # Fork tooling # --------------------------------------------------------------------------- def create_fork(upstream, dest, name, params): mod = importlib.import_module("govtool.fork") fn = resolve_fn(mod, "create_fork", "fork", "init_fork", "fork_repo", "clone_fork") return call_adaptive( fn, [ (A_UPSTREAM, Path(upstream)), (A_DEST, Path(dest)), (A_FORK_NAME, name), (A_PARAMS, params), ], ) def upstream_status_fn(): mod = importlib.import_module("govtool.fork") for name in ("upstream_status", "check_upstream", "status", "upstream_diff", "compare_upstream", "drift"): fn = getattr(mod, name, None) if callable(fn): return fn return None def upstream_status(fork_path, upstream): fn = upstream_status_fn() assert fn is not None return call_adaptive( fn, [ (A_REPO | {"fork", "fork_path"}, Path(fork_path)), (A_UPSTREAM, Path(upstream)), ], ) # --------------------------------------------------------------------------- # Subprocess helpers (CLI and demo) # --------------------------------------------------------------------------- def run_cli(*args, cwd=None): env = dict(os.environ) env["PYTHONPATH"] = str(SRC_DIR) + os.pathsep + env.get("PYTHONPATH", "") return subprocess.run( [sys.executable, "-m", "govtool", *[str(a) for a in args]], capture_output=True, text=True, cwd=str(cwd or PROJECT_ROOT), env=env, timeout=120, ) def run_script(script_path, cwd=None): env = dict(os.environ) env["PYTHONPATH"] = str(SRC_DIR) + os.pathsep + env.get("PYTHONPATH", "") return subprocess.run( [sys.executable, str(script_path)], capture_output=True, text=True, cwd=str(cwd or PROJECT_ROOT), env=env, timeout=300, ) # --------------------------------------------------------------------------- # YAML scanning utilities for constitution structure tests # --------------------------------------------------------------------------- def walk_strings(node): if isinstance(node, str): yield node elif isinstance(node, dict): for key, val in node.items(): yield from walk_strings(key) yield from walk_strings(val) elif isinstance(node, (list, tuple)): for item in node: yield from walk_strings(item) def find_semver(node): return [s for s in walk_strings(node) if SEMVER_RE.match(s)] def tree_text(root: Path): chunks = [] for path in sorted(Path(root).rglob("*")): if path.is_file(): try: chunks.append(path.read_text(encoding="utf-8")) except UnicodeDecodeError: continue return "\n".join(chunks) def json_text(obj): return json.dumps(entry_dict(obj), default=str, sort_keys=True)