"""Canonical JSON serialization for the FablePool wire format. This is a deliberately restricted profile of RFC 8785 (JCS): * UTF-8 bytes, no whitespace, ``,`` and ``:`` separators only. * Object keys sorted by UTF-16 code units (RFC 8785 ordering). * Strings escaped minimally: ``\\"``, ``\\\\``, ``\\b``, ``\\f``, ``\\n``, ``\\r``, ``\\t``; other control characters as ``\\u00xx`` (lowercase hex); all other characters emitted literally as UTF-8. * Numbers MUST be integers with absolute value <= 2^53-1. Floats, NaN, and Infinity are forbidden on the wire (``FP-E-NUMBER``). * Duplicate object keys are forbidden (``FP-E-JSON``). * Nesting depth is limited to 64. See spec/02-wire-format/01-encoding-and-identifiers.md. """ import json from .errors import FpcfError, E_JSON, E_NUMBER, E_CANONICAL MAX_SAFE_INT = 2**53 - 1 MAX_DEPTH = 64 _ESCAPES = { '"': '\\"', "\\": "\\\\", "\b": "\\b", "\f": "\\f", "\n": "\\n", "\r": "\\r", "\t": "\\t", } def _enc_string(s: str) -> str: out = ['"'] for ch in s: esc = _ESCAPES.get(ch) if esc is not None: out.append(esc) elif ord(ch) < 0x20: out.append("\\u%04x" % ord(ch)) else: out.append(ch) out.append('"') return "".join(out) def _key_order(key: str) -> bytes: # Byte-wise comparison of UTF-16BE encodings equals comparison by # UTF-16 code units, which is the RFC 8785 ordering. return key.encode("utf-16-be") def _enc(value, depth: int) -> str: if depth > MAX_DEPTH: raise FpcfError(E_CANONICAL, "nesting depth exceeds %d" % MAX_DEPTH) if value is None: return "null" if value is True: return "true" if value is False: return "false" if isinstance(value, bool): # pragma: no cover - covered above raise FpcfError(E_CANONICAL, "unreachable bool") if isinstance(value, int): if abs(value) > MAX_SAFE_INT: raise FpcfError(E_NUMBER, "integer out of safe range: %d" % value) return str(value) if isinstance(value, float): raise FpcfError(E_NUMBER, "non-integer numbers are forbidden on the wire") if isinstance(value, str): return _enc_string(value) if isinstance(value, (list, tuple)): return "[" + ",".join(_enc(v, depth + 1) for v in value) + "]" if isinstance(value, dict): for k in value: if not isinstance(k, str): raise FpcfError(E_CANONICAL, "object keys must be strings") items = sorted(value.items(), key=lambda kv: _key_order(kv[0])) return "{" + ",".join( _enc_string(k) + ":" + _enc(v, depth + 1) for k, v in items ) + "}" raise FpcfError(E_CANONICAL, "unsupported value type: %s" % type(value).__name__) def canonicalize(obj) -> bytes: """Serialize ``obj`` to canonical wire bytes (raises FpcfError).""" return _enc(obj, 0).encode("utf-8") def _no_float(text: str): raise FpcfError(E_NUMBER, "non-integer number on the wire: %s" % text) def _no_constant(text: str): raise FpcfError(E_NUMBER, "forbidden JSON constant: %s" % text) def _reject_dup_pairs(pairs): seen = set() out = {} for k, v in pairs: if k in seen: raise FpcfError(E_JSON, "duplicate object key: %r" % k) seen.add(k) out[k] = v return out def loads_strict(raw: bytes): """Parse wire bytes strictly. Raises ``FP-E-JSON`` for malformed UTF-8/JSON or duplicate keys, and ``FP-E-NUMBER`` for floats, NaN, or Infinity. """ if not isinstance(raw, (bytes, bytearray)): raise TypeError("loads_strict expects bytes") try: text = bytes(raw).decode("utf-8", errors="strict") except UnicodeDecodeError as exc: raise FpcfError(E_JSON, "input is not valid UTF-8: %s" % exc) try: return json.loads( text, parse_float=_no_float, parse_constant=_no_constant, object_pairs_hook=_reject_dup_pairs, ) except FpcfError: raise except (ValueError, RecursionError) as exc: raise FpcfError(E_JSON, "input is not valid JSON: %s" % exc) def check_canonical(raw: bytes): """Parse ``raw`` and verify the bytes are in canonical form. Returns the parsed value, or raises FP-E-JSON / FP-E-NUMBER / FP-E-CANONICAL. """ obj = loads_strict(raw) if canonicalize(obj) != bytes(raw): raise FpcfError(E_CANONICAL, "bytes are not in canonical form") return obj