"""Deterministic conformance test-vector suite.

``build_suite()`` produces the same vectors on every run: keys are derived
from fixed seeds and timestamps are constants.

Each vector is a JSON document:

    {
      "format": "fpcf-vectors-1",
      "name": "valid/claim-assert",       # also the file path stem
      "kind": "valid" | "invalid",
      "context_b64": ["<base64>", ...],   # preload, in order
      "raw_b64": "<base64>",              # the op under test
      "expected": {"op_id": "fp:op:..."}  # valid vectors
                | {"error": "FP-E-..."}   # invalid vectors
    }

A conforming implementation must: load each context op into a fresh log
(all context ops are valid), then attempt to append the op under test, and
observe exactly the expected outcome.
"""

import base64
import hashlib
import json
from collections import namedtuple
from pathlib import Path
from typing import List, Optional

from .canonical import canonicalize
from .errors import (
    E_JSON,
    E_NUMBER,
    E_CANONICAL,
    E_ENVELOPE,
    E_BODY,
    E_SIG,
    E_PREV,
    E_CAUSALITY,
    E_REF,
    E_DUP,
)
from .ids import op_id_for, content_hash
from .signing import keypair_from_seed, make_envelope, sign_envelope, public_str

# Format tag stamped into every vector and into the manifest.
VECTORS_FORMAT = "fpcf-vectors-1"

# Fixed key seeds: hashing a constant label keeps the derived keys (and thus
# every signature and op id in the suite) identical from run to run.
ALICE_SEED = hashlib.sha256(b"fablepool:fpcf:test-seed:alice").digest()
BOB_SEED = hashlib.sha256(b"fablepool:fpcf:test-seed:bob").digest()

# Sample evidence payloads. Only their hash and length end up in the vectors.
_ICS = (
    b"BEGIN:VCALENDAR\r\nBEGIN:VEVENT\r\nSUMMARY:Yoga class\r\n"
    b"RRULE:FREQ=WEEKLY;BYDAY=TU\r\nDTSTART:20250304T180000\r\n"
    b"END:VEVENT\r\nEND:VCALENDAR\r\n"
)
_NOTE = b"Tuesday yoga with Sam, moved to 18:00 starting March.\n"
_PHOTO_META = b'{"camera":"demo","gps":null,"taken":"2025-03-04T18:02:11Z"}\n'
_ANSWER = b"You appear to attend a weekly yoga class on Tuesdays at 18:00.\n"

# An op-id-shaped string that is never added to any context in this suite.
_FAKE_OP_ID = "fp:op:" + "00" * 32

# A signed op: the envelope dict, its canonical bytes, and its op id.
_Op = namedtuple("_Op", ["env", "raw", "oid"])


def _ts(minute: int) -> str:
    """Return the constant timestamp ``2025-03-01T08:MM:00Z`` for ``minute``."""
    return "2025-03-01T08:%02d:00Z" % minute


def _b64(b: bytes) -> str:
    """Return the standard base64 encoding of ``b`` as an ASCII string."""
    return base64.b64encode(b).decode("ascii")


def _signed(sk, op_type: str, body: dict, ts: str, prev: List[str]) -> _Op:
    """Build an envelope with ``make_envelope`` and bundle it as an ``_Op``.

    The returned tuple carries the envelope, its canonicalized bytes and the
    op id computed by ``op_id_for``.
    """
    env = make_envelope(op_type, body, sk, ts=ts, prev=prev)
    raw = canonicalize(env)
    return _Op(env, raw, op_id_for(env))


def _vector(name: str, kind: str, raw: bytes, expected: dict, context: List[_Op]) -> dict:
    """Assemble one vector document (see the module docstring for the shape)."""
    return {
        "format": VECTORS_FORMAT,
        "name": name,
        "kind": kind,
        "context_b64": [_b64(op.raw) for op in context],
        "raw_b64": _b64(raw),
        "expected": expected,
    }


def _valid(name: str, op: _Op, context: List[_Op]) -> dict:
    """Return a ``valid`` vector expecting ``op``'s own op id."""
    return _vector(name, "valid", op.raw, {"op_id": op.oid}, context)


def _invalid(name: str, raw: bytes, code: str, context: List[_Op]) -> dict:
    """Return an ``invalid`` vector expecting error ``code`` for ``raw``."""
    return _vector(name, "invalid", raw, {"error": code}, context)


def _replace_once(raw: bytes, old: bytes, new: bytes) -> bytes:
    """Replace the single occurrence of ``old`` in ``raw`` with ``new``.

    Raises:
        AssertionError: if ``old`` does not occur exactly once, so a byte-level
            mutation can never silently miss or hit more than its target.
    """
    if raw.count(old) != 1:
        raise AssertionError(
            "vector construction: expected exactly one occurrence of %r" % old
        )
    return raw.replace(old, new)


def build_suite() -> List[dict]:
    """Build the full deterministic vector suite.

    Returns:
        The list of vector documents, valid vectors first, then the invalid
        ones grouped by the layer they exercise.
    """
    alice = keypair_from_seed(ALICE_SEED)
    bob = keypair_from_seed(BOB_SEED)
    bob_pub = public_str(bob)

    # ------------------------------------------------------------------
    # The valid chain (all authored by alice). ``prev`` links form a chain,
    # except that e3 and c1 both name e2 as their predecessor.
    # ------------------------------------------------------------------
    e1_body = {
        "source": {"adapter": "calendar.ics", "locator": "file:samples/demo.ics"},
        "media_type": "text/calendar",
        "content_hash": content_hash(_ICS),
        "size": len(_ICS),
        "captured_at": _ts(0),
    }
    e1 = _signed(alice, "evidence-ingest", e1_body, _ts(0), [])

    e2_body = {
        "source": {"adapter": "notes.markdown", "locator": "file:samples/notes/yoga.md"},
        "media_type": "text/markdown",
        "content_hash": content_hash(_NOTE),
        "size": len(_NOTE),
        "captured_at": _ts(1),
    }
    e2 = _signed(alice, "evidence-ingest", e2_body, _ts(1), [e1.oid])

    # Same shape as e1/e2 but with a fractional-second timestamp and an
    # extra "attrs" field.
    e3_body = {
        "source": {"adapter": "photos.exif", "locator": "file:samples/photos/0001.json"},
        "media_type": "application/json",
        "content_hash": content_hash(_PHOTO_META),
        "size": len(_PHOTO_META),
        "captured_at": "2025-03-01T08:01:30.500Z",
        "attrs": {"mock": True},
    }
    e3 = _signed(alice, "evidence-ingest", e3_body, "2025-03-01T08:01:30.500Z", [e2.oid])

    c1_body = {
        "subject": "fp:self",
        "predicate": "schedule.weekly_event",
        "object": {
            "value": {"summary": "Yoga class", "weekday": "tuesday", "time": "18:00"}
        },
        "derived_from": [e1.oid, e2.oid],
        "confidence": 8200,
        "method": "rule:calendar.recurring.v1",
    }
    c1 = _signed(alice, "claim-assert", c1_body, _ts(2), [e2.oid])

    corr_body = {
        "target": c1.oid,
        "reason": "user confirmed the event during review",
        "new_confidence": 9500,
    }
    corr = _signed(alice, "correction", corr_body, _ts(3), [c1.oid])

    ref_body = {
        "target": c1.oid,
        "reason": "user states the class ended in February",
        "evidence": [e2.oid],
    }
    ref = _signed(alice, "refutation", ref_body, _ts(4), [corr.oid])

    grant_body = {
        "grantee": bob_pub,
        "scope": {
            "predicates": ["schedule.*"],
            "subjects": ["fp:self"],
            "min_confidence": 5000,
            "include_provenance": False,
        },
        "expires_at": "2026-03-01T00:00:00Z",
    }
    grant = _signed(alice, "permission-grant", grant_body, _ts(5), [ref.oid])

    inf_body = {
        "caller": "app:assistant.demo",
        "query": "what do you know about my schedule and why?",
        "inputs": [c1.oid],
        "grant": grant.oid,
        "output_hash": content_hash(_ANSWER),
        "model": "rules-only",
    }
    inf = _signed(alice, "inference-call", inf_body, _ts(6), [grant.oid])

    rev_body = {"target": grant.oid, "reason": "delegation demo finished"}
    rev = _signed(alice, "revocation", rev_body, _ts(7), [inf.oid])

    chain = [e1, e2, e3, c1, corr, ref, grant, inf, rev]

    def ctx(op: _Op) -> List[_Op]:
        """Return every chain op that precedes ``op`` (its preload context)."""
        return chain[: chain.index(op)]

    vectors: List[dict] = []

    # ------------------------------------------------------------------
    # Valid vectors: one per op type, plus extras.
    # ------------------------------------------------------------------
    vectors.append(_valid("valid/evidence-calendar", e1, ctx(e1)))
    vectors.append(_valid("valid/evidence-note", e2, ctx(e2)))
    vectors.append(_valid("valid/evidence-fractional-ts", e3, ctx(e3)))
    vectors.append(_valid("valid/claim-assert", c1, ctx(c1)))
    vectors.append(_valid("valid/correction", corr, ctx(corr)))
    vectors.append(_valid("valid/refutation", ref, ctx(ref)))
    vectors.append(_valid("valid/permission-grant", grant, ctx(grant)))
    vectors.append(_valid("valid/inference-call", inf, ctx(inf)))
    vectors.append(_valid("valid/revocation", rev, ctx(rev)))

    # ------------------------------------------------------------------
    # Invalid: byte level.
    # ------------------------------------------------------------------
    vectors.append(_invalid("invalid/json-truncated", b'{"v":1,', E_JSON, []))
    vectors.append(_invalid("invalid/json-bad-utf8", b"\xff\xfe{}", E_JSON, []))
    # The number vectors patch the canonical bytes of an already-signed op,
    # so only the targeted literal differs from a valid op.
    vectors.append(
        _invalid(
            "invalid/number-float",
            _replace_once(c1.raw, b'"confidence":8200', b'"confidence":0.82'),
            E_NUMBER,
            ctx(c1),
        )
    )
    vectors.append(
        _invalid(
            "invalid/number-too-large",
            _replace_once(
                e1.raw,
                b'"size":%d' % len(_ICS),
                b'"size":9007199254740993',  # 2^53 + 1
            ),
            E_NUMBER,
            [],
        )
    )

    # ------------------------------------------------------------------
    # Invalid: canonical form.
    # ------------------------------------------------------------------
    # Serialize e1 with its top-level keys in reverse order (keys compared by
    # their UTF-16-BE encoding) and without re-sorting.
    reversed_env = dict(
        sorted(e1.env.items(), key=lambda kv: kv[0].encode("utf-16-be"), reverse=True)
    )
    non_canonical = json.dumps(
        reversed_env, separators=(",", ":"), ensure_ascii=False, sort_keys=False
    ).encode("utf-8")
    vectors.append(
        _invalid("invalid/canonical-key-order", non_canonical, E_CANONICAL, [])
    )

    # ------------------------------------------------------------------
    # Invalid: envelope schema.
    # ------------------------------------------------------------------
    # Built by hand (not via make_envelope) so the "ts" field can be omitted.
    no_ts = {
        "v": 1,
        "type": "evidence-ingest",
        "author": public_str(alice),
        "prev": [],
        "body": e1_body,
    }
    vectors.append(
        _invalid(
            "invalid/envelope-missing-ts",
            canonicalize(sign_envelope(no_ts, alice)),
            E_ENVELOPE,
            [],
        )
    )
    vectors.append(
        _invalid(
            "invalid/envelope-unknown-type",
            _signed(alice, "gossip", {}, _ts(0), []).raw,
            E_ENVELOPE,
            [],
        )
    )
    # Drop the old signature, add an unknown field, then sign again.
    extra = dict(e1.env)
    extra.pop("sig")
    extra["note"] = "hello"
    vectors.append(
        _invalid(
            "invalid/envelope-extra-field",
            canonicalize(sign_envelope(extra, alice)),
            E_ENVELOPE,
            [],
        )
    )

    # ------------------------------------------------------------------
    # Invalid: body schema.
    # ------------------------------------------------------------------
    body_no_conf = {k: v for k, v in c1_body.items() if k != "confidence"}
    vectors.append(
        _invalid(
            "invalid/body-missing-confidence",
            _signed(alice, "claim-assert", body_no_conf, _ts(2), [e2.oid]).raw,
            E_BODY,
            ctx(c1),
        )
    )
    body_bad_conf = dict(c1_body, confidence=20000)
    vectors.append(
        _invalid(
            "invalid/body-confidence-range",
            _signed(alice, "claim-assert", body_bad_conf, _ts(2), [e2.oid]).raw,
            E_BODY,
            ctx(c1),
        )
    )
    body_bad_hash = dict(e1_body, content_hash="md5:0a1b2c")
    vectors.append(
        _invalid(
            "invalid/body-bad-content-hash",
            _signed(alice, "evidence-ingest", body_bad_hash, _ts(0), []).raw,
            E_BODY,
            [],
        )
    )

    # ------------------------------------------------------------------
    # Invalid: signature.
    # ------------------------------------------------------------------
    # Swap the last character of e1's signature string for a different one.
    # NOTE(review): assumes the signature encoding used by .signing makes a
    # last-character change alter the decoded signature -- confirm.
    bad_sig_env = dict(e1.env)
    sig = bad_sig_env["sig"]
    flipped = "1" if sig[-1] != "1" else "2"
    bad_sig_env["sig"] = sig[:-1] + flipped
    vectors.append(
        _invalid("invalid/sig-flipped", canonicalize(bad_sig_env), E_SIG, [])
    )

    # ------------------------------------------------------------------
    # Invalid: log context.
    # ------------------------------------------------------------------
    vectors.append(
        _invalid(
            "invalid/prev-missing",
            _signed(alice, "evidence-ingest", e1_body, _ts(0), [_FAKE_OP_ID]).raw,
            E_PREV,
            [],
        )
    )
    # Timestamp 07:59 is earlier than e1's 08:00, which it names as prev.
    vectors.append(
        _invalid(
            "invalid/causality-ts-before-prev",
            _signed(
                alice, "evidence-ingest", e2_body, "2025-03-01T07:59:00Z", [e1.oid]
            ).raw,
            E_CAUSALITY,
            [e1],
        )
    )
    body_bad_ref = dict(c1_body, derived_from=[_FAKE_OP_ID])
    vectors.append(
        _invalid(
            "invalid/ref-derived-from-missing",
            _signed(alice, "claim-assert", body_bad_ref, _ts(2), [e1.oid]).raw,
            E_REF,
            [e1],
        )
    )
    corr_bad_target = {
        "target": e1.oid,
        "reason": "targets evidence, not a claim",
        "new_confidence": 100,
    }
    vectors.append(
        _invalid(
            "invalid/ref-correction-target-type",
            _signed(alice, "correction", corr_bad_target, _ts(3), [e1.oid]).raw,
            E_REF,
            [e1],
        )
    )
    rev_bad_target = {"target": c1.oid, "reason": "targets a claim, not a grant"}
    vectors.append(
        _invalid(
            "invalid/ref-revocation-target-type",
            _signed(alice, "revocation", rev_bad_target, _ts(3), [c1.oid]).raw,
            E_REF,
            ctx(corr),
        )
    )
    # Appending e1 to a log that already contains e1.
    vectors.append(_invalid("invalid/duplicate-op", e1.raw, E_DUP, [e1]))

    return vectors


def write_suite(out_dir, vectors: Optional[List[dict]] = None) -> dict:
    """Write the suite to ``out_dir`` (one file per vector + manifest.json).

    Args:
        out_dir: Destination directory; created (with parents) if missing.
        vectors: Vectors to write. Defaults to ``build_suite()``.

    Returns:
        The manifest dict that was written to ``manifest.json``.
    """
    if vectors is None:
        vectors = build_suite()
    out = Path(out_dir)
    # Create the root up front: with an empty ``vectors`` list the loop below
    # never creates it, and writing the manifest would otherwise fail.
    out.mkdir(parents=True, exist_ok=True)
    entries = []
    for vec in vectors:
        rel = vec["name"] + ".json"
        path = out / rel
        path.parent.mkdir(parents=True, exist_ok=True)
        data = (json.dumps(vec, indent=2, sort_keys=True) + "\n").encode("utf-8")
        path.write_bytes(data)
        # The manifest hash covers exactly the bytes written to disk.
        entries.append(
            {
                "name": vec["name"],
                "path": rel,
                "kind": vec["kind"],
                "sha256": hashlib.sha256(data).hexdigest(),
            }
        )
    manifest = {"format": VECTORS_FORMAT, "count": len(entries), "vectors": entries}
    (out / "manifest.json").write_bytes(
        (json.dumps(manifest, indent=2, sort_keys=True) + "\n").encode("utf-8")
    )
    return manifest


def load_suite(dir_path) -> List[dict]:
    """Load a vector suite from a directory (manifest-aware).

    With a ``manifest.json`` present, vectors are read in manifest order and
    each file's SHA-256 is checked against the manifest. Without one, every
    ``*.json`` file under the directory (except files named ``manifest.json``)
    is loaded in sorted path order.

    Raises:
        RuntimeError: on an unsupported manifest or vector format, or when a
            vector file's hash does not match its manifest entry.
    """
    root = Path(dir_path)
    manifest_path = root / "manifest.json"
    vectors: List[dict] = []
    if manifest_path.exists():
        manifest = json.loads(manifest_path.read_text(encoding="utf-8"))
        if manifest.get("format") != VECTORS_FORMAT:
            raise RuntimeError("unsupported vector format: %r" % manifest.get("format"))
        for entry in manifest["vectors"]:
            data = (root / entry["path"]).read_bytes()
            if hashlib.sha256(data).hexdigest() != entry["sha256"]:
                raise RuntimeError("vector file hash mismatch: %s" % entry["path"])
            vectors.append(json.loads(data.decode("utf-8")))
    else:
        for path in sorted(root.glob("**/*.json")):
            if path.name == "manifest.json":
                continue
            vectors.append(json.loads(path.read_text(encoding="utf-8")))
    # Every loaded vector must carry the supported format tag, whichever
    # branch produced it.
    for vec in vectors:
        if vec.get("format") != VECTORS_FORMAT:
            raise RuntimeError("unsupported vector format in %r" % vec.get("name"))
    return vectors