"""The normative verification pipeline for a single operation. ``verify_op(raw, store)`` performs, in order: 1. strict parse (FP-E-JSON / FP-E-NUMBER) 2. canonical-form check (FP-E-CANONICAL) 3. envelope schema (FP-E-ENVELOPE), including calendar-validity of ``ts`` 4. body schema for the declared type (FP-E-BODY) 5. signature (FP-E-SIG) 6. if ``store`` is given, log-context checks: prev existence (FP-E-PREV), causality (FP-E-CAUSALITY), typed body references (FP-E-REF) It returns ``(op_id, envelope)``. Duplicate detection (FP-E-DUP) is the log's responsibility (see ``fpcf.log``). """ import re from datetime import datetime from typing import Mapping, Optional, Tuple from .canonical import check_canonical from .errors import ( FpcfError, E_ENVELOPE, E_PREV, E_CAUSALITY, E_REF, ) from .ids import op_id_for from .schemas import validate_envelope, validate_body from .signing import verify_signature _TS_RE = re.compile( r"^(\d{4})-(\d{2})-(\d{2})T(\d{2}):(\d{2}):(\d{2})(?:\.(\d{1,9}))?Z$" ) def ts_key(ts: str): """Total-order key for a wire timestamp; validates calendar correctness. Raises ``FP-E-ENVELOPE`` for syntactically or calendrically invalid timestamps (e.g. February 30th, hour 25). """ m = _TS_RE.match(ts) if isinstance(ts, str) else None if m is None: raise FpcfError(E_ENVELOPE, "invalid timestamp syntax: %r" % (ts,)) y, mo, d, h, mi, s, frac = m.groups() ns = int(frac.ljust(9, "0")) if frac else 0 try: dt = datetime(int(y), int(mo), int(d), int(h), int(mi), int(s)) except ValueError as exc: raise FpcfError(E_ENVELOPE, "invalid timestamp value: %s" % exc) return (dt, ns) # Allowed target/reference op types per body field, by op type. _REF_RULES = { "claim-assert": ( ("derived_from", {"evidence-ingest", "claim-assert"}), ), "correction": ( ("target", {"claim-assert"}), ), "refutation": ( ("target", {"claim-assert"}), ("evidence", {"evidence-ingest"}), ), "revocation": ( ("target", {"permission-grant"}), ), "inference-call": ( ("inputs", {"claim-assert"}), ("grant", {"permission-grant"}), ), } def _check_refs(envelope: dict, store: Mapping[str, dict]) -> None: op_type = envelope["type"] body = envelope["body"] for field, allowed in _REF_RULES.get(op_type, ()): value = body.get(field) if value is None: continue refs = value if isinstance(value, list) else [value] for ref in refs: target = store.get(ref) if target is None: raise FpcfError( E_REF, "%s.%s references %s, which is not in the log" % (op_type, field, ref), ) if target["type"] not in allowed: raise FpcfError( E_REF, "%s.%s references a %s op; allowed: %s" % (op_type, field, target["type"], ", ".join(sorted(allowed))), ) def _check_context(envelope: dict, store: Mapping[str, dict]) -> None: own_key = ts_key(envelope["ts"]) for prev_id in envelope["prev"]: prev = store.get(prev_id) if prev is None: raise FpcfError(E_PREV, "prev %s is not in the log" % prev_id) if ts_key(prev["ts"]) > own_key: raise FpcfError( E_CAUSALITY, "ts %s is earlier than prev op ts %s" % (envelope["ts"], prev["ts"]), ) _check_refs(envelope, store) def verify_op( raw: bytes, store: Optional[Mapping[str, dict]] = None, ) -> Tuple[str, dict]: """Run the full verification pipeline on wire bytes. ``store`` maps op id -> verified envelope for log-context checks; pass ``None`` to perform stateless verification only. """ envelope = check_canonical(raw) if not isinstance(envelope, dict): raise FpcfError(E_ENVELOPE, "top-level value must be an object") validate_envelope(envelope) ts_key(envelope["ts"]) # calendar validity beyond the schema pattern validate_body(envelope["type"], envelope["body"]) verify_signature(envelope) if store is not None: _check_context(envelope, store) return op_id_for(envelope), envelope