"""Calendar import adapter: iCalendar (.ics) files to evidence records. A deliberately small, dependency-free RFC 5545 subset parser sufficient for real-world calendar exports: * line unfolding (CRLF + leading space/tab continuations), * property parameters with quoted values (``DTSTART;TZID=Europe/Berlin:...``), * text escaping (``\\,`` ``\\;`` ``\\n`` ``\\\\``), * DATE and DATE-TIME values (with and without UTC ``Z``), * RRULE, ATTENDEE/ORGANIZER (mailto + CN), CATEGORIES, STATUS. Anything outside this subset is ignored rather than rejected: an adapter's job is to extract evidence, not to validate someone else's exporter. """ from __future__ import annotations import hashlib import re from pathlib import Path from typing import Any, Dict, List, Optional, Tuple from . import EvidenceRecord __all__ = ["load", "parse_events"] _DT_RE = re.compile(r"^(\d{4})(\d{2})(\d{2})(?:T(\d{2})(\d{2})(\d{2})(Z?))?$") def _unfold(text: str) -> List[str]: """Undo RFC 5545 line folding and normalise line endings.""" text = text.replace("\r\n", "\n").replace("\r", "\n") out: List[str] = [] for line in text.split("\n"): if line[:1] in (" ", "\t") and out: out[-1] += line[1:] elif line.strip(): out.append(line) return out def _split_name_params(head: str) -> Tuple[str, Dict[str, str]]: """Split ``NAME;P1=V1;P2="a;b"`` into name and a params dict.""" parts: List[str] = [] buf = "" in_quotes = False for ch in head: if ch == '"': in_quotes = not in_quotes buf += ch elif ch == ";" and not in_quotes: parts.append(buf) buf = "" else: buf += ch parts.append(buf) name = parts[0].strip().upper() params: Dict[str, str] = {} for part in parts[1:]: if "=" in part: key, value = part.split("=", 1) params[key.strip().upper()] = value.strip().strip('"') return name, params def _parse_content_line(line: str) -> Optional[Tuple[str, Dict[str, str], str]]: """Split a content line at the first ``:`` outside quotes.""" in_quotes = False for i, ch in enumerate(line): if ch == '"': in_quotes = not in_quotes elif ch == ":" and not in_quotes: name, params = _split_name_params(line[:i]) return name, params, line[i + 1 :] return None _ESCAPES = {"\\\\": "\\", "\\;": ";", "\\,": ",", "\\n": "\n", "\\N": "\n"} def _unescape(value: str) -> str: out: List[str] = [] i = 0 while i < len(value): if value[i] == "\\" and i + 1 < len(value): pair = value[i : i + 2] out.append(_ESCAPES.get(pair, value[i + 1])) i += 2 else: out.append(value[i]) i += 1 return "".join(out) def _parse_dt(value: str, params: Dict[str, str]) -> Dict[str, Any]: """Parse a DATE or DATE-TIME property value into a JSON-safe dict.""" match = _DT_RE.match(value.strip()) if not match: return {"raw": value.strip()} year, month, day, hh, mm, ss, zulu = match.groups() if hh is None or params.get("VALUE", "").upper() == "DATE": return {"date": f"{year}-{month}-{day}", "all_day": True} iso = f"{year}-{month}-{day}T{hh}:{mm}:{ss}" if zulu: iso += "Z" result: Dict[str, Any] = {"datetime": iso, "all_day": False} tzid = params.get("TZID") if tzid: result["tzid"] = tzid return result def _parse_rrule(value: str) -> Dict[str, str]: rule: Dict[str, str] = {} for piece in value.split(";"): if "=" in piece: key, val = piece.split("=", 1) rule[key.strip().upper()] = val.strip() return rule def _parse_person(value: str, params: Dict[str, str]) -> Dict[str, str]: person: Dict[str, str] = {} value = value.strip() if value.lower().startswith("mailto:"): person["email"] = value[7:].strip().lower() elif value: person["uri"] = value name = params.get("CN") if name: person["name"] = name return person def parse_events(text: str) -> List[Dict[str, Any]]: """Parse all VEVENT components from an iCalendar text blob.""" events: List[Dict[str, Any]] = [] current: Optional[Dict[str, Any]] = None for line in _unfold(text): parsed = _parse_content_line(line) if parsed is None: continue name, params, value = parsed if name == "BEGIN" and value.strip().upper() == "VEVENT": current = {"attendees": [], "categories": []} elif name == "END" and value.strip().upper() == "VEVENT": if current is not None: events.append(current) current = None elif current is None: continue elif name == "UID": current["uid"] = value.strip() elif name == "SUMMARY": current["summary"] = _unescape(value).strip() elif name == "DESCRIPTION": current["description"] = _unescape(value).strip() elif name == "LOCATION": current["location"] = _unescape(value).strip() elif name == "STATUS": current["status"] = value.strip().upper() elif name == "DTSTART": current["start"] = _parse_dt(value, params) elif name == "DTEND": current["end"] = _parse_dt(value, params) elif name == "DTSTAMP": current["stamp"] = _parse_dt(value, params) elif name == "RRULE": current["rrule"] = _parse_rrule(value) elif name == "CATEGORIES": current["categories"].extend( cat.strip() for cat in _unescape(value).split(",") if cat.strip() ) elif name == "ATTENDEE": person = _parse_person(value, params) if person: current["attendees"].append(person) elif name == "ORGANIZER": person = _parse_person(value, params) if person: current["organizer"] = person return events def _event_payload(event: Dict[str, Any]) -> Dict[str, Any]: payload = { key: value for key, value in event.items() if key != "stamp" and value not in (None, "", [], {}) } return payload def _external_id(event: Dict[str, Any]) -> str: uid = event.get("uid") if uid: return str(uid) start = event.get("start") or {} seed = f"{event.get('summary', '')}|{start.get('datetime') or start.get('date') or ''}" return "synth-" + hashlib.sha256(seed.encode("utf-8")).hexdigest()[:16] def _observed_at(event: Dict[str, Any]) -> Optional[str]: for source in (event.get("stamp"), event.get("start")): if isinstance(source, dict): value = source.get("datetime") or source.get("date") if value: return value return None def load(path: str) -> List[EvidenceRecord]: """Load one ``.ics`` file, or every ``.ics`` file under a directory.""" root = Path(path) if root.is_dir(): files = sorted(p for p in root.rglob("*.ics") if p.is_file()) elif root.is_file(): files = [root] else: raise FileNotFoundError(f"calendar source not found: {path}") records: List[EvidenceRecord] = [] for file in files: text = file.read_text(encoding="utf-8", errors="replace") for event in parse_events(text): records.append( EvidenceRecord( source=f"calendar:{file.name}", kind="calendar.event", external_id=_external_id(event), payload=_event_payload(event), observed_at=_observed_at(event), ) ) return records