"""FablePool wire format v1. Canonical encoding ------------------ Operations are encoded as canonical JSON: UTF-8, lexicographically sorted keys, no insignificant whitespace, ``ensure_ascii=False``, NaN/Infinity forbidden. Hashes are SHA-256 over the canonical bytes, rendered as lowercase hex. This matches the wire format fixed in milestone #2 and is what another implementation must reproduce byte-for-byte to interoperate. Operation envelope ------------------ :: { "v": 1, "type": "", "author": "ed25519:<64 hex chars of public key>", "ts": "", "prev": "", "body": { ...type-specific... }, "sig": "", "op_id": "op_" } The signing payload is the canonical encoding of the envelope restricted to the keys ``v, type, author, ts, prev, body``. ``op_id`` commits to the signature as well, so two distinct signatures over identical payloads yield distinct operations. """ from __future__ import annotations import hashlib import json import numbers import re from datetime import datetime, timezone from typing import Any, Dict, Iterable, Optional from .errors import WireError WIRE_VERSION = 1 SIGNING_KEYS = ("v", "type", "author", "ts", "prev", "body") ENVELOPE_KEYS = SIGNING_KEYS + ("sig", "op_id") TOPIC_RE = re.compile(r"^[a-z0-9_]+(\.[a-z0-9_]+)*$") AUTHOR_RE = re.compile(r"^ed25519:[0-9a-f]{64}$") OP_ID_RE = re.compile(r"^op_[0-9a-f]{64}$") TS_RE = re.compile(r"^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}(\.\d+)?Z$") # --------------------------------------------------------------------------- # Canonical encoding # --------------------------------------------------------------------------- def canonical_bytes(obj: Any) -> bytes: """Encode *obj* as canonical JSON bytes (sorted keys, compact, UTF-8).""" try: text = json.dumps( obj, sort_keys=True, separators=(",", ":"), ensure_ascii=False, allow_nan=False, ) except (TypeError, ValueError) as exc: raise WireError(f"object is not canonically encodable: {exc}") from exc return text.encode("utf-8") def sha256_hex(data: bytes) -> str: return hashlib.sha256(data).hexdigest() def now_ts() -> str: """Current UTC time as an RFC 3339 string with 'Z' suffix.""" return ( datetime.now(timezone.utc) .replace(microsecond=0) .isoformat() .replace("+00:00", "Z") ) # --------------------------------------------------------------------------- # Operation type schemas # --------------------------------------------------------------------------- # Each schema: required field name -> validator, plus a set of optional fields. # Validators raise WireError with a precise message. def _is_str(name: str, v: Any) -> None: if not isinstance(v, str) or not v: raise WireError(f"field {name!r} must be a non-empty string") def _is_dict(name: str, v: Any) -> None: if not isinstance(v, dict): raise WireError(f"field {name!r} must be an object") def _is_str_list(name: str, v: Any) -> None: if not isinstance(v, list) or not all(isinstance(x, str) and x for x in v): raise WireError(f"field {name!r} must be a list of non-empty strings") def _is_topic(name: str, v: Any) -> None: _is_str(name, v) if not TOPIC_RE.match(v): raise WireError( f"field {name!r} must be a dotted lowercase topic path " f"(e.g. 'routine.weekly'), got {v!r}" ) def _is_confidence(name: str, v: Any) -> None: if isinstance(v, bool) or not isinstance(v, numbers.Real): raise WireError(f"field {name!r} must be a number in [0, 1]") if not (0.0 <= float(v) <= 1.0): raise WireError(f"field {name!r} must be in [0, 1], got {v}") def _is_ts(name: str, v: Any) -> None: _is_str(name, v) if not TS_RE.match(v): raise WireError( f"field {name!r} must be an RFC 3339 UTC timestamp ending in 'Z', " f"got {v!r}" ) def _any(name: str, v: Any) -> None: # any JSON value, presence is enough return None def _topic_list(name: str, v: Any) -> None: _is_str_list(name, v) for t in v: if not TOPIC_RE.match(t): raise WireError(f"field {name!r} contains invalid topic {t!r}") OP_SCHEMAS: Dict[str, Dict[str, Any]] = { "evidence.add": { "required": { "source": _is_str, # adapter name, e.g. "calendar.ics" "kind": _is_str, # e.g. "calendar_event", "note", "photo_meta" "content": _is_dict, # adapter-specific structured payload "observed_at": _is_ts, # when the underlying fact was observed }, "optional": {"tags": _is_str_list}, }, "claim.assert": { "required": { "subject": _is_str, # "self" in the single-user reference node "topic": _is_topic, "statement": _is_str, # human-readable claim text "confidence": _is_confidence, "method": _is_str, # derivation rule id or "user.correction" "inputs": _is_str_list, # ids of claims/evidence this derives from }, "optional": {"value": _any, "params": _is_dict}, }, "claim.refute": { "required": {"claim_id": _is_str, "reason": _is_str}, "optional": {"note": _is_str}, }, "claim.correct": { "required": {"claim_id": _is_str, "statement": _is_str}, "optional": {"value": _any, "reason": _is_str}, }, "capability.grant": { "required": {"grantee": _is_str, "topics": _topic_list}, "optional": {"expires_at": _is_ts, "note": _is_str}, }, "capability.revoke": { "required": {"capability_id": _is_str}, "optional": {"reason": _is_str}, }, "inference.call": { "required": { "model": _is_str, "purpose": _is_str, "input_ids": _is_str_list, }, "optional": {"output_ids": _is_str_list, "prompt_hash": _is_str}, }, } OP_TYPES = tuple(sorted(OP_SCHEMAS)) def validate_body(op_type: str, body: Any) -> None: """Validate a body against its operation schema. Raises WireError.""" if op_type not in OP_SCHEMAS: raise WireError(f"unknown operation type {op_type!r}") if not isinstance(body, dict): raise WireError("body must be an object") schema = OP_SCHEMAS[op_type] required = schema["required"] optional = schema["optional"] for name, check in required.items(): if name not in body: raise WireError(f"{op_type}: missing required field {name!r}") check(name, body[name]) for name, value in body.items(): if name in required: continue if name not in optional: raise WireError(f"{op_type}: unknown field {name!r}") optional[name](name, value) # --------------------------------------------------------------------------- # Envelope construction and structural validation # --------------------------------------------------------------------------- def signing_payload(envelope: Dict[str, Any]) -> Dict[str, Any]: return {k: envelope[k] for k in SIGNING_KEYS} def signing_bytes(envelope: Dict[str, Any]) -> bytes: return canonical_bytes(signing_payload(envelope)) def compute_op_id(envelope: Dict[str, Any]) -> str: """op_id commits to the signed payload *and* the signature.""" if "sig" not in envelope: raise WireError("cannot compute op_id before the envelope is signed") committed = dict(signing_payload(envelope)) committed["sig"] = envelope["sig"] return "op_" + sha256_hex(canonical_bytes(committed)) def build_envelope( op_type: str, body: Dict[str, Any], author: str, prev: Optional[str], sign_fn, ts: Optional[str] = None, ) -> Dict[str, Any]: """Build, sign and id a complete operation envelope. ``sign_fn`` takes the signing bytes and returns a base64 signature string; it is supplied by :class:`fablepool.identity.Identity`. """ validate_body(op_type, body) if not AUTHOR_RE.match(author): raise WireError(f"malformed author identifier {author!r}") if prev is not None and not OP_ID_RE.match(prev): raise WireError(f"malformed prev reference {prev!r}") envelope: Dict[str, Any] = { "v": WIRE_VERSION, "type": op_type, "author": author, "ts": ts or now_ts(), "prev": prev, "body": body, } envelope["sig"] = sign_fn(signing_bytes(envelope)) envelope["op_id"] = compute_op_id(envelope) return envelope def validate_envelope_structure(envelope: Any) -> None: """Structural validation of a decoded envelope (no signature check).""" if not isinstance(envelope, dict): raise WireError("envelope must be an object") missing = [k for k in ENVELOPE_KEYS if k not in envelope] if missing: raise WireError(f"envelope missing fields: {', '.join(missing)}") extra = [k for k in envelope if k not in ENVELOPE_KEYS] if extra: raise WireError(f"envelope has unknown fields: {', '.join(sorted(extra))}") if envelope["v"] != WIRE_VERSION: raise WireError(f"unsupported wire version {envelope['v']!r}") if not AUTHOR_RE.match(str(envelope["author"])): raise WireError(f"malformed author {envelope['author']!r}") _is_ts("ts", envelope["ts"]) if envelope["prev"] is not None and not OP_ID_RE.match(str(envelope["prev"])): raise WireError(f"malformed prev {envelope['prev']!r}") if not isinstance(envelope["sig"], str) or not envelope["sig"]: raise WireError("sig must be a non-empty string") if not OP_ID_RE.match(str(envelope["op_id"])): raise WireError(f"malformed op_id {envelope['op_id']!r}") validate_body(envelope["type"], envelope["body"]) recomputed = compute_op_id(envelope) if recomputed != envelope["op_id"]: raise WireError( f"op_id mismatch: envelope says {envelope['op_id']}, " f"canonical content hashes to {recomputed}" ) def derived_id(prefix: str, op_id: str) -> str: """Type-distinct object id derived from an op_id. Evidence created by op ``op_`` gets id ``ev_``; claims get ``cl_``; capabilities get ``cap_``. The hex part is shared so a user can paste either form anywhere an identifier is accepted. """ if not OP_ID_RE.match(op_id): raise WireError(f"malformed op_id {op_id!r}") return f"{prefix}_{op_id[3:]}" def iter_jsonl(text: str) -> Iterable[Any]: """Yield decoded JSON objects from JSONL text, skipping blank lines.""" for line in text.splitlines(): line = line.strip() if line: yield json.loads(line)