"""Calendar adapter: iCalendar (RFC 5545) ``.ics`` files -> ``calendar.event`` evidence.

A deliberately self-contained ICS parser (no third-party dependency) covering
the subset real calendar exports use:

* line unfolding (CRLF + leading space/tab continuations);
* content lines ``NAME;PARAM=VAL;PARAM="quoted,val":value`` with quoted
  parameter values;
* TEXT escaping (``\\n``, ``\\,``, ``\\;``, ``\\\\``);
* nested components (VCALENDAR / VEVENT / VALARM), of which VEVENTs are
  extracted and VALARMs are noted but not expanded;
* DATE and DATE-TIME values, including UTC (``Z``) and ``TZID=`` forms.

A leading UTF-8 byte-order mark, as written by some exporters, is tolerated.

Timezone policy: this adapter records timestamps faithfully -- the raw value,
an ISO-8601 rendering, and the TZID parameter if present -- but does **not**
resolve TZIDs to offsets (that would require a tz database and VTIMEZONE
expansion, which belongs in the derivation engine where the interpretation can
itself carry provenance and be corrected).

Recurrence policy: RRULE/EXDATE are recorded verbatim as evidence; expansion
into concrete occurrences is derivation, not evidence.

``external_id`` is the event UID (plus RECURRENCE-ID for overridden instances),
so an updated event supersedes its previous evidence op.
"""

from __future__ import annotations

import re
from pathlib import Path
from typing import Iterator, Optional

from pmp.adapters.base import Adapter, EvidenceItem, prune, register
from pmp.errors import AdapterError

_DT_RE = re.compile(r"^(\d{4})(\d{2})(\d{2})(?:T(\d{2})(\d{2})(\d{2})(Z)?)?$")


# ---------------------------------------------------------------------------
# low-level ICS reading
# ---------------------------------------------------------------------------


def unfold_lines(text: str) -> list[str]:
    """Undo RFC 5545 line folding.

    Line endings are normalized (CRLF and bare CR become LF), empty physical
    lines are dropped, and any line starting with a space or tab is appended
    (minus that one character) to the preceding logical line.

    A leading U+FEFF byte-order mark is removed first: it is not whitespace,
    so it would otherwise end up glued to the first property name
    (``\\ufeffBEGIN``) and break component parsing.
    """
    if text.startswith("\ufeff"):
        text = text[1:]
    physical = text.replace("\r\n", "\n").replace("\r", "\n").split("\n")
    logical: list[str] = []
    for line in physical:
        if not line:
            continue
        if line[0] in (" ", "\t") and logical:
            logical[-1] += line[1:]
        else:
            # Also reached by a continuation line with nothing to continue;
            # it is kept as its own logical line rather than dropped.
            logical.append(line)
    return logical


def _split_outside_quotes(text: str, sep: str) -> list[str]:
    """Split *text* on *sep*, ignoring separators inside double quotes.

    Quote characters are kept in the returned pieces. This implements the
    quoting rules of the *parameter* section of a content line.
    """
    parts: list[str] = []
    buf: list[str] = []
    in_quotes = False
    for ch in text:
        if ch == '"':
            in_quotes = not in_quotes
            buf.append(ch)
        elif ch == sep and not in_quotes:
            parts.append("".join(buf))
            buf = []
        else:
            buf.append(ch)
    parts.append("".join(buf))
    return parts


def _split_unescaped(text: str, sep: str) -> list[str]:
    """Split a TEXT list *text* on *sep*, honoring backslash escapes.

    A separator preceded by an (unescaped) backslash is part of the value and
    does not split. Escape sequences are left intact in the returned pieces so
    the caller can run ``unescape_text`` on each one. Double quotes carry no
    special meaning here, unlike in parameter values.
    """
    parts: list[str] = []
    buf: list[str] = []
    escaped = False
    for ch in text:
        if escaped:
            # Whatever follows a backslash belongs to the current piece.
            buf.append(ch)
            escaped = False
        elif ch == "\\":
            buf.append(ch)
            escaped = True
        elif ch == sep:
            parts.append("".join(buf))
            buf = []
        else:
            buf.append(ch)
    parts.append("".join(buf))
    return parts


def parse_content_line(line: str) -> tuple[str, dict[str, list[str]], str]:
    """Split one logical line into (NAME, {PARAM: [values]}, value).

    The name and parameter names are upper-cased; parameter values are split
    on commas outside quotes and have surrounding whitespace and double quotes
    removed. The value (everything after the first colon that is outside
    quotes) is returned untouched.

    Raises:
        AdapterError: if the line has no unquoted ``:`` or an empty name.
    """
    in_quotes = False
    colon_at = -1
    for i, ch in enumerate(line):
        if ch == '"':
            in_quotes = not in_quotes
        elif ch == ":" and not in_quotes:
            colon_at = i
            break
    if colon_at < 0:
        raise AdapterError(f"malformed ICS content line (no ':'): {line[:80]!r}")

    left, value = line[:colon_at], line[colon_at + 1:]
    pieces = _split_outside_quotes(left, ";")
    name = pieces[0].strip().upper()
    if not name:
        raise AdapterError(f"malformed ICS content line (empty name): {line[:80]!r}")

    params: dict[str, list[str]] = {}
    for piece in pieces[1:]:
        if "=" not in piece:
            continue  # tolerate degenerate parameters from sloppy exporters
        pname, pval = piece.split("=", 1)
        values = [v.strip().strip('"') for v in _split_outside_quotes(pval, ",")]
        params[pname.strip().upper()] = values
    return name, params, value


def unescape_text(value: str) -> str:
    """Undo TEXT escaping (\\n, \\N, \\, \\; \\\\).

    ``\\n`` / ``\\N`` become a newline; for any other escaped character the
    backslash is dropped and the character kept. A lone trailing backslash is
    kept as-is.
    """
    out: list[str] = []
    i = 0
    while i < len(value):
        ch = value[i]
        if ch == "\\" and i + 1 < len(value):
            nxt = value[i + 1]
            # Known escapes (, ; \) and unknown ones alike keep the character.
            out.append("\n" if nxt in ("n", "N") else nxt)
            i += 2
        else:
            out.append(ch)
            i += 1
    return "".join(out)


def parse_components(lines: list[str]) -> dict:
    """Parse logical lines into a component tree rooted at a pseudo-ROOT.

    Each node is ``{"name": str, "props": [(name, params, value), ...],
    "components": [child nodes]}``. ``BEGIN``/``END`` lines open and close
    nodes; every other line becomes a property of the innermost open node.

    Raises:
        AdapterError: on a malformed content line, an ``END`` that does not
            match the innermost open component, or a component left unclosed.
    """
    root = {"name": "ROOT", "props": [], "components": []}
    stack = [root]
    for line in lines:
        name, params, value = parse_content_line(line)
        if name == "BEGIN":
            comp = {"name": value.strip().upper(), "props": [], "components": []}
            stack[-1]["components"].append(comp)
            stack.append(comp)
        elif name == "END":
            wanted = value.strip().upper()
            # len(stack) < 2 means only the pseudo-ROOT is open.
            if len(stack) < 2 or stack[-1]["name"] != wanted:
                raise AdapterError(
                    f"mismatched END:{wanted} (open component: {stack[-1]['name']})"
                )
            stack.pop()
        else:
            stack[-1]["props"].append((name, params, value))
    if len(stack) != 1:
        raise AdapterError(f"unclosed ICS component: {stack[-1]['name']}")
    return root


# ---------------------------------------------------------------------------
# value normalization
# ---------------------------------------------------------------------------


def parse_ics_datetime(value: str, params: dict[str, list[str]]) -> dict:
    """Normalize a DATE / DATE-TIME value without resolving timezones.

    Builds a dict with ``raw`` (the stripped input), ``tzid`` (first TZID
    parameter value, if any) and, when the value matches ``YYYYMMDD`` or
    ``YYYYMMDDTHHMMSS[Z]``, ``iso`` plus an ``all_day`` flag. Values that do
    not match are passed through with ``raw``/``tzid`` only. The dict is
    returned via ``prune`` (presumably dropping empty entries -- confirm
    against ``pmp.adapters.base``).
    """
    value = value.strip()
    tzid = params.get("TZID", [None])[0]
    m = _DT_RE.match(value)
    if not m:
        return prune({"raw": value, "tzid": tzid})
    y, mo, d, h, mi, s, zulu = m.groups()
    if h is None:
        # Date-only form: no time component was present.
        return prune(
            {"raw": value, "iso": f"{y}-{mo}-{d}", "all_day": True, "tzid": tzid}
        )
    iso = f"{y}-{mo}-{d}T{h}:{mi}:{s}" + ("Z" if zulu else "")
    return prune({"raw": value, "iso": iso, "all_day": False, "tzid": tzid})


def _first(
    props: list[tuple[str, dict, str]], name: str
) -> Optional[tuple[dict, str]]:
    """Return ``(params, value)`` of the first property called *name*, or None."""
    for pname, params, value in props:
        if pname == name:
            return params, value
    return None


def _all(props: list[tuple[str, dict, str]], name: str) -> list[tuple[dict, str]]:
    """Return ``(params, value)`` for every property called *name*, in order."""
    return [(params, value) for pname, params, value in props if pname == name]


def _person(params: dict, value: str) -> dict:
    """Build an organizer/attendee record from a property's params and value."""
    return prune(
        {
            "uri": value.strip(),
            "cn": params.get("CN", [None])[0],
            "role": params.get("ROLE", [None])[0],
            "partstat": params.get("PARTSTAT", [None])[0],
        }
    )


def _event_content(event: dict, calendar_meta: dict) -> dict:
    """Build the ``calendar.event`` content dict for one VEVENT node.

    Args:
        event: a VEVENT node as produced by ``parse_components``.
        calendar_meta: metadata of the enclosing calendar, stored under the
            ``calendar`` key.

    Returns:
        The event fields passed through ``prune``.
    """
    props = event["props"]

    def text(name):
        hit = _first(props, name)
        return unescape_text(hit[1]) if hit else None

    def dt(name):
        hit = _first(props, name)
        return parse_ics_datetime(hit[1], hit[0]) if hit else None

    sequence = None
    hit = _first(props, "SEQUENCE")
    if hit:
        try:
            sequence = int(hit[1].strip())
        except ValueError:
            # A non-numeric SEQUENCE is treated as absent.
            sequence = None

    # CATEGORIES is a comma-separated TEXT list: split only on commas that are
    # not backslash-escaped, then unescape each entry. Parameter-style quote
    # handling does not apply to property values.
    categories: list[str] = []
    for _, value in _all(props, "CATEGORIES"):
        categories.extend(
            unescape_text(part).strip()
            for part in _split_unescaped(value, ",")
            if part.strip()
        )

    organizer = None
    hit = _first(props, "ORGANIZER")
    if hit:
        organizer = _person(hit[0], hit[1])
    attendees = [_person(params, value) for params, value in _all(props, "ATTENDEE")]

    # Recurrence data is recorded verbatim; it is not expanded here.
    rrule = _first(props, "RRULE")
    exdates = [value for _, value in _all(props, "EXDATE")]
    recurrence_id = dt("RECURRENCE-ID")

    return prune(
        {
            "uid": text("UID"),
            "summary": text("SUMMARY"),
            "description": text("DESCRIPTION"),
            "location": text("LOCATION"),
            "status": text("STATUS"),
            "transparency": text("TRANSP"),
            "url": text("URL"),
            "start": dt("DTSTART"),
            "end": dt("DTEND"),
            "duration": text("DURATION"),
            "rrule": rrule[1] if rrule else None,
            "exdates": exdates,
            "recurrence_id": recurrence_id,
            "organizer": organizer,
            "attendees": attendees,
            "categories": categories,
            "sequence": sequence,
            "created": dt("CREATED"),
            "last_modified": dt("LAST-MODIFIED"),
            "dtstamp": dt("DTSTAMP"),
            "has_alarm": any(c["name"] == "VALARM" for c in event["components"]),
            "calendar": calendar_meta,
        }
    )


# ---------------------------------------------------------------------------
# adapter
# ---------------------------------------------------------------------------


@register
class ICSCalendarAdapter(Adapter):
    """Imports iCalendar (.ics) files as ``calendar.event`` evidence."""

    name = "ics"
    version = "1.0.0"
    content_types = ("calendar.event",)

    def parse(self, source: Path) -> Iterator[EvidenceItem]:
        """Yield one ``EvidenceItem`` per VEVENT in the ``.ics`` files of *source*.

        Files are those returned by ``self.iter_files(source, (".ics",))``.
        Only VEVENTs directly inside a top-level VCALENDAR are emitted.

        Raises:
            AdapterError: if a file cannot be read or decoded, its ICS
                structure is malformed, or a VEVENT has no UID.
        """
        for path in self.iter_files(source, (".ics",)):
            try:
                # utf-8-sig decodes plain UTF-8 identically and additionally
                # drops a leading BOM, which some exporters write.
                text = path.read_text(encoding="utf-8-sig")
            except (OSError, UnicodeDecodeError) as exc:
                raise AdapterError(f"cannot read {path}: {exc}") from exc
            try:
                root = parse_components(unfold_lines(text))
            except AdapterError as exc:
                # Re-raise with the file path so the failure is attributable.
                raise AdapterError(f"{path}: {exc}") from exc

            for vcal in (c for c in root["components"] if c["name"] == "VCALENDAR"):
                cal_name = _first(vcal["props"], "X-WR-CALNAME")
                prodid = _first(vcal["props"], "PRODID")
                calendar_meta = prune(
                    {
                        "name": unescape_text(cal_name[1]) if cal_name else None,
                        "prodid": prodid[1] if prodid else None,
                        "file": path.name,
                    }
                )
                for event in (c for c in vcal["components"] if c["name"] == "VEVENT"):
                    content = _event_content(event, calendar_meta)
                    uid = content.get("uid")
                    if not uid:
                        raise AdapterError(f"{path}: VEVENT without UID")

                    # Overridden instances of a recurring event share the UID;
                    # the raw RECURRENCE-ID keeps their ids distinct.
                    external_id = uid
                    rec = content.get("recurrence_id")
                    if rec and rec.get("raw"):
                        external_id = f"{uid}:{rec['raw']}"

                    # Prefer LAST-MODIFIED, falling back to DTSTAMP.
                    observed = (
                        (content.get("last_modified") or {}).get("iso")
                        or (content.get("dtstamp") or {}).get("iso")
                    )
                    yield EvidenceItem(
                        content_type="calendar.event",
                        content=content,
                        external_id=external_id,
                        observed_at=observed,
                        origin=path.resolve().as_uri(),
                    )