"""Per-integration open-issue triage against home-assistant/core.

For each integration domain we query the GitHub search API for open issues
carrying the ``integration: <domain>`` label (the label convention used by the
core repo's issue bot), record the total open count, and classify a sample of
the highest-reaction issues into coarse triage categories with keyword
heuristics (the same heuristics described in report section 04).

Rate limits: the search API allows 30 requests/min with a token and 10/min
anonymously. We self-throttle accordingly (one search request per domain), and
``common.request_with_backoff`` additionally honours Retry-After /
X-RateLimit-Reset headers if we still get throttled. A full 150-domain pass
takes ~6 minutes with a token, ~17 minutes without.

Search responses are not disk-cached (they carry query params and go stale
fast); re-running re-queries.
"""

from __future__ import annotations

import argparse
import json
import sys
import time
from collections import Counter
from pathlib import Path

from .common import (
    github_headers,
    github_token,
    make_session,
    read_domains_csv,
    request_with_backoff,
    write_csv,
)

# GitHub REST endpoint used for every per-domain query.
SEARCH_URL = "https://api.github.com/search/issues"

# Ordered: first matching category wins. Mirrors report section 04.
# Each entry is (category, keywords). Keywords are lowercase substrings that
# ``classify`` looks for in the lowercased title + body; several are
# deliberately stems ("after updat" covers "update", "updated", "updating").
CATEGORY_KEYWORDS: list[tuple[str, tuple[str, ...]]] = [
    ("auth_or_setup", (
        "auth", "login", "token", "credential", "reauth", "config flow",
        "failed to set up", "setup failed", "cannot configure",
    )),
    ("connectivity", (
        "timeout", "timed out", "unavailable", "disconnect", "connection",
        "offline", "unreachable", "lost contact",
    )),
    ("regression", (
        "regression", "after updat", "after upgrad", "since 20",
        "worked before", "stopped working", "broke after",
    )),
    ("device_support", (
        "not supported", "unsupported", "add support for", "new device",
        "new model", "missing entity", "missing sensor",
    )),
    ("performance", (
        "slow", "lag", "cpu", "memory leak", "high memory", "performance",
    )),
    ("crash_or_error", (
        "traceback", "exception", "error", "crash", "unhandled",
    )),
]


def classify(title: str, body: str | None) -> str:
    """Return the triage category for one issue.

    The title and body are joined, lowercased, and scanned for the keyword
    substrings in ``CATEGORY_KEYWORDS``; the first category (in list order)
    with any matching keyword wins.

    Args:
        title: Issue title. A missing (``None``) title is treated as empty.
        body: Issue body, or ``None`` when the issue has no body.

    Returns:
        The matching category name, or ``"other"`` when nothing matches.
    """
    haystack = f"{title or ''}\n{body or ''}".lower()
    for category, keywords in CATEGORY_KEYWORDS:
        if any(keyword in haystack for keyword in keywords):
            return category
    return "other"


def search_domain(session, headers, domain: str, sample: int) -> dict:
    """Run the open-issue search for one integration domain.

    Args:
        session: HTTP session passed through to ``request_with_backoff``.
        headers: Request headers passed through to ``request_with_backoff``.
        domain: Integration domain; queried as label ``integration: <domain>``.
        sample: Number of issues to request; clamped to the range 1..100.

    Returns:
        The decoded JSON object on success. On failure a dict with a single
        ``"error"`` key: ``"http_<status>"`` for a non-200 response, or
        ``"bad_json"`` when the body is not a JSON object.
    """
    query = (f'repo:home-assistant/core is:issue is:open '
             f'label:"integration: {domain}"')
    params = {
        "q": query,
        "per_page": str(max(1, min(sample, 100))),
        "sort": "reactions",
        "order": "desc",
    }
    resp = request_with_backoff(session, SEARCH_URL, params=params,
                                headers=headers)
    if resp.status_code != 200:
        return {"error": f"http_{resp.status_code}"}
    try:
        payload = resp.json()
    except ValueError:
        # json.JSONDecodeError is a ValueError subclass; catching the base
        # class also covers HTTP libraries whose decode error does not derive
        # from the stdlib class (e.g. when simplejson is in use).
        return {"error": "bad_json"}
    if not isinstance(payload, dict):
        # Callers use "in" and .get() on the result, so a list or scalar body
        # is reported as malformed instead of crashing them.
        return {"error": "bad_json"}
    return payload


def _build_row(domain: str, payload: dict,
               category_names: list[str]) -> dict[str, object]:
    """Turn one ``search_domain`` result into a CSV row dict."""
    row: dict[str, object] = {"domain": domain, "fetch_error": ""}

    if "error" in payload:
        # Failed fetch: blank out every data column and record the reason.
        row.update({"open_issue_count": "", "sampled": 0,
                    "top_issue_title": "", "top_issue_url": "",
                    "top_issue_reactions": "",
                    "fetch_error": payload["error"]})
        for name in category_names:
            row[f"cat_{name}"] = ""
        return row

    items = payload.get("items") or []
    counts = Counter(
        classify(item.get("title", ""), item.get("body")) for item in items
    )
    # "or 0" guards against an explicit null total_count.
    row["open_issue_count"] = int(payload.get("total_count") or 0)
    row["sampled"] = len(items)
    for name in category_names:
        row[f"cat_{name}"] = counts.get(name, 0)

    # Results are sorted by reactions (descending), so the first item is the
    # most-reacted issue. An empty dict yields blank "top issue" columns.
    top = items[0] if items else {}
    reactions = top.get("reactions") or {}
    row["top_issue_title"] = (top.get("title") or "")[:160]
    row["top_issue_url"] = top.get("html_url", "")
    row["top_issue_reactions"] = reactions.get("total_count", "")
    return row


def main(argv: list[str] | None = None) -> int:
    """Command-line entry point: triage every domain and write the CSV.

    Args:
        argv: Argument list to parse; ``None`` lets argparse use ``sys.argv``.

    Returns:
        Process exit code: 1 when the domains CSV cannot be read, else 0
        (per-domain fetch errors are recorded in the CSV, not fatal).
    """
    parser = argparse.ArgumentParser(
        description="Triage open core issues per integration label."
    )
    parser.add_argument("--domains-csv", required=True,
                        help="CSV with a 'domain' column (e.g. analytics output)")
    parser.add_argument("--limit", type=int, default=150,
                        help="number of domains to triage (default: %(default)s)")
    parser.add_argument("--sample", type=int, default=20,
                        help="issues sampled per domain for classification "
                             "(default: %(default)s, max 100)")
    parser.add_argument("--out", default="data/generated/issue_triage_raw.csv")
    parser.add_argument("--checkpoint-every", type=int, default=25,
                        help="write partial CSV every N domains (crash safety)")
    args = parser.parse_args(argv)

    try:
        domains = read_domains_csv(args.domains_csv, limit=args.limit)
    except (ValueError, OSError) as exc:
        print(f"error: {exc}", file=sys.stderr)
        return 1

    session = make_session()
    headers = github_headers()
    has_token = github_token() is not None
    # Seconds between requests: stays just under the search API limits of
    # 30 requests/min (token) and 10 requests/min (anonymous).
    delay = 2.2 if has_token else 6.5
    print(f"triaging {len(domains)} domains "
          f"({'authenticated' if has_token else 'anonymous; set GITHUB_TOKEN to go 3x faster'}); "
          f"~{len(domains) * delay / 60:.0f} min")

    category_names = [name for name, _ in CATEGORY_KEYWORDS] + ["other"]
    fields = (["domain", "open_issue_count", "sampled"]
              + [f"cat_{name}" for name in category_names]
              + ["top_issue_title", "top_issue_url", "top_issue_reactions",
                 "fetch_error"])

    rows: list[dict[str, object]] = []
    for index, domain in enumerate(domains, start=1):
        payload = search_domain(session, headers, domain, args.sample)
        row = _build_row(domain, payload, category_names)
        rows.append(row)

        # The count column is blank for failed fetches, so show "?" there
        # rather than printing an empty count.
        open_count = "?" if row["fetch_error"] else row["open_issue_count"]
        print(f" [{index}/{len(domains)}] {domain}: "
              f"{open_count} open"
              + (f" (error: {row['fetch_error']})" if row["fetch_error"] else ""))

        if args.checkpoint_every and index % args.checkpoint_every == 0:
            write_csv(args.out, fields, rows)
        if index < len(domains):
            # No need to wait after the final request.
            time.sleep(delay)

    written = write_csv(args.out, fields, rows)
    errors = sum(1 for row in rows if row["fetch_error"])
    print(f"wrote {written} rows to {args.out}"
          + (f" ({errors} domains had fetch errors; re-run to retry)" if errors else ""))
    return 0


if __name__ == "__main__":
    raise SystemExit(main())