"""Shared helpers for the FablePool test suite. These helpers deliberately tolerate two common API styles (functions that return ``None`` on success and raise on failure vs. functions that return a bool; ingest functions that return ids vs. full op dicts) so the assertions in the tests stay focused on protocol behavior. """ from __future__ import annotations import json from pathlib import Path from fablepool import keys as fp_keys from fablepool import ops as fp_ops ROOT = Path(__file__).resolve().parents[1] DATASETS = ROOT / "datasets" def as_hex(value): if isinstance(value, (bytes, bytearray)): return bytes(value).hex() return str(value) def as_ids(ret): """Normalise an ingest/derive return value to a list of op id strings.""" out = [] for item in ret: if isinstance(item, dict): out.append(item["id"]) else: out.append(str(item)) return out def verifies(op): """True if the signed operation passes integrity verification.""" try: result = fp_ops.verify_op(op) except Exception: return False return True if result is None else bool(result) def verify_ok(pub, sig, msg): """True if the raw signature verifies under the public key.""" try: result = fp_keys.verify(as_hex(pub), as_hex(sig), msg) except Exception: return False return True if result is None else bool(result) def op_index(node): """Map of op id -> op dict for everything in the node's store.""" return {op["id"]: op for op in node.store.all_ops()} def all_op_ids(node): return set(op_index(node)) def get_all_claims(node): """All claims including invalidated/refuted ones, regardless of the default filtering behavior of ``Node.claims``.""" try: return list(node.claims(include_invalidated=True)) except TypeError: return list(node.claims()) def status_of(node, claim_id): """The status of a claim; 'absent' if it is no longer listed (which the tests treat as not-active).""" for claim in get_all_claims(node): if claim["id"] == claim_id: return claim.get("status", "active") return "absent" def load_everything(node): """Ingest all sample datasets into one node and run derivation.""" node.ingest_calendar(DATASETS / "phone" / "calendar.ics") node.ingest_photos(DATASETS / "phone" / "photos.json") node.ingest_notes(DATASETS / "laptop" / "notes") node.derive() return node def claim_dependents(claims, idx): """Map claim-op-id -> [claim ids that list it as a source].""" deps = {} for claim in claims: for source in claim.get("sources", []): source_op = idx.get(source) if source_op is not None and source_op.get("type") == "claim": deps.setdefault(source, []).append(claim["id"]) return deps def closure(deps, root): """Transitive set of claim ids downstream of ``root``.""" seen = set() stack = [root] while stack: current = stack.pop() for dependent in deps.get(current, []): if dependent not in seen: seen.add(dependent) stack.append(dependent) return seen def long_strings(value, min_len=40): """Collect every string of at least ``min_len`` characters nested inside ``value``. Used to check that raw evidence bodies never leak to a delegate node.""" found = set() def walk(v): if isinstance(v, str): if len(v) >= min_len: found.add(v) elif isinstance(v, dict): for child in v.values(): walk(child) elif isinstance(v, (list, tuple)): for child in v: walk(child) walk(value) return found def dump_ops(node): """A JSON dump of every op a node holds, for substring leak checks.""" return json.dumps(list(node.store.all_ops()), default=str, sort_keys=True)