"""Fetch the Home Assistant integration install-base from the public analytics endpoint and write it as a ranked CSV. Data source ----------- ``https://analytics.home-assistant.io/data.json`` -- the same JSON the official analytics dashboard renders. It is a mapping of millisecond timestamps to snapshot objects; recent snapshots carry an ``integrations`` mapping of ``domain -> opted-in installation count`` plus an ``active_installations`` total. The schema is *not* a stable contract (see report section 01). If the structure changes this script fails loudly with a description of what it found, rather than writing a silently-wrong table. """ from __future__ import annotations import argparse import json import sys from pathlib import Path from .common import FileCache, cached_get, make_session, write_csv ANALYTICS_URL = "https://analytics.home-assistant.io/data.json" def _latest_snapshot_with_integrations(payload: object) -> tuple[str, dict]: """Return (timestamp_key, snapshot) for the newest snapshot that has an ``integrations`` mapping. Raises ValueError on unrecognised shapes.""" if not isinstance(payload, dict) or not payload: raise ValueError( "analytics payload is not a non-empty JSON object; " f"got {type(payload).__name__}" ) try: keys = sorted(payload.keys(), key=lambda k: int(k), reverse=True) except (TypeError, ValueError) as exc: raise ValueError( "analytics payload keys are not integer timestamps; " f"sample keys: {list(payload.keys())[:3]}" ) from exc for key in keys: snapshot = payload[key] if isinstance(snapshot, dict) and isinstance( snapshot.get("integrations"), dict ) and snapshot["integrations"]: return key, snapshot raise ValueError( "no snapshot in the analytics payload contained a non-empty " "'integrations' mapping; the upstream schema may have changed " "(inspect the cached body under tools/.cache)" ) def fetch_analytics(cache_dir: str, ttl: float) -> tuple[str, dict]: session = make_session() cache = FileCache(cache_dir, ttl_seconds=ttl) status, text = cached_get(session, cache, ANALYTICS_URL) if status != 200: raise RuntimeError(f"GET {ANALYTICS_URL} returned HTTP {status}") return _latest_snapshot_with_integrations(json.loads(text)) def main(argv: list[str] | None = None) -> int: parser = argparse.ArgumentParser( description="Fetch Home Assistant integration install counts." ) parser.add_argument("--out", default="data/generated/analytics_integrations.csv", help="output CSV path (default: %(default)s)") parser.add_argument("--top", type=int, default=0, help="keep only the top N integrations (0 = all)") parser.add_argument("--min-installs", type=int, default=0, help="drop integrations below this install count") parser.add_argument("--cache-dir", default=str(Path("tools") / ".cache")) parser.add_argument("--cache-ttl", type=float, default=24 * 3600, help="cache TTL in seconds (default: 1 day)") args = parser.parse_args(argv) try: ts_key, snapshot = fetch_analytics(args.cache_dir, args.cache_ttl) except (RuntimeError, ValueError, json.JSONDecodeError) as exc: print(f"error: {exc}", file=sys.stderr) return 1 integrations: dict[str, int] = { str(domain): int(count) for domain, count in snapshot["integrations"].items() } active = snapshot.get("reports_integrations") if not isinstance(active, int): active = snapshot.get("active_installations") total_reporting = active if isinstance(active, int) and active > 0 else None ranked = sorted(integrations.items(), key=lambda kv: (-kv[1], kv[0])) if args.min_installs: ranked = [kv for kv in ranked if kv[1] >= args.min_installs] if args.top: ranked = ranked[: args.top] rows = [] for rank, (domain, count) in enumerate(ranked, start=1): pct = (f"{100.0 * count / total_reporting:.2f}" if total_reporting else "") rows.append({ "rank": rank, "domain": domain, "installations": count, "pct_of_reporting_installs": pct, "snapshot_timestamp_ms": ts_key, }) written = write_csv( args.out, ["rank", "domain", "installations", "pct_of_reporting_installs", "snapshot_timestamp_ms"], rows, ) denom = total_reporting if total_reporting else "unknown" print(f"wrote {written} integrations to {args.out} " f"(snapshot {ts_key}, reporting installs: {denom})") return 0 if __name__ == "__main__": raise SystemExit(main())