#!/usr/bin/env python3 """FablePool milestone-2 conformance runner. Checks an implementation against the wire-format test vectors: 1. vectors/canonicalization.json -- deterministic JSON encoding (byte-exact) 2. vectors/signing.json -- Ed25519 keys derived from fixed seeds 3. vectors/op_scenarios.json -- signed-operation integrity (accept/reject) Modes: python conformance/run_conformance.py Self-check the implementation in this repository. If conformance/recorded.json exists, also compare the computed public keys, signatures, canonical digests, and operation ids against the recorded reference outputs (true cross-implementation vectors). python conformance/run_conformance.py --record Write conformance/recorded.json from the current run. Maintainers run this once with the reference implementation and commit the result so that second implementations have fixed expected values to match. A JSON report is always written (default: conformance/results/report.json). Exit code 0 means every check passed. """ from __future__ import annotations import argparse import copy import json import sys import time from pathlib import Path HERE = Path(__file__).resolve().parent ROOT = HERE.parent if str(ROOT) not in sys.path: sys.path.insert(0, str(ROOT)) from fablepool import canonical # noqa: E402 from fablepool import keys # noqa: E402 from fablepool import ops # noqa: E402 FIXED_TS = 1700000000 def as_hex(value): """Normalise a value that may be bytes or a hex string to a hex string.""" if isinstance(value, (bytes, bytearray)): return bytes(value).hex() return str(value) def truthy(fn, *args): """Call ``fn``; a raised exception means False, a None return means True. This tolerates both verifier styles: returning a bool, or returning None on success and raising on failure. """ try: result = fn(*args) except Exception: return False return True if result is None else bool(result) def apply_mutation(op_dict, mutation): path = mutation["path"].split(".") target = op_dict for part in path[:-1]: target = target[part] if mutation["op"] == "set": target[path[-1]] = mutation["value"] elif mutation["op"] == "delete": target.pop(path[-1], None) else: raise ValueError(f"unknown mutation op: {mutation['op']!r}") class Report: def __init__(self): self.results = [] self.recorded = {"canonical_digests": {}, "signing": {}, "op_ids": {}} def add(self, section, name, ok, detail=""): self.results.append( {"section": section, "name": name, "ok": bool(ok), "detail": detail} ) @property def passed(self): return sum(1 for r in self.results if r["ok"]) @property def failed(self): return sum(1 for r in self.results if not r["ok"]) def run_canonicalization(report, vectors): for v in vectors["vectors"]: name = v["name"] try: out = canonical.canonical_encode(v["input"]) except Exception as exc: report.add("canonicalization", name, False, f"encoder raised: {exc!r}") continue if not isinstance(out, (bytes, bytearray)): report.add( "canonicalization", name, False, f"encoder returned {type(out).__name__}, expected bytes", ) continue out = bytes(out) expected = v["expected"].encode("utf-8") ok = out == expected detail = "" if ok else f"got {out!r}, want {expected!r}" report.add("canonicalization", name, ok, detail) report.recorded["canonical_digests"][name] = canonical.sha256_hex(out) def run_signing(report, vectors): for v in vectors["vectors"]: name = v["name"] seed = bytes.fromhex(v["seed"]) msg = v["message"].encode("utf-8") try: kp = keys.KeyPair.from_seed(seed) pub = as_hex(kp.public_key_hex) sig1 = as_hex(kp.sign(msg)) sig2 = as_hex(kp.sign(msg)) except Exception as exc: report.add("signing", name, False, f"key derivation/signing raised: {exc!r}") continue report.add( "signing", f"{name}/deterministic", sig1 == sig2, "" if sig1 == sig2 else "two signatures over the same message differ", ) report.add( "signing", f"{name}/verifies", truthy(keys.verify, pub, sig1, msg), "signature over the original message must verify", ) report.add( "signing", f"{name}/rejects-tampered-message", not truthy(keys.verify, pub, sig1, msg + b"x"), "signature must not verify over a tampered message", ) flipped = sig1[:-1] + ("0" if sig1[-1] != "0" else "1") report.add( "signing", f"{name}/rejects-tampered-signature", not truthy(keys.verify, pub, flipped, msg), "a tampered signature must not verify", ) report.recorded["signing"][name] = {"public_key": pub, "signature": sig1} def run_op_scenarios(report, vectors): kp = keys.KeyPair.from_seed(bytes.fromhex(vectors["seed"])) author = as_hex(kp.public_key_hex) for v in vectors["vectors"]: name = v["name"] body = { "v": 1, "type": v["type"], "author": author, "ts": FIXED_TS, "prev": list(v.get("prev", [])), "payload": copy.deepcopy(v["payload"]), } try: ret = ops.sign_op(body, kp) signed = ret if ret is not None else body except Exception as exc: report.add("op-scenarios", name, False, f"sign_op raised: {exc!r}") continue mutated = copy.deepcopy(signed) try: for m in v.get("mutations", []): apply_mutation(mutated, m) except Exception as exc: report.add("op-scenarios", name, False, f"mutation failed: {exc!r}") continue verified = truthy(ops.verify_op, mutated) want_accept = v["expect"] == "accept" report.add( "op-scenarios", name, verified == want_accept, f"verify_op -> {verified}, expected " f"{'accept' if want_accept else 'reject'}", ) if want_accept and not v.get("mutations") and "id" in signed: report.recorded["op_ids"][name] = str(signed["id"]) def flatten_recorded(recorded): flat = {} for name, digest in recorded.get("canonical_digests", {}).items(): flat[f"canonical/{name}/sha256"] = digest for name, entry in recorded.get("signing", {}).items(): flat[f"signing/{name}/public_key"] = entry["public_key"] flat[f"signing/{name}/signature"] = entry["signature"] for name, op_id in recorded.get("op_ids", {}).items(): flat[f"op_id/{name}"] = op_id return flat def compare_recorded(report, baseline): current = flatten_recorded(report.recorded) expected = flatten_recorded(baseline) for key, want in sorted(expected.items()): got = current.get(key) ok = got == want detail = "" if ok else f"got {got!r}, recorded reference is {want!r}" report.add("recorded-comparison", key, ok, detail) def main(argv=None): parser = argparse.ArgumentParser(description=__doc__.splitlines()[0]) parser.add_argument( "--vectors-dir", default=str(HERE / "vectors"), help="directory containing the vector JSON files", ) parser.add_argument( "--results-dir", default=str(HERE / "results"), help="directory to write report.json into", ) parser.add_argument( "--recorded", default=str(HERE / "recorded.json"), help="path of the recorded reference outputs", ) parser.add_argument( "--record", action="store_true", help="write recorded.json from this run instead of comparing against it", ) parser.add_argument("--quiet", action="store_true", help="only print failures") args = parser.parse_args(argv) vectors_dir = Path(args.vectors_dir) report = Report() with open(vectors_dir / "canonicalization.json", encoding="utf-8") as fh: run_canonicalization(report, json.load(fh)) with open(vectors_dir / "signing.json", encoding="utf-8") as fh: run_signing(report, json.load(fh)) with open(vectors_dir / "op_scenarios.json", encoding="utf-8") as fh: run_op_scenarios(report, json.load(fh)) recorded_path = Path(args.recorded) compared = False if args.record: recorded_path.parent.mkdir(parents=True, exist_ok=True) recorded_path.write_text( json.dumps(report.recorded, indent=2, sort_keys=True) + "\n", encoding="utf-8", ) print(f"recorded reference outputs -> {recorded_path}") elif recorded_path.exists(): with open(recorded_path, encoding="utf-8") as fh: compare_recorded(report, json.load(fh)) compared = True results_dir = Path(args.results_dir) results_dir.mkdir(parents=True, exist_ok=True) report_path = results_dir / "report.json" report_path.write_text( json.dumps( { "generated_at": int(time.time()), "passed": report.passed, "failed": report.failed, "compared_against_recorded": compared, "results": report.results, "recorded": report.recorded, }, indent=2, sort_keys=True, ) + "\n", encoding="utf-8", ) for r in report.results: if not r["ok"]: print(f"FAIL [{r['section']}] {r['name']}: {r['detail']}") elif not args.quiet: print(f"ok [{r['section']}] {r['name']}") print( f"\nconformance: {report.passed} passed, {report.failed} failed " f"(report: {report_path})" ) return 0 if report.failed == 0 else 1 if __name__ == "__main__": sys.exit(main())