"""Wire transports for the FablePool sync protocol. Two transports ship with the reference node: * :class:`InMemoryTransport` — connects two ``Node`` objects in a single process. Messages are round-tripped through JSON so anything that would not survive the real wire fails here too. Used by tests and the demo. * :class:`HTTPTransport` / :class:`SyncHTTPServer` — a stdlib-only HTTP binding. Every protocol message is POSTed as a JSON document to ``/fpl/v1/sync`` and the reply is the JSON response message. There is no per-message-type routing on purpose: the message envelope (its ``type`` field) is the protocol, the transport is dumb. Second implementations only need "POST JSON, read JSON" to interoperate (see docs/INTEROP.md). Security note: this reference HTTP binding is plaintext and unauthenticated at the transport layer. Every operation crossing it is independently signed and verified, so a network attacker cannot *forge* state, but they can read it. Production deployments must wrap this in TLS or an authenticated tunnel; this is called out in docs/SECURITY.md. """ from __future__ import annotations import json import threading from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer from typing import Callable, Dict, Optional from urllib import error as urlerror from urllib import request as urlrequest SYNC_PATH = "/fpl/v1/sync" Handler = Callable[[dict], dict] class TransportError(Exception): """Raised when a transport cannot deliver a message or parse a reply.""" class Transport: """Abstract request/response transport for protocol messages.""" def request(self, message: dict) -> dict: # pragma: no cover - interface raise NotImplementedError class InMemoryTransport(Transport): """Connects directly to a peer's message handler, with wire semantics.""" def __init__(self, handler: Handler) -> None: self._handler = handler def request(self, message: dict) -> dict: try: wire = json.dumps(message) except (TypeError, ValueError) as exc: raise TransportError(f"message is not JSON-serializable: {exc}") from exc reply = self._handler(json.loads(wire)) try: return json.loads(json.dumps(reply)) except (TypeError, ValueError) as exc: raise TransportError(f"reply is not JSON-serializable: {exc}") from exc class HTTPTransport(Transport): """Client side of the HTTP binding (stdlib urllib only).""" def __init__(self, base_url: str, timeout: float = 15.0) -> None: self._url = base_url.rstrip("/") + SYNC_PATH self.timeout = timeout def request(self, message: dict) -> dict: data = json.dumps(message).encode("utf-8") req = urlrequest.Request( self._url, data=data, headers={"Content-Type": "application/json"}, method="POST", ) try: with urlrequest.urlopen(req, timeout=self.timeout) as resp: body = resp.read() except urlerror.URLError as exc: raise TransportError(f"request to {self._url} failed: {exc}") from exc try: return json.loads(body.decode("utf-8")) except (UnicodeDecodeError, json.JSONDecodeError) as exc: raise TransportError(f"peer returned invalid JSON: {exc}") from exc class SyncHTTPServer: """Server side of the HTTP binding. Wraps a message handler (typically ``Node.handle``) in a threaded HTTP server. ``port=0`` picks an ephemeral port; read :attr:`base_url` after construction. Usable as a context manager. """ def __init__(self, handler: Handler, host: str = "127.0.0.1", port: int = 0) -> None: outer_handler = handler class _RequestHandler(BaseHTTPRequestHandler): protocol_version = "HTTP/1.1" def do_POST(self) -> None: # noqa: N802 (stdlib API) if self.path != SYNC_PATH: self._reply(404, {"type": "error", "error": "unknown path"}) return try: length = int(self.headers.get("Content-Length", "0")) except ValueError: length = 0 raw = self.rfile.read(length) if length > 0 else b"" try: message = json.loads(raw.decode("utf-8")) except (UnicodeDecodeError, json.JSONDecodeError) as exc: self._reply(400, {"type": "error", "error": f"bad JSON: {exc}"}) return try: reply = outer_handler(message) except Exception as exc: # defensive: handler must not kill server reply = {"type": "error", "error": f"{type(exc).__name__}: {exc}"} self._reply(200, reply) def _reply(self, status: int, payload: Dict) -> None: body = json.dumps(payload).encode("utf-8") self.send_response(status) self.send_header("Content-Type", "application/json") self.send_header("Content-Length", str(len(body))) self.end_headers() self.wfile.write(body) def log_message(self, fmt: str, *args: object) -> None: pass # keep demo/test output clean self._server = ThreadingHTTPServer((host, port), _RequestHandler) self._server.daemon_threads = True self._thread: Optional[threading.Thread] = None @property def base_url(self) -> str: host, port = self._server.server_address[:2] return f"http://{host}:{port}" def start(self) -> "SyncHTTPServer": if self._thread is not None: return self self._thread = threading.Thread( target=self._server.serve_forever, name="fpl-sync-http", daemon=True ) self._thread.start() return self def stop(self) -> None: self._server.shutdown() self._server.server_close() if self._thread is not None: self._thread.join(timeout=5) self._thread = None def __enter__(self) -> "SyncHTTPServer": return self.start() def __exit__(self, *exc_info: object) -> None: self.stop()