"""Key management for a PMP node. Each node identity is an Ed25519 keypair. The public key doubles as the node's identifier in operations (the ``author`` field) using the form:: ed25519: Private keys live in a plain-file keystore under the node directory (``/keys/.json``), written with mode ``0600``. This is the *reference* keystore: the spec deliberately allows implementations to substitute OS keychains, secure enclaves, or hardware tokens, as long as the ``author`` identifier and signature format stay the same. """ from __future__ import annotations import base64 import json import os from dataclasses import dataclass from datetime import datetime, timezone from pathlib import Path from cryptography.exceptions import InvalidSignature from cryptography.hazmat.primitives import serialization from cryptography.hazmat.primitives.asymmetric.ed25519 import ( Ed25519PrivateKey, Ed25519PublicKey, ) from pmp.errors import KeyStoreError KEY_ID_PREFIX = "ed25519:" _KEYFILE_VERSION = 1 # --------------------------------------------------------------------------- # base64url helpers (unpadded), shared with operations.py # --------------------------------------------------------------------------- def b64u_encode(data: bytes) -> str: """Unpadded base64url encoding.""" return base64.urlsafe_b64encode(data).rstrip(b"=").decode("ascii") def b64u_decode(text: str) -> bytes: """Decode unpadded base64url text.""" if not isinstance(text, str): raise ValueError("base64url value must be a string") padding = "=" * (-len(text) % 4) return base64.urlsafe_b64decode(text + padding) # --------------------------------------------------------------------------- # key identifiers # --------------------------------------------------------------------------- def key_id_from_public_bytes(public_bytes: bytes) -> str: if len(public_bytes) != 32: raise KeyStoreError("Ed25519 public key must be 32 bytes") return KEY_ID_PREFIX + b64u_encode(public_bytes) def public_key_from_key_id(key_id: str) -> Ed25519PublicKey: if not isinstance(key_id, str) or not key_id.startswith(KEY_ID_PREFIX): raise KeyStoreError(f"not an ed25519 key id: {key_id!r}") try: raw = b64u_decode(key_id[len(KEY_ID_PREFIX):]) if len(raw) != 32: raise ValueError("wrong length") return Ed25519PublicKey.from_public_bytes(raw) except Exception as exc: # noqa: BLE001 - normalize to KeyStoreError raise KeyStoreError(f"malformed key id {key_id!r}: {exc}") from exc def verify_signature(key_id: str, data: bytes, signature: bytes) -> bool: """Verify *signature* over *data* against the public key encoded in *key_id*.""" public_key = public_key_from_key_id(key_id) try: public_key.verify(signature, data) return True except InvalidSignature: return False # --------------------------------------------------------------------------- # keypair # --------------------------------------------------------------------------- @dataclass class KeyPair: """A named Ed25519 keypair held in memory.""" name: str private_key: Ed25519PrivateKey created_at: str @property def public_bytes(self) -> bytes: return self.private_key.public_key().public_bytes( encoding=serialization.Encoding.Raw, format=serialization.PublicFormat.Raw, ) @property def key_id(self) -> str: return key_id_from_public_bytes(self.public_bytes) def sign(self, data: bytes) -> bytes: return self.private_key.sign(data) def _private_raw(self) -> bytes: return self.private_key.private_bytes( encoding=serialization.Encoding.Raw, format=serialization.PrivateFormat.Raw, encryption_algorithm=serialization.NoEncryption(), ) # --------------------------------------------------------------------------- # keystore # --------------------------------------------------------------------------- class KeyStore: """Plain-file keystore: one JSON file per named keypair. File format (version 1):: { "v": 1, "name": "device", "key_id": "ed25519:...", "private_key": "", "created_at": "2025-01-01T00:00:00Z" } """ def __init__(self, path: Path | str): self.path = Path(path) self.path.mkdir(parents=True, exist_ok=True) try: os.chmod(self.path, 0o700) except OSError: pass # best effort; not all filesystems support POSIX modes # -- internal ---------------------------------------------------------- def _keyfile(self, name: str) -> Path: if not name or "/" in name or "\\" in name or name.startswith("."): raise KeyStoreError(f"invalid key name: {name!r}") return self.path / f"{name}.json" # -- public API -------------------------------------------------------- def exists(self, name: str) -> bool: return self._keyfile(name).exists() def create(self, name: str = "device") -> KeyPair: """Generate and persist a new keypair. Refuses to overwrite.""" keyfile = self._keyfile(name) if keyfile.exists(): raise KeyStoreError(f"key {name!r} already exists at {keyfile}") private_key = Ed25519PrivateKey.generate() created_at = ( datetime.now(timezone.utc) .isoformat(timespec="seconds") .replace("+00:00", "Z") ) pair = KeyPair(name=name, private_key=private_key, created_at=created_at) record = { "v": _KEYFILE_VERSION, "name": name, "key_id": pair.key_id, "private_key": b64u_encode(pair._private_raw()), "created_at": created_at, } keyfile.write_text(json.dumps(record, indent=2) + "\n", encoding="utf-8") try: os.chmod(keyfile, 0o600) except OSError: pass return pair def load(self, name: str = "device") -> KeyPair: keyfile = self._keyfile(name) if not keyfile.exists(): raise KeyStoreError(f"no key named {name!r} in {self.path}") try: record = json.loads(keyfile.read_text(encoding="utf-8")) if record.get("v") != _KEYFILE_VERSION: raise ValueError(f"unsupported keyfile version {record.get('v')!r}") raw = b64u_decode(record["private_key"]) private_key = Ed25519PrivateKey.from_private_bytes(raw) pair = KeyPair( name=record["name"], private_key=private_key, created_at=record.get("created_at", ""), ) except KeyStoreError: raise except Exception as exc: # noqa: BLE001 raise KeyStoreError(f"corrupt keyfile {keyfile}: {exc}") from exc if record.get("key_id") != pair.key_id: raise KeyStoreError( f"keyfile {keyfile} key_id does not match its private key" ) return pair def list_keys(self) -> list[dict]: """Public metadata for every stored key (never the private material).""" out = [] for keyfile in sorted(self.path.glob("*.json")): try: record = json.loads(keyfile.read_text(encoding="utf-8")) out.append( { "name": record.get("name", keyfile.stem), "key_id": record.get("key_id", ""), "created_at": record.get("created_at", ""), } ) except (OSError, ValueError): out.append({"name": keyfile.stem, "key_id": "", "created_at": "", "error": "unreadable keyfile"}) return out