"""Log synchronization between FablePool nodes (protocol ``fpl-sync/1``).

The protocol is a small request/response message vocabulary carried over any
:class:`fablepool.transport.Transport`:

================ =====================================================
client message   server reply
================ =====================================================
hello            hello {protocol, node, name}
frontier         frontier {op_ids, lamport}
ops_request      ops {ops: [envelope...]}   (bounded batches)
push             push_result {accepted, rejected, deferred}
delegate_pull    delegate_ops {...}         (see fablepool.capability)
purge_receipt    receipt_ack {...}          (see fablepool.capability)
holdings_request holdings {proof}           (see fablepool.capability)
================ =====================================================

Conflict handling
-----------------
The log itself never conflicts: it is an append-only set of content-addressed,
signed operations, so a sync merges two sets.  Every incoming operation is
verified (hash identity + signature) before it is admitted, and operations are
ingested in dependency order — an operation whose ``prev`` references are not
yet satisfiable is *deferred*, never half-applied.

Semantic conflicts (two nodes asserting different objects for the same
subject/predicate while offline) surface after the merge and are resolved
deterministically by the claim layer (:mod:`fablepool.claims`); the resolution
is reported in the :class:`SyncReport` so the UI can show the user what
happened.
"""

from __future__ import annotations

from dataclasses import dataclass, field
from typing import Callable, Dict, List, Optional, Sequence, Set

from . import capability
from .ops import Operation, op_from_dict, verify_op
from .transport import Transport

PROTOCOL = "fpl-sync/1"
BATCH_SIZE = 200  # op ids per ``ops_request`` / envelopes per ``push``
MAX_WANT = 1000  # server-side cap on ids honoured in one ``ops_request``


class SyncError(Exception):
    """Raised when a peer misbehaves or speaks the wrong protocol."""


@dataclass
class IngestResult:
    """Outcome of ingesting one or more batches of operation envelopes."""

    # ids of operations appended to the store
    accepted: List[str] = field(default_factory=list)
    # ``{"op": id, "reason": text}`` entries for envelopes that were refused
    rejected: List[dict] = field(default_factory=list)
    # ids of valid operations whose ``prev`` references were not satisfiable
    deferred: List[str] = field(default_factory=list)

    def extend(self, other: "IngestResult") -> None:
        """Append every entry of *other* to this result, in place."""
        self.accepted.extend(other.accepted)
        self.rejected.extend(other.rejected)
        self.deferred.extend(other.deferred)


def _parse_and_verify(
    op_dicts: Sequence[dict], result: IngestResult
) -> List[Operation]:
    """Parse and verify envelopes; record failures in ``result.rejected``.

    Returns the operations that parsed and passed ``verify_op``, in input
    order.  Nothing is written to any store here.
    """
    verified: List[Operation] = []
    for d in op_dicts:
        try:
            op = op_from_dict(d)
        except Exception as exc:
            # The envelope may not even be a dict; report whatever id we can.
            op_id = d.get("op_id", "") if isinstance(d, dict) else ""
            result.rejected.append({"op": op_id, "reason": f"parse error: {exc}"})
            continue
        if not verify_op(op):
            result.rejected.append(
                {"op": op.op_id, "reason": "signature or identity verification failed"}
            )
            continue
        verified.append(op)
    return verified


def _append_ready(
    store,
    ops: Sequence[Operation],
    result: IngestResult,
    observe: Optional[Callable[[Operation], None]] = None,
) -> List[Operation]:
    """Append already-verified *ops* to *store* in dependency order.

    Operations are visited in ``(lamport, op_id)`` order and the pass is
    repeated until no further progress is made.  Appended ids are recorded in
    ``result.accepted`` and *observe* (if given) is called for each one.

    Returns the operations whose ``prev`` references are still missing from
    the store; the caller decides whether to retry or report them.
    """
    pending = sorted(ops, key=lambda o: (o.lamport, o.op_id))
    progress = True
    while pending and progress:
        progress = False
        remaining: List[Operation] = []
        for op in pending:
            if store.has(op.op_id):
                progress = True  # duplicate: drop silently, may unblock others
                continue
            if any(not store.has(p) for p in op.prev):
                remaining.append(op)
                continue
            store.append(op, verify=False)  # verified by the caller
            result.accepted.append(op.op_id)
            if observe is not None:
                observe(op)
            progress = True
        pending = remaining
    return pending


def ingest_ops(
    store,
    op_dicts: Sequence[dict],
    observe: Optional[Callable[[Operation], None]] = None,
) -> IngestResult:
    """Verify and append a batch of operation envelopes.

    * Envelopes that fail parsing, hash identity, or signature verification
      are rejected with a reason (and never touch the store).
    * Valid operations are appended in dependency order; an operation whose
      ``prev`` references cannot be satisfied by the store or the batch is
      deferred (reported, not stored).

    Args:
        store: the operation store; must provide ``has(op_id)`` and
            ``append(op, verify=...)``.
        op_dicts: the envelopes to ingest.
        observe: optional callback invoked with each operation right after it
            has been appended.

    Returns:
        An :class:`IngestResult` listing accepted, rejected and deferred ops.
    """
    result = IngestResult()
    verified = _parse_and_verify(op_dicts, result)
    unsatisfied = _append_ready(store, verified, result, observe)
    result.deferred = [op.op_id for op in unsatisfied]
    return result


class SyncServer:
    """Serves protocol messages for a node. Stateless between requests."""

    def __init__(self, node) -> None:
        self.node = node

    def handle(self, message: dict) -> dict:
        """Answer one protocol message, converting any failure to an error reply."""
        try:
            return self._dispatch(message)
        except Exception as exc:  # never let a bad message take the server down
            return {"type": "error", "error": f"{type(exc).__name__}: {exc}"}

    def _dispatch(self, message: dict) -> dict:
        """Route *message* by its ``type`` field and build the reply dict."""
        if not isinstance(message, dict):
            return {"type": "error", "error": "message must be a JSON object"}
        mtype = message.get("type")
        if mtype == "hello":
            return {
                "type": "hello",
                "protocol": PROTOCOL,
                "node": self.node.key_id,
                "name": self.node.name,
            }
        if mtype == "frontier":
            return {
                "type": "frontier",
                "protocol": PROTOCOL,
                "op_ids": sorted(self.node.store.op_ids()),
                "lamport": self.node.store.max_lamport(),
            }
        if mtype == "ops_request":
            # Honour at most MAX_WANT ids; ids we do not hold are skipped.
            want = list(message.get("want", []))[:MAX_WANT]
            ops = [
                self.node.store.get(op_id).to_dict()
                for op_id in want
                if self.node.store.has(op_id)
            ]
            return {"type": "ops", "ops": ops}
        if mtype == "push":
            result = ingest_ops(self.node.store, message.get("ops", []))
            return {
                "type": "push_result",
                "accepted": result.accepted,
                "rejected": result.rejected,
                "deferred": result.deferred,
            }
        if mtype == "delegate_pull":
            return capability.serve_delegate_pull(
                self.node, message.get("grant_id"), message.get("grantee")
            )
        if mtype == "purge_receipt":
            return capability.accept_purge_receipt(self.node, message.get("receipt"))
        if mtype == "holdings_request":
            return capability.serve_holdings(self.node, message.get("grant_id"))
        return {"type": "error", "error": f"unknown message type: {mtype!r}"}


@dataclass
class SyncReport:
    """Summary of one bidirectional sync, as built by :meth:`SyncClient.sync`."""

    peer: str  # the ``node`` value from the peer's hello reply
    pulled: int  # number of operations accepted locally
    pushed: int  # number of operations the peer reported as accepted
    rejected: List[dict]  # pull rejections followed by push rejections
    deferred: List[str]  # pull deferrals followed by push deferrals
    conflicts: List[dict]  # ``to_dict()`` of each entry in ``node.graph.conflicts``

    def summary(self) -> str:
        """Return a one-line human-readable description of the sync."""
        line = (
            f"sync with {self.peer or ''}: "
            f"pulled {self.pulled}, pushed {self.pushed}"
        )
        if self.rejected:
            line += f", rejected {len(self.rejected)}"
        if self.deferred:
            line += f", deferred {len(self.deferred)}"
        if self.conflicts:
            line += f", resolved {len(self.conflicts)} claim conflict(s)"
        return line


class SyncClient:
    """Drives a full bidirectional sync against one peer."""

    def __init__(self, node, transport: Transport) -> None:
        self.node = node
        self.transport = transport

    # -- protocol steps -------------------------------------------------

    def hello(self) -> dict:
        """Exchange hellos and return the peer's reply.

        Raises:
            SyncError: if the reply is not a hello or the protocol differs.
        """
        reply = self.transport.request({"type": "hello"})
        if reply.get("type") != "hello":
            raise SyncError(f"peer did not say hello: {reply.get('error', reply)}")
        if reply.get("protocol") != PROTOCOL:
            raise SyncError(
                f"protocol mismatch: peer speaks {reply.get('protocol')!r}, "
                f"we speak {PROTOCOL!r}"
            )
        return reply

    def _remote_frontier(self) -> Set[str]:
        """Return the set of op ids the peer reports holding.

        Raises:
            SyncError: if the peer does not answer with a frontier message.
        """
        reply = self.transport.request({"type": "frontier"})
        if reply.get("type") != "frontier":
            raise SyncError(f"bad frontier reply: {reply.get('error', reply)}")
        return set(reply.get("op_ids", []))

    def pull(self) -> IngestResult:
        """Fetch and ingest every operation the peer has that we lack.

        Missing ids are requested in batches of ``BATCH_SIZE``.  The ids are
        sorted by op id, which says nothing about dependency order, so an
        operation may arrive before the batch that carries its ``prev``.
        Such operations are kept (already verified) and retried together with
        each later batch; only those still unsatisfied after the final batch
        are reported as deferred.

        Raises:
            SyncError: if the peer answers a request with the wrong message.
        """
        remote = self._remote_frontier()
        local = self.node.store.op_ids()
        want = sorted(remote - local)
        total = IngestResult()
        # Verified operations waiting for a ``prev`` from a later batch.
        # Worst case this holds every pulled op until the last batch arrives.
        waiting: List[Operation] = []
        for start in range(0, len(want), BATCH_SIZE):
            batch = want[start : start + BATCH_SIZE]
            reply = self.transport.request({"type": "ops_request", "want": batch})
            if reply.get("type") != "ops":
                raise SyncError(f"bad ops reply: {reply.get('error', reply)}")
            verified = _parse_and_verify(reply.get("ops", []), total)
            waiting = _append_ready(self.node.store, waiting + verified, total)
        total.deferred = [op.op_id for op in waiting]
        return total

    def push(self) -> IngestResult:
        """Send the peer every operation it lacks, in ``BATCH_SIZE`` batches.

        The returned result aggregates what the peer reported for each batch.

        Raises:
            SyncError: if the peer does not answer with a push_result message.
        """
        remote = self._remote_frontier()
        to_send = self.node.store.ops_since(remote)
        total = IngestResult()
        for start in range(0, len(to_send), BATCH_SIZE):
            batch = [op.to_dict() for op in to_send[start : start + BATCH_SIZE]]
            reply = self.transport.request({"type": "push", "ops": batch})
            if reply.get("type") != "push_result":
                raise SyncError(f"bad push reply: {reply.get('error', reply)}")
            total.accepted.extend(reply.get("accepted", []))
            total.rejected.extend(reply.get("rejected", []))
            total.deferred.extend(reply.get("deferred", []))
        return total

    # -- the whole dance -------------------------------------------------

    def sync(self) -> SyncReport:
        """Run hello, pull, then push, and report the outcome.

        Raises:
            SyncError: propagated from any of the protocol steps.
        """
        hello = self.hello()
        pulled = self.pull()
        pushed = self.push()
        conflicts = [c.to_dict() for c in self.node.graph.conflicts]
        return SyncReport(
            peer=hello.get("node", ""),
            pulled=len(pulled.accepted),
            pushed=len(pushed.accepted),
            rejected=pulled.rejected + pushed.rejected,
            deferred=pulled.deferred + pushed.deferred,
            conflicts=conflicts,
        )