"""Append-only operation log tests: ordering, persistence, tamper detection. The log file is plain UTF-8 line-delimited JSON, so tamper scenarios are exercised by editing the file directly — exactly what a hostile or buggy process on the same machine could do — and asserting that verification refuses the result. """ from __future__ import annotations import re from pathlib import Path from helpers import signalled_failure from pmp.keys import KeyStore from pmp.operations import build_operation from pmp.oplog import OpLog def _make_ops(tmp_path: Path, n: int = 3): ks = KeyStore(tmp_path / "keys") key = ks.create("device") ops = [] prev = None for i in range(n): op = build_operation( op_type="evidence.observe", body={ "source_type": "test", "content": {"note": f"entry {i} hello world"}, "provenance": {"adapter": "test", "source_ref": f"memory://item/{i}"}, }, key=key, prev=prev, seq=i, ) ops.append(op) prev = op.op_id return ops def test_empty_log_iterates_empty_and_verifies(tmp_path: Path): log = OpLog(tmp_path / "oplog.jsonl") assert list(log) == [] assert not signalled_failure(log.verify) def test_append_preserves_order(tmp_path: Path): ops = _make_ops(tmp_path, 4) log = OpLog(tmp_path / "oplog.jsonl") for op in ops: log.append(op) stored = list(log) assert [op.op_id for op in stored] == [op.op_id for op in ops] def test_log_persists_across_reopen(tmp_path: Path): path = tmp_path / "oplog.jsonl" ops = _make_ops(tmp_path, 3) log = OpLog(path) for op in ops: log.append(op) reopened = OpLog(path) stored = list(reopened) assert [op.op_id for op in stored] == [op.op_id for op in ops] assert not signalled_failure(reopened.verify) def test_verify_passes_on_intact_log(tmp_path: Path): path = tmp_path / "oplog.jsonl" log = OpLog(path) for op in _make_ops(tmp_path, 3): log.append(op) assert not signalled_failure(log.verify) def _written_log(tmp_path: Path, n: int = 3) -> Path: path = tmp_path / "oplog.jsonl" log = OpLog(path) for op in _make_ops(tmp_path, n): log.append(op) return path def test_tampered_value_is_detected(tmp_path: Path): path = _written_log(tmp_path) text = path.read_text(encoding="utf-8") assert "entry 1" in text # Edit a payload value inside the middle record. JSON stays valid; the # content hash / signature must no longer match. path.write_text(text.replace("entry 1", "entry X", 1), encoding="utf-8") assert signalled_failure(lambda: OpLog(path).verify()) def test_deleted_record_is_detected(tmp_path: Path): path = _written_log(tmp_path) lines = path.read_text(encoding="utf-8").splitlines() assert len(lines) >= 3 del lines[1] # drop a middle record, breaking the prev-hash chain path.write_text("\n".join(lines) + "\n", encoding="utf-8") assert signalled_failure(lambda: OpLog(path).verify()) def test_reordered_records_are_detected(tmp_path: Path): path = _written_log(tmp_path) lines = path.read_text(encoding="utf-8").splitlines() assert len(lines) >= 3 lines[0], lines[1] = lines[1], lines[0] path.write_text("\n".join(lines) + "\n", encoding="utf-8") assert signalled_failure(lambda: OpLog(path).verify()) def test_truncated_tail_record_is_detected_or_tolerated_explicitly(tmp_path: Path): """A half-written final line (crash during append) must never verify as a valid record. Implementations may either fail verification or expose a recovery path, but the mangled record must not silently appear in the iterated operations.""" path = _written_log(tmp_path, 3) text = path.read_text(encoding="utf-8").rstrip("\n") path.write_text(text[: len(text) - 20], encoding="utf-8") # chop the tail failed = signalled_failure(lambda: OpLog(path).verify()) if not failed: # If verification tolerated the truncation, the broken record must # have been excluded rather than parsed into garbage. try: ops = list(OpLog(path)) except Exception: return # raising on iteration is also acceptable assert len(ops) < 3 def test_forged_record_with_unknown_key_is_detected(tmp_path: Path): path = _written_log(tmp_path) text = path.read_text(encoding="utf-8") # Corrupt the signature field of the first record (any base64/hex sig # string will contain at least one of these characters to swap). tampered = re.sub(r'("sig"\s*:\s*")([^"]{8})', lambda m: m.group(1) + m.group(2)[::-1], text, count=1) if tampered == text: # Field named differently — fall back to value tampering, which the # dedicated test above already covers; treat as covered. return path.write_text(tampered, encoding="utf-8") assert signalled_failure(lambda: OpLog(path).verify())