"""Canonical serialization, hashing, YAML I/O, and time utilities. Everything that gets hashed or signed in the pipeline flows through :func:`canonical_json`: deterministic key ordering, compact separators, no NaN/Infinity, UTF-8. Two semantically identical objects always produce byte-identical serializations and therefore identical hashes. This module is dependency-light on purpose: every other govtool module imports it, and the audit ledger's integrity rests on these functions remaining stable. """ from __future__ import annotations import hashlib import json import os import re from datetime import datetime, timezone from pathlib import Path from typing import Any, Iterable, Union import yaml from .errors import CanonicalizationError, ValidationError # --------------------------------------------------------------------------- # Canonical JSON + hashing # --------------------------------------------------------------------------- ZERO_HASH = "0" * 64 _HEX_RE = re.compile(r"^[0-9a-f]+$") def canonical_json(obj: Any) -> str: """Serialize *obj* to canonical JSON. Rules: keys sorted, compact separators, no NaN/Infinity, unicode preserved (not escaped). Raises :class:`CanonicalizationError` for objects that are not pure JSON types. """ try: return json.dumps( obj, sort_keys=True, separators=(",", ":"), ensure_ascii=False, allow_nan=False, ) except (TypeError, ValueError) as exc: raise CanonicalizationError(f"object is not canonically serializable: {exc}") from exc def sha256_hex(data: Union[bytes, str]) -> str: """SHA-256 of *data* as lowercase hex. Strings are encoded UTF-8.""" if isinstance(data, str): data = data.encode("utf-8") return hashlib.sha256(data).hexdigest() def hash_obj(obj: Any) -> str: """SHA-256 of the canonical JSON serialization of *obj*.""" return sha256_hex(canonical_json(obj)) # Backwards-compatible aliases (kept stable for external callers). 
canonical_hash = hash_obj


def is_hex_digest(value: Any, length: int = 64) -> bool:
    """True if *value* is a lowercase hex string of exactly *length* characters.

    Uses ``fullmatch`` so every character must be a hex digit; a plain
    ``match`` against a ``$``-anchored pattern would accept a trailing
    newline as the final character.
    """
    if not isinstance(value, str) or len(value) != length:
        return False
    return _HEX_RE.fullmatch(value) is not None


# ---------------------------------------------------------------------------
# YAML I/O
# ---------------------------------------------------------------------------


def load_yaml(path: Union[str, Path]) -> Any:
    """Load a YAML file with the safe loader. Empty files load as ``{}``.

    Raises:
        ValidationError: if the file cannot be opened or read, is not valid
            UTF-8, or does not contain valid YAML.
    """
    path = Path(path)
    try:
        with path.open("r", encoding="utf-8") as fh:
            data = yaml.safe_load(fh)
    except yaml.YAMLError as exc:
        raise ValidationError(f"invalid YAML in {path}: {exc}") from exc
    except UnicodeDecodeError as exc:
        # Raised by the text stream while the parser reads from it; it is
        # neither a YAMLError nor an OSError, so it needs its own handler.
        raise ValidationError(f"cannot decode {path} as UTF-8: {exc}") from exc
    except OSError as exc:
        raise ValidationError(f"cannot read {path}: {exc}") from exc
    return {} if data is None else data


def load_yaml_str(text: str, source: str = "") -> Any:
    """Parse a YAML string with the safe loader. Empty input parses as ``{}``.

    *source* is only used to label error messages; when it is empty the
    message refers to ``<string>`` instead of leaving a blank.

    Raises:
        ValidationError: if *text* is not valid YAML.
    """
    try:
        data = yaml.safe_load(text)
    except yaml.YAMLError as exc:
        label = source or "<string>"
        raise ValidationError(f"invalid YAML in {label}: {exc}") from exc
    return {} if data is None else data


def yaml_str(obj: Any) -> str:
    """Render *obj* as readable YAML (insertion-order keys, block style)."""
    return yaml.safe_dump(
        obj,
        sort_keys=False,
        allow_unicode=True,
        default_flow_style=False,
        width=100,
    )


def dump_yaml(path: Union[str, Path], obj: Any) -> None:
    """Write *obj* as YAML to *path*, creating parent directories.

    The document is rendered before the target is opened, so an object the
    safe dumper cannot represent raises without truncating an existing file
    or creating directories.
    """
    rendered = yaml_str(obj)
    path = Path(path)
    path.parent.mkdir(parents=True, exist_ok=True)
    with path.open("w", encoding="utf-8") as fh:
        fh.write(rendered)


# Aliases kept for callers that used the *_file names.
load_yaml_file = load_yaml
dump_yaml_file = dump_yaml

# ---------------------------------------------------------------------------
# File / tree hashing
# ---------------------------------------------------------------------------


def file_sha256(path: Union[str, Path]) -> str:
    """SHA-256 of a file's raw bytes, read in 64 KiB chunks.

    Raises:
        OSError: if the file cannot be opened or read.
    """
    digest = hashlib.sha256()
    with Path(path).open("rb") as fh:
        while True:
            chunk = fh.read(65536)
            if not chunk:
                break
            digest.update(chunk)
    return digest.hexdigest()


def _raise_walk_error(err: OSError) -> None:
    """``os.walk`` error hook: abort the walk instead of skipping the directory."""
    raise err


def tree_hash(root: Union[str, Path], ignore_names: Iterable[str] = (".git", "__pycache__")) -> str:
    """Deterministic SHA-256 over an entire directory tree.

    Walks *root* top-down. Within each directory the files are visited in
    sorted name order, then its subdirectories are descended into in sorted
    name order. For every file the record ``"{relpath}\\n{file_sha256}\\n"``
    (with *relpath* the POSIX path relative to *root*) is fed into the hash.
    This visiting order is part of the hash definition and must not change,
    or previously recorded tree hashes would stop matching.

    Two trees with identical content at identical paths produce identical
    hashes regardless of mtime or the order in which the filesystem lists
    entries. Directories whose name is in *ignore_names* are skipped; the
    filter applies to directory names only, not to files. ``os.walk`` is used
    with its default of not descending into symlinked directories.

    Raises:
        ValidationError: if *root* is not a directory.
        OSError: if a directory cannot be listed or a file cannot be read.
            A directory that cannot be listed aborts the walk rather than
            being silently left out of the hash.
    """
    root = Path(root)
    if not root.is_dir():
        raise ValidationError(f"tree_hash: {root} is not a directory")
    ignore = set(ignore_names)
    h = hashlib.sha256()
    for dirpath, dirnames, filenames in os.walk(root, onerror=_raise_walk_error):
        # In-place assignment prunes ignored directories and fixes the
        # order in which os.walk descends into the remaining ones.
        dirnames[:] = sorted(d for d in dirnames if d not in ignore)
        base = Path(dirpath)
        for name in sorted(filenames):
            full = base / name
            rel = full.relative_to(root).as_posix()
            h.update(f"{rel}\n{file_sha256(full)}\n".encode("utf-8"))
    return h.hexdigest()


# ---------------------------------------------------------------------------
# Time
# ---------------------------------------------------------------------------


def utcnow() -> datetime:
    """The current time as a timezone-aware UTC datetime."""
    return datetime.now(timezone.utc)


def utcnow_iso() -> str:
    """The current UTC time as an ISO-8601 string with a ``Z`` suffix,
    second precision (sub-second jitter has no governance meaning)."""
    return utcnow().replace(microsecond=0).isoformat().replace("+00:00", "Z")


def iso(dt: datetime) -> str:
    """Render an aware datetime as ISO-8601 UTC with a ``Z`` suffix.

    The value is converted to UTC and truncated to whole seconds.

    Raises:
        ValidationError: if *dt* is naive, i.e. it has no ``tzinfo`` or its
            ``tzinfo`` reports no UTC offset.
    """
    # A datetime is only aware when its tzinfo also yields an offset;
    # astimezone() would otherwise interpret the value as local time.
    if dt.tzinfo is None or dt.utcoffset() is None:
        raise ValidationError("naive datetimes are not allowed in governance records")
    as_utc = dt.astimezone(timezone.utc).replace(microsecond=0)
    return as_utc.isoformat().replace("+00:00", "Z")


def parse_iso(value: str) -> datetime:
    """Parse an ISO-8601 timestamp into an aware UTC datetime.

    Accepts both ``Z`` and explicit offsets. Surrounding whitespace is
    ignored. Naive timestamps are rejected: every timestamp in the ledger
    must be unambiguous.

    Raises:
        ValidationError: if *value* is not a string, is not parseable by
            ``datetime.fromisoformat``, or carries no UTC offset.
    """
    if not isinstance(value, str):
        raise ValidationError(f"timestamp must be a string, got {type(value).__name__}")
    text = value.strip()
    if text.endswith("Z"):
        # Spell the suffix as an explicit offset before parsing.
        text = text[:-1] + "+00:00"
    try:
        dt = datetime.fromisoformat(text)
    except ValueError as exc:
        raise ValidationError(f"invalid ISO-8601 timestamp {value!r}: {exc}") from exc
    if dt.tzinfo is None:
        raise ValidationError(f"timestamp {value!r} is naive; an explicit UTC offset is required")
    return dt.astimezone(timezone.utc)