"""Embedded JSON Schemas for the envelope and the seven operation bodies. These dictionaries mirror the normative schema files under ``spec/02-wire-format/schemas/``. If the two ever diverge, the spec files win and this module must be fixed. Validation uses JSON Schema draft 2020-12 via the ``jsonschema`` package. """ from jsonschema import Draft202012Validator from .errors import FpcfError, E_ENVELOPE, E_BODY from .ids import ( OP_ID_PATTERN, PUBKEY_PATTERN, SIG_PATTERN, CONTENT_HASH_PATTERN, ) TS_PATTERN = ( r"^[0-9]{4}-[0-9]{2}-[0-9]{2}T[0-9]{2}:[0-9]{2}:[0-9]{2}" r"(\.[0-9]{1,9})?Z$" ) PREDICATE_PATTERN = r"^[a-z0-9_]+(\.[a-z0-9_]+)+$" SUBJECT_PATTERN = r"^fp:(self|actor:[0-9a-f]{64})$" SCOPE_PREDICATE_PATTERN = r"^([a-z0-9_]+(\.[a-z0-9_]+)*(\.\*)?|\*)$" CALLER_PATTERN = r"^[a-z0-9][a-z0-9._:-]{0,127}$" OP_TYPES = ( "evidence-ingest", "claim-assert", "correction", "refutation", "permission-grant", "revocation", "inference-call", ) _OP_ID = {"type": "string", "pattern": OP_ID_PATTERN} _TS = {"type": "string", "pattern": TS_PATTERN} _CONTENT_HASH = {"type": "string", "pattern": CONTENT_HASH_PATTERN} _CONFIDENCE = {"type": "integer", "minimum": 0, "maximum": 10000} _OBJECT = { "type": "object", "additionalProperties": False, "required": ["value"], "properties": { "value": {}, "unit": {"type": "string", "minLength": 1, "maxLength": 64}, "lang": {"type": "string", "minLength": 2, "maxLength": 16}, }, } ENVELOPE_SCHEMA = { "$schema": "https://json-schema.org/draft/2020-12/schema", "title": "fpcf operation envelope", "type": "object", "additionalProperties": False, "required": ["v", "type", "author", "ts", "prev", "body", "sig"], "properties": { "v": {"const": 1}, "type": {"enum": list(OP_TYPES)}, "author": {"type": "string", "pattern": PUBKEY_PATTERN}, "ts": _TS, "prev": { "type": "array", "maxItems": 16, "uniqueItems": True, "items": _OP_ID, }, "body": {"type": "object"}, "sig": {"type": "string", "pattern": SIG_PATTERN}, }, } BODY_SCHEMAS = { "evidence-ingest": { "$schema": "https://json-schema.org/draft/2020-12/schema", "title": "fpcf body: evidence-ingest", "type": "object", "additionalProperties": False, "required": ["source", "media_type", "content_hash", "captured_at"], "properties": { "source": { "type": "object", "additionalProperties": False, "required": ["adapter"], "properties": { "adapter": {"type": "string", "minLength": 1, "maxLength": 128}, "locator": {"type": "string", "minLength": 1, "maxLength": 1024}, }, }, "media_type": { "type": "string", "pattern": r"^[a-z0-9!#$&^_.+-]+/[a-z0-9!#$&^_.+-]+$", "maxLength": 128, }, "content_hash": _CONTENT_HASH, "size": {"type": "integer", "minimum": 0}, "captured_at": _TS, "attrs": {"type": "object"}, }, }, "claim-assert": { "$schema": "https://json-schema.org/draft/2020-12/schema", "title": "fpcf body: claim-assert", "type": "object", "additionalProperties": False, "required": ["subject", "predicate", "object", "derived_from", "confidence", "method"], "properties": { "subject": {"type": "string", "pattern": SUBJECT_PATTERN}, "predicate": {"type": "string", "pattern": PREDICATE_PATTERN, "maxLength": 256}, "object": _OBJECT, "derived_from": { "type": "array", "minItems": 1, "maxItems": 64, "uniqueItems": True, "items": _OP_ID, }, "confidence": _CONFIDENCE, "method": {"type": "string", "minLength": 1, "maxLength": 256}, "valid_from": _TS, "valid_to": _TS, }, }, "correction": { "$schema": "https://json-schema.org/draft/2020-12/schema", "title": "fpcf body: correction", "type": "object", "additionalProperties": False, "required": ["target", "reason"], "anyOf": [ {"required": ["new_object"]}, {"required": ["new_confidence"]}, ], "properties": { "target": _OP_ID, "reason": {"type": "string", "minLength": 1, "maxLength": 2048}, "new_object": _OBJECT, "new_confidence": _CONFIDENCE, }, }, "refutation": { "$schema": "https://json-schema.org/draft/2020-12/schema", "title": "fpcf body: refutation", "type": "object", "additionalProperties": False, "required": ["target", "reason"], "properties": { "target": _OP_ID, "reason": {"type": "string", "minLength": 1, "maxLength": 2048}, "evidence": { "type": "array", "maxItems": 64, "uniqueItems": True, "items": _OP_ID, }, }, }, "permission-grant": { "$schema": "https://json-schema.org/draft/2020-12/schema", "title": "fpcf body: permission-grant", "type": "object", "additionalProperties": False, "required": ["grantee", "scope"], "properties": { "grantee": {"type": "string", "pattern": PUBKEY_PATTERN}, "scope": { "type": "object", "additionalProperties": False, "required": ["predicates"], "properties": { "predicates": { "type": "array", "minItems": 1, "maxItems": 64, "uniqueItems": True, "items": { "type": "string", "pattern": SCOPE_PREDICATE_PATTERN, "maxLength": 256, }, }, "subjects": { "type": "array", "minItems": 1, "maxItems": 64, "uniqueItems": True, "items": {"type": "string", "pattern": SUBJECT_PATTERN}, }, "min_confidence": _CONFIDENCE, "include_provenance": {"type": "boolean"}, }, }, "expires_at": _TS, }, }, "revocation": { "$schema": "https://json-schema.org/draft/2020-12/schema", "title": "fpcf body: revocation", "type": "object", "additionalProperties": False, "required": ["target"], "properties": { "target": _OP_ID, "reason": {"type": "string", "minLength": 1, "maxLength": 2048}, }, }, "inference-call": { "$schema": "https://json-schema.org/draft/2020-12/schema", "title": "fpcf body: inference-call", "type": "object", "additionalProperties": False, "required": ["caller", "query", "inputs"], "properties": { "caller": {"type": "string", "pattern": CALLER_PATTERN}, "query": {"type": "string", "minLength": 1, "maxLength": 4096}, "inputs": { "type": "array", "maxItems": 256, "uniqueItems": True, "items": _OP_ID, }, "grant": _OP_ID, "output_hash": _CONTENT_HASH, "model": {"type": "string", "minLength": 1, "maxLength": 256}, }, }, } _ENVELOPE_VALIDATOR = Draft202012Validator(ENVELOPE_SCHEMA) _BODY_VALIDATORS = {t: Draft202012Validator(s) for t, s in BODY_SCHEMAS.items()} def _first_error(validator, instance): errs = sorted( validator.iter_errors(instance), key=lambda e: (list(map(str, e.absolute_path)), e.message), ) return errs[0] if errs else None def _format_error(err) -> str: path = "/".join(map(str, err.absolute_path)) or "" return "%s: %s" % (path, err.message) def validate_envelope(envelope) -> None: """Raise ``FP-E-ENVELOPE`` if the envelope violates the envelope schema.""" err = _first_error(_ENVELOPE_VALIDATOR, envelope) if err is not None: raise FpcfError(E_ENVELOPE, _format_error(err)) def validate_body(op_type: str, body) -> None: """Raise ``FP-E-BODY`` if ``body`` violates the schema for ``op_type``.""" validator = _BODY_VALIDATORS.get(op_type) if validator is None: raise FpcfError(E_ENVELOPE, "unknown operation type: %r" % op_type) err = _first_error(validator, body) if err is not None: raise FpcfError(E_BODY, _format_error(err))