"""Canonical encoding for the FablePool wire format (FP/1). Every signature and every content address in FablePool is computed over the *canonical JSON encoding* of a value. Two independent implementations must produce byte-identical encodings for the same logical value, so the rules are deliberately strict: 1. Only the following types are permitted: ``null``, ``true``/``false``, integers, strings, arrays, and objects with string keys. 2. **Floats are forbidden.** Quantities that would naturally be fractional (e.g. claim confidence) are carried as integers in fixed units (confidence is in basis points, 0..10000). 3. Integers must fit in the IEEE-754 exact-integer range (|n| <= 2**53 - 1) so JavaScript implementations can interoperate. 4. Object keys are sorted by Unicode code point. 5. No insignificant whitespace; separators are ``,`` and ``:``. 6. Strings are emitted as UTF-8 without ``\\uXXXX`` escaping of non-ASCII characters (i.e. ``ensure_ascii=False`` semantics), with only the JSON mandatory escapes applied. Content addresses are ``"sha256:" + hex(sha256(canonical_bytes))``. """ from __future__ import annotations import hashlib import json from typing import Any MAX_INT = 2**53 - 1 MIN_INT = -(2**53 - 1) class CanonicalizationError(ValueError): """Raised when a value cannot be canonically encoded under FP/1 rules.""" def _check(value: Any, path: str) -> None: if value is None or value is True or value is False: return if isinstance(value, bool): # pragma: no cover - covered by identity checks return if isinstance(value, int): if not (MIN_INT <= value <= MAX_INT): raise CanonicalizationError( f"integer out of FP/1 range at {path}: {value}" ) return if isinstance(value, float): raise CanonicalizationError( f"floats are forbidden in FP/1 canonical form at {path}; " f"use fixed-point integers (got {value!r})" ) if isinstance(value, str): return if isinstance(value, list): for index, item in enumerate(value): _check(item, f"{path}[{index}]") return if isinstance(value, dict): for key, item in value.items(): if not isinstance(key, str): raise CanonicalizationError( f"object keys must be strings at {path} (got {key!r})" ) _check(item, f"{path}.{key}") return raise CanonicalizationError( f"type {type(value).__name__} is not encodable in FP/1 at {path}" ) def canonical_json(value: Any) -> bytes: """Encode ``value`` into FP/1 canonical bytes, or raise. The output is deterministic: same logical value, same bytes, on every conforming implementation. """ _check(value, "$") return json.dumps( value, sort_keys=True, separators=(",", ":"), ensure_ascii=False, allow_nan=False, ).encode("utf-8") def sha256_hex(data: bytes) -> str: return hashlib.sha256(data).hexdigest() def content_address(value: Any) -> str: """Content-address a canonical value: ``sha256:``.""" return "sha256:" + sha256_hex(canonical_json(value)) def is_content_address(text: Any) -> bool: """Structural check for a ``sha256:<64 lowercase hex>`` address.""" if not isinstance(text, str) or not text.startswith("sha256:"): return False digest = text[len("sha256:"):] if len(digest) != 64: return False return all(c in "0123456789abcdef" for c in digest)