"""Join the three raw pulls (analytics, quality scale, issue triage) into the data tables used by the milestone-1 report, plus rendered Markdown. Inputs (all produced by the sibling fetch scripts): * analytics CSV -- rank, domain, installations, ... * quality CSV -- domain, quality_scale, ... * issues CSV -- domain, open_issue_count, cat_* columns, ... Outputs (default ``data/generated/``; the committed files under ``data/`` are the frozen snapshot the report text cites and are not overwritten): * top_integrations_generated.csv * quality_scale_distribution_generated.csv * issue_triage_generated.csv * tables.md -- the same three tables rendered as Markdown Pure standard library; no network access. """ from __future__ import annotations import argparse import sys from collections import Counter from pathlib import Path from .common import read_csv, write_csv CATEGORY_COLUMNS = [ "cat_auth_or_setup", "cat_connectivity", "cat_regression", "cat_device_support", "cat_performance", "cat_crash_or_error", "cat_other", ] def _to_int(value: object, default: int = 0) -> int: try: return int(str(value)) except (TypeError, ValueError): return default def _index_by_domain(rows: list[dict[str, str]]) -> dict[str, dict[str, str]]: return {row["domain"]: row for row in rows if row.get("domain")} def md_table(headers: list[str], rows: list[list[object]]) -> str: lines = ["| " + " | ".join(headers) + " |", "| " + " | ".join("---" for _ in headers) + " |"] for row in rows: lines.append("| " + " | ".join(str(cell) for cell in row) + " |") return "\n".join(lines) + "\n" def dominant_category(issue_row: dict[str, str]) -> str: counts = {col: _to_int(issue_row.get(col)) for col in CATEGORY_COLUMNS} if not any(counts.values()): return "" best = max(counts.items(), key=lambda kv: kv[1]) return best[0].removeprefix("cat_") def main(argv: list[str] | None = None) -> int: parser = argparse.ArgumentParser( description="Build the report's joined data tables from raw pulls." ) parser.add_argument("--analytics", default="data/generated/analytics_integrations.csv") parser.add_argument("--quality", default="data/generated/quality_scale_raw.csv") parser.add_argument("--issues", default="data/generated/issue_triage_raw.csv") parser.add_argument("--top", type=int, default=150, help="size of the head table (default: %(default)s)") parser.add_argument("--outdir", default="data/generated") args = parser.parse_args(argv) try: analytics = read_csv(args.analytics) quality = _index_by_domain(read_csv(args.quality)) issues = _index_by_domain(read_csv(args.issues)) except OSError as exc: print(f"error reading inputs: {exc}", file=sys.stderr) return 1 if not analytics: print("error: analytics CSV is empty", file=sys.stderr) return 1 outdir = Path(args.outdir) outdir.mkdir(parents=True, exist_ok=True) # ----- 1. top-N joined table ------------------------------------------ top_rows: list[dict[str, object]] = [] for row in analytics[: args.top]: domain = row["domain"] installs = _to_int(row.get("installations")) q_row = quality.get(domain, {}) i_row = issues.get(domain, {}) open_issues = _to_int(i_row.get("open_issue_count"), default=-1) per_100k = (f"{open_issues / installs * 100_000:.1f}" if installs > 0 and open_issues >= 0 else "") top_rows.append({ "rank": _to_int(row.get("rank")), "domain": domain, "installations": installs, "quality_scale": q_row.get("quality_scale", ""), "integration_type": q_row.get("integration_type", ""), "iot_class": q_row.get("iot_class", ""), "open_issues": open_issues if open_issues >= 0 else "", "issues_per_100k_installs": per_100k, }) top_fields = ["rank", "domain", "installations", "quality_scale", "integration_type", "iot_class", "open_issues", "issues_per_100k_installs"] write_csv(outdir / "top_integrations_generated.csv", top_fields, top_rows) # ----- 2. quality-scale distribution (within the top-N) --------------- tier_counts = Counter( str(row["quality_scale"]) or "unscored" for row in top_rows ) total = sum(tier_counts.values()) or 1 tier_order = ["platinum", "gold", "silver", "bronze", "legacy", "internal", "unscored", "missing"] ordered_tiers = ([t for t in tier_order if t in tier_counts] + sorted(set(tier_counts) - set(tier_order))) dist_rows = [{ "tier": tier, "count": tier_counts[tier], "share_pct": f"{100.0 * tier_counts[tier] / total:.1f}", } for tier in ordered_tiers] write_csv(outdir / "quality_scale_distribution_generated.csv", ["tier", "count", "share_pct"], dist_rows) # ----- 3. issue triage table ------------------------------------------- triage_rows: list[dict[str, object]] = [] for row in top_rows: domain = str(row["domain"]) i_row = issues.get(domain) if not i_row: continue entry: dict[str, object] = { "domain": domain, "installations": row["installations"], "quality_scale": row["quality_scale"], "open_issues": row["open_issues"], "issues_per_100k_installs": row["issues_per_100k_installs"], "dominant_category": dominant_category(i_row), "top_issue_title": i_row.get("top_issue_title", ""), "top_issue_url": i_row.get("top_issue_url", ""), } for col in CATEGORY_COLUMNS: entry[col] = _to_int(i_row.get(col)) triage_rows.append(entry) triage_rows.sort( key=lambda r: -(r["open_issues"] if isinstance(r["open_issues"], int) else -1) ) triage_fields = (["domain", "installations", "quality_scale", "open_issues", "issues_per_100k_installs", "dominant_category"] + CATEGORY_COLUMNS + ["top_issue_title", "top_issue_url"]) write_csv(outdir / "issue_triage_generated.csv", triage_fields, triage_rows) # ----- 4. Markdown rendering ------------------------------------------- md_parts = ["# Generated report tables\n", f"\nBuilt from: `{args.analytics}`, `{args.quality}`, " f"`{args.issues}`.\n", f"\n## Top {len(top_rows)} integrations by install base\n\n"] md_parts.append(md_table( top_fields, [[r[f] for f in top_fields] for r in top_rows], )) md_parts.append("\n## Quality-scale distribution (within top " f"{len(top_rows)})\n\n") md_parts.append(md_table( ["tier", "count", "share_pct"], [[r["tier"], r["count"], r["share_pct"]] for r in dist_rows], )) md_parts.append("\n## Issue triage (sorted by open issues)\n\n") md_parts.append(md_table( triage_fields, [[r[f] for f in triage_fields] for r in triage_rows], )) md_path = outdir / "tables.md" md_path.write_text("".join(md_parts), encoding="utf-8") print(f"wrote {len(top_rows)} top rows, {len(dist_rows)} tiers, " f"{len(triage_rows)} triage rows, and {md_path}") return 0 if __name__ == "__main__": raise SystemExit(main())