diff --git a/docs/pr-blocker-audit.md b/docs/pr-blocker-audit.md new file mode 100644 index 0000000..b56f28c --- /dev/null +++ b/docs/pr-blocker-audit.md @@ -0,0 +1,188 @@ +# PR Blocker Audit + +`scripts/pr_blocker_audit.py` is a small, read-only triage helper for maintainers who need to inspect open pull request overlap before reviewing or starting related work. + +It is a triage helper, not a replacement for maintainer judgment. + +## What it does + +- Reads open PR metadata from a local JSON file or from `gh`. +- Reports files touched by more than one open PR. +- Groups active work into broad code areas. +- Ranks PRs with a deterministic heuristic score. +- Flags possible duplicate candidates based on title keyword overlap and changed-file similarity. +- Suggests quieter areas for conservative new work. +- Prints Markdown by default, compact terminal output when requested, or machine-readable JSON. + +## What it does not do + +- It does not post comments. +- It does not review, approve, label, close, merge, or otherwise mutate PRs. +- It does not add or run GitHub Actions. +- It does not import the Odysseus application package. +- It does not claim that a PR is definitely blocked or duplicated. + +## Read-only safety guarantee + +Offline mode only reads a local JSON file. Live mode runs read-only GitHub CLI commands: + +```bash +gh pr list --repo OWNER/REPO --state open --limit 1000 --json number,title,author,files,mergeStateStatus,reviewDecision,updatedAt,url +``` + +If a PR from that list has missing or empty changed-file metadata, live mode fills it with read-only per-PR REST calls: + +```bash +gh api --paginate "repos/OWNER/REPO/pulls/NUMBER/files?per_page=100" +``` + +If that GraphQL-backed command fails, it falls back to: + +```bash +gh api --paginate "repos/OWNER/REPO/pulls?state=open&per_page=100" +``` + +Per-PR file fetching makes live overlap results useful, but it can be slower on repositories with hundreds of open PRs. + +## Generate input JSON + +For repeatable offline audits, capture PR metadata first: + +```bash +gh pr list --repo OWNER/REPO --state open --limit 1000 --json number,title,author,files,mergeStateStatus,reviewDecision,updatedAt,url > open-prs.json +``` + +## Run offline mode + +```bash +python3 scripts/pr_blocker_audit.py --input open-prs.json +``` + +## Run live mode + +```bash +python3 scripts/pr_blocker_audit.py --repo OWNER/REPO +``` + +Live mode fetches up to 1000 open PRs by default. Use `--limit` to cap how many open PRs are fetched and analyzed, and `--top` to cap how many rows are displayed in ranked sections: + +```bash +python3 scripts/pr_blocker_audit.py --repo OWNER/REPO --limit 50 --top 10 +``` + +Live mode may take time on large PR queues because it fetches changed-file metadata for each PR that did not include it in the initial list response. Progress is shown on `stderr` by default only when `stderr` is a TTY: + +```bash +python3 scripts/pr_blocker_audit.py --repo OWNER/REPO --progress auto +python3 scripts/pr_blocker_audit.py --repo OWNER/REPO --progress always +python3 scripts/pr_blocker_audit.py --repo OWNER/REPO --progress never +``` + +Use `--quiet` to suppress progress and non-fatal warning output. Progress and warnings never go to `stdout`, so redirected reports and `--output` files remain clean. + +For a faster metadata-only scan, skip changed-file metadata entirely: + +```bash +python3 scripts/pr_blocker_audit.py --repo OWNER/REPO --no-fetch-files +``` + +## JSON output + +Use `--format json` for machine-readable output suitable for scripting or downstream tooling: + +```bash +python3 scripts/pr_blocker_audit.py --input open-prs.json --format json +python3 scripts/pr_blocker_audit.py --input open-prs.json --format json --output report.json +``` + +JSON output is stable and deterministic for the same input. It uses `sort_keys=True` so field order does not vary between runs. It never includes ANSI escape codes, even with `--color always`. Progress text is always `stderr`-only and never appears in JSON output. + +The top-level object contains these keys: + +- `summary` — scalar overview: `total_prs_analyzed`, `unique_files_touched`, `prs_missing_changed_file_metadata`, `main_overlap_drivers`, `highest_risk_areas`, `recommended_first_review_target` +- `locked_areas` — list of objects with `area`, `files` (top paths as a string), `prs` (list of PR numbers), `why`, `priority` +- `hot_files` — list of objects with `file`, `pr_count`, `pr_numbers` (list of PR numbers); capped at `--top` +- `review_priorities` — ranked list with `rank`, `number`, `score`, `title`, `url`, `merge_state`, `review_decision`, `reasons` (list); capped at `--top` +- `duplicate_candidates` — list of objects with `pr_numbers` (list) and `titles` (list, one entry per PR in the group) +- `safer_areas` — list of strings + +## Write output to a file + +```bash +python3 scripts/pr_blocker_audit.py --input open-prs.json --output pr-blocker-report.md +python3 scripts/pr_blocker_audit.py --input open-prs.json --format json --output report.json +``` + +Markdown and JSON output never include ANSI color codes. ANSI codes are stripped defensively when writing any output file. + +## Terminal output and color + +Use terminal output for quick interactive scans: + +```bash +python3 scripts/pr_blocker_audit.py --input open-prs.json --format terminal +``` + +Terminal output includes locked areas, hot files, review / blocker priorities, possible duplicate candidates, and safer areas. + +Color is readability-only. It is never included in Markdown reports and is stripped defensively when writing output files. Color modes are: + +```bash +python3 scripts/pr_blocker_audit.py --input open-prs.json --format terminal --color auto +python3 scripts/pr_blocker_audit.py --input open-prs.json --format terminal --color always +python3 scripts/pr_blocker_audit.py --input open-prs.json --format terminal --color never +``` + +`--no-color` is kept as an alias for `--color never`. With `--color auto`, color is used only for terminal output on a TTY when `NO_COLOR` is not set and output is not being written to a file. + +## Interpret locked areas + +Locked areas are broad categories with one or more open PRs. An area is higher priority when several PRs touch it, when PRs share files, or when the highest scoring PR in that area has risk signals. Treat this as a prompt to inspect the PRs together. + +`PRs missing changed-file metadata` counts PRs that still had no changed-file paths after live file fetching, or PRs from offline input that did not include files. Those PRs can still appear in area summaries from title matching, but file overlap analysis is weaker for them. + +`Docs / tooling / tests` is conservative: runtime PRs are not classified there just because they include tests or README changes. Docs-only, README-only, scripts-only, tests-only, or strongly titled docs/tooling/test work still maps there. + +`Other / unclassified` is kept visible for PRs that do not match the area rules. When most of it comes from missing file metadata, the report summarizes that instead of letting long PR lists dominate the locked-area section. + +## Interpret duplicate candidates + +Duplicate candidates are labeled as possible duplicate / needs human review. The script groups PRs only when their file sets are highly similar and their titles share meaningful keywords. Similar PRs can still be complementary. + +## Interpret heuristic scores + +The review priority score is deterministic for the same input. Recency is measured against the newest parseable PR update timestamp in the input, and the score uses simple weights for: + +- direct auth, bearer-token, API-token, privilege, or permission lifecycle signals +- security, secret, or data exposure keywords +- persistence, migration, database, SQLite, or Postgres keywords +- memory, vector, RAG, embedding, or retrieval keywords +- overlapping changed files +- clean merge state as a small actionability signal +- review state +- recently updated PRs when timestamp data exists + +Higher scores mean "inspect earlier", not "correct" or "merge-ready". Broad PRs can score high because they overlap many files and may block other work, but they still need normal review and validation. + +Dirty, blocked, conflicting, and unknown merge states are shown as risk/caution reasons. They do not add importance points by themselves. + +## Design note: intentional single-script layout + +`pr_blocker_audit.py` is intentionally kept as one standalone script. The goal is to keep this maintainer/contributor workflow helper low-friction while broader repo tooling and test-suite conventions are still evolving. Splitting it into packages or modules is not ruled out, but is deferred until there is a clearer settled pattern to follow. + +## Limitations + +- Some PRs may still lack changed files if GitHub file metadata calls fail or metadata-only mode is used. +- Area classification is intentionally small and editable. +- Title keyword matching misses semantic duplicates. +- Heuristic scoring cannot know project strategy, reviewer availability, or hidden dependency chains. +- Empty or missing file metadata produces a valid report but weak overlap analysis. + +## Validation + +```bash +python3 -m py_compile scripts/pr_blocker_audit.py tests/test_pr_blocker_audit.py +python3 -m pytest tests/test_pr_blocker_audit.py -q +python3 scripts/pr_blocker_audit.py --help +git diff --check +``` diff --git a/scripts/pr_blocker_audit.py b/scripts/pr_blocker_audit.py new file mode 100644 index 0000000..074afea --- /dev/null +++ b/scripts/pr_blocker_audit.py @@ -0,0 +1,1051 @@ +#!/usr/bin/env python3 +"""Read-only pull request overlap audit helper. + +This script intentionally does not import the Odysseus application package. +It only reads local JSON input or invokes read-only `gh` list/API commands. +""" +from __future__ import annotations + +import argparse +import json +import os +import re +import subprocess +import sys +from collections import Counter, defaultdict +from dataclasses import dataclass +from datetime import datetime, timezone +from pathlib import Path +from typing import Iterable + + +AREA_RULES = [ + ( + "Auth / users / API tokens", + ("auth", "token", "api_key", "api-key", "apikey", "login", "totp"), + ("auth", "bearer token", "api token", "api key", "login", "privilege", "permission"), + ), + ( + "Memory / RAG / vector store", + ("memory", "rag", "vector", "embedding", "faiss", "chroma"), + ("memory", "rag", "vector", "embedding", "retrieval"), + ), + ("Search / web search", ("search", "ddg", "web_search"), ("search", "ddg", "web")), + ( + "Model routing / endpoint discovery", + ("model", "llm", "endpoint", "lmstudio", "ollama"), + ("model", "routing", "endpoint", "discovery", "llm"), + ), + ( + "Agent loop / tools", + ("agent", "tool", "function_call", "mcp", "shell"), + ("agent", "tool", "function", "mcp"), + ), + ("Cookbook / runners", ("cookbook", "runner", "preset"), ("cookbook", "runner", "preset")), + ("Email / CalDAV", ("mail", "email", "imap", "caldav", "calendar"), ("email", "mail", "caldav", "calendar")), + ( + "Documents / uploads", + ("document", "upload", "attachment", "processor", "markitdown"), + ("document", "upload", "attachment"), + ), + ("Gallery / visual report", ("gallery", "image", "vision", "preview"), ("gallery", "visual", "image")), + ( + "CI / repo process", + (".github", "docker", "compose", "workflow", "ci", "pytest"), + ("ci", "workflow", "docker", "compose"), + ), + ( + "Docs / tooling / tests", + ("docs/", "scripts/", "tests/", "README", "tooling"), + ("docs", "test", "tooling", "script"), + ), +] + +ALL_AREAS = [rule[0] for rule in AREA_RULES] + ["Other"] +WORD_RE = re.compile(r"[a-z0-9]+") +ANSI_RE = re.compile(r"\x1b\[[0-9;]*m") +ANSI = { + "bold": "\033[1m", + "bold_red": "\033[1;31m", + "bold_cyan": "\033[1;36m", + "red": "\033[31m", + "yellow": "\033[33m", + "green": "\033[32m", + "cyan": "\033[36m", + "blue": "\033[34m", + "dim": "\033[2m", + "reset": "\033[0m", +} +STOP_WORDS = { + "a", + "add", + "and", + "bug", + "fix", + "for", + "in", + "new", + "of", + "pr", + "the", + "to", + "update", +} + + +@dataclass(frozen=True) +class PullRequest: + number: int + title: str + author: str + url: str + files: tuple[str, ...] + merge_state: str + review_decision: str + updated_at: str + areas: tuple[str, ...] + + +@dataclass(frozen=True) +class ScoredPullRequest: + pr: PullRequest + score: int + reasons: tuple[str, ...] + + +class ProgressReporter: + def __init__(self, enabled: bool, stream=None): + self.enabled = enabled + self.stream = stream or sys.stderr + self.last_len = 0 + + def phase(self, message: str) -> None: + if self.enabled: + self.stream.write(f"{message}\n") + self.stream.flush() + + def update(self, done: int, total: int, files_count: int, missing_count: int, number: int) -> None: + if not self.enabled: + return + percent = int(done * 100 / total) if total else 100 + line = ( + f"Fetching changed files: {done}/{total} PRs ({percent}%) | " + f"files {files_count} | missing {missing_count} | #{number}" + ) + line = line[:140] + padding = max(self.last_len - len(line), 0) + self.stream.write(f"\r{line}{' ' * padding}") + self.stream.flush() + self.last_len = len(line) + + def finish_line(self) -> None: + if self.enabled and self.last_len: + self.stream.write(f"\r{' ' * self.last_len}\r") + self.stream.flush() + self.last_len = 0 + + def summary(self, message: str) -> None: + if self.enabled: + self.finish_line() + self.stream.write(f"{message}\n") + self.stream.flush() + + +def load_json_file(path: Path): + try: + with path.open("r", encoding="utf-8") as handle: + return json.load(handle) + except json.JSONDecodeError as exc: + raise ValueError(f"invalid JSON in {path}: {exc.msg} at line {exc.lineno}, column {exc.colno}") from exc + except OSError as exc: + raise ValueError(f"could not read {path}: {exc}") from exc + + +def fetch_live_prs(repo: str, fetch_files: bool = True, progress: ProgressReporter | None = None, limit: int = 1000): + progress = progress or ProgressReporter(False) + fields = ( + "number,title,author,files,mergeStateStatus,reviewDecision,updatedAt,url" + if fetch_files + else "number,title,author,mergeStateStatus,reviewDecision,updatedAt,url" + ) + cmd = ["gh", "pr", "list", "--repo", repo, "--state", "open", "--limit", str(limit), "--json", fields] + progress.phase("Fetching open PR list...") + try: + payload = _run_gh_json(cmd) + except RuntimeError: + api_path = f"repos/{repo}/pulls?state=open&per_page=100" + payload = _run_gh_json(["gh", "api", "--paginate", api_path]) + payload = _limit_payload(payload, limit) + if not fetch_files: + return payload + return _fill_missing_live_files(repo, payload, progress) + + +def _limit_payload(payload, limit: int): + if isinstance(payload, dict): + raw_prs = payload.get("items", []) + if isinstance(raw_prs, list): + return {**payload, "items": raw_prs[:limit]} + return payload + if isinstance(payload, list): + return payload[:limit] + return payload + + +def _fill_missing_live_files(repo: str, payload, progress: ProgressReporter | None = None): + progress = progress or ProgressReporter(False) + raw_prs = payload.get("items", []) if isinstance(payload, dict) else payload + if not isinstance(raw_prs, list): + return payload + + warnings = [] + targets = [item for item in raw_prs if isinstance(item, dict)] + progress.phase(f"Fetching changed files for {len(targets)} PRs...") + fetched_count = 0 + files_count = 0 + missing_count = 0 + for done, item in enumerate(targets, start=1): + number = _safe_int(item.get("number")) + current_files = _extract_files(item.get("files", [])) + if not number: + warnings.append("PR with missing number has no changed-file metadata") + missing_count += 1 + progress.update(done, len(targets), files_count, missing_count, number) + continue + if current_files: + fetched_count += 1 + files_count += len(current_files) + progress.update(done, len(targets), files_count, missing_count, number) + continue + try: + files = _fetch_live_pr_files(repo, number) + except RuntimeError as exc: + warnings.append(f"PR #{number}: could not fetch changed files: {exc}") + missing_count += 1 + progress.update(done, len(targets), files_count, missing_count, number) + continue + item["files"] = [{"path": path} for path in files] + files_count += len(files) + if files: + fetched_count += 1 + else: + missing_count += 1 + progress.update(done, len(targets), files_count, missing_count, number) + + progress.summary(f"Fetched changed files for {fetched_count}/{len(targets)} PRs; {missing_count} missing metadata.") + + if isinstance(payload, dict): + if warnings: + payload["warnings"] = [*payload.get("warnings", []), *warnings] + return payload + if warnings: + return {"items": payload, "warnings": warnings} + return payload + + +def _fetch_live_pr_files(repo: str, number: int) -> list[str]: + api_path = f"repos/{repo}/pulls/{number}/files?per_page=100" + payload = _run_gh_json(["gh", "api", "--paginate", api_path]) + return _extract_files(payload) + + +def _run_gh_json(cmd: list[str]): + result = subprocess.run(cmd, text=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, check=False) + if result.returncode != 0: + raise RuntimeError(result.stderr.strip() or f"{cmd[0]} exited with {result.returncode}") + try: + return json.loads(result.stdout or "[]") + except json.JSONDecodeError as exc: + raise RuntimeError(f"gh returned invalid JSON: {exc}") from exc + + +def normalize_prs(payload) -> list[PullRequest]: + raw_prs = payload.get("items", []) if isinstance(payload, dict) else payload + if raw_prs is None: + raw_prs = [] + if not isinstance(raw_prs, list): + raise ValueError("expected input JSON to be a list of pull requests or an object with an items list") + return [normalize_pr(item) for item in raw_prs if isinstance(item, dict)] + + +def missing_file_metadata_count(prs: list[PullRequest]) -> int: + return sum(1 for pr in prs if not pr.files) + + +def missing_metadata_warning(count: int) -> str: + noun = "PR" if count == 1 else "PRs" + return f"Warning: {count} {noun} still missing changed-file metadata." + + +def normalize_pr(item: dict) -> PullRequest: + files = tuple(sorted(set(_extract_files(item.get("files", []))))) + title = str(item.get("title") or "") + areas = tuple(sorted(classify_areas(files, title))) + return PullRequest( + number=_safe_int(item.get("number")), + title=title, + author=_extract_author(item), + url=str(item.get("url") or item.get("html_url") or ""), + files=files, + merge_state=str(item.get("mergeStateStatus") or item.get("merge_state_status") or item.get("mergeable_state") or "unknown"), + review_decision=str(item.get("reviewDecision") or item.get("review_decision") or "unknown"), + updated_at=str(item.get("updatedAt") or item.get("updated_at") or ""), + areas=areas, + ) + + +def _extract_files(files) -> list[str]: + if not isinstance(files, list): + return [] + paths = [] + for entry in files: + if isinstance(entry, str): + paths.append(entry) + elif isinstance(entry, dict): + path = entry.get("path") or entry.get("filename") or entry.get("name") + if path: + paths.append(str(path)) + return paths + + +def _extract_author(item: dict) -> str: + author = item.get("author") or item.get("user") or {} + if isinstance(author, dict): + return str(author.get("login") or "unknown") + return str(author or "unknown") + + +def _safe_int(value) -> int: + try: + return int(value) + except (TypeError, ValueError): + return 0 + + +def classify_areas(files: Iterable[str], title: str = "") -> set[str]: + file_list = tuple(files) + file_text = " ".join(file_list).lower() + title_text = title.lower() + areas = set() + for area, path_keywords, title_keywords in AREA_RULES: + if area == "Docs / tooling / tests": + if is_docs_tooling_only(file_list) or title_strongly_indicates_docs_tooling(title_text): + areas.add(area) + continue + if any(keyword.lower() in file_text for keyword in path_keywords): + areas.add(area) + continue + if any(title_has_keyword(title_text, keyword) for keyword in title_keywords): + areas.add(area) + return areas or {"Other"} + + +def is_docs_tooling_only(files: Iterable[str]) -> bool: + file_list = [path.lower() for path in files] + return bool(file_list) and all(is_docs_tooling_path(path) for path in file_list) + + +def is_docs_tooling_path(path: str) -> bool: + name = path.rsplit("/", 1)[-1] + return ( + path.startswith("docs/") + or path.startswith("scripts/") + or path.startswith("tests/") + or path.startswith(".github/") + or "tooling" in path + or name.startswith("readme") + or name in {"pytest.ini", "tox.ini", "mypy.ini", "ruff.toml"} + ) + + +def title_strongly_indicates_docs_tooling(title: str) -> bool: + words_set = set(words(title)) + phrases = ( + "docs only", + "documentation only", + "test only", + "tests only", + "tooling only", + "script only", + "scripts only", + ) + return any(phrase in title for phrase in phrases) or bool( + words_set & {"docs", "documentation", "readme", "tests", "tooling", "scripts"} + ) and not bool(words_set & {"api", "auth", "route", "runtime", "server", "ui", "memory", "model", "email"}) + + +def title_has_keyword(title: str, keyword: str) -> bool: + keyword = keyword.lower() + if " " in keyword: + return keyword in title + return keyword in set(words(title)) + + +def hot_files(prs: list[PullRequest]) -> list[tuple[str, list[int]]]: + owners: dict[str, list[int]] = defaultdict(list) + for pr in prs: + for path in pr.files: + owners[path].append(pr.number) + rows = [(path, sorted(numbers)) for path, numbers in owners.items() if len(numbers) > 1] + return sorted(rows, key=lambda row: (-len(row[1]), row[0])) + + +def overlap_clusters(prs: list[PullRequest]) -> list[list[PullRequest]]: + by_file: dict[str, list[int]] = defaultdict(list) + by_number = {pr.number: pr for pr in prs} + for pr in prs: + for path in pr.files: + by_file[path].append(pr.number) + + edges: dict[int, set[int]] = defaultdict(set) + for numbers in by_file.values(): + if len(numbers) < 2: + continue + for number in numbers: + edges[number].update(n for n in numbers if n != number) + + seen = set() + clusters = [] + for number in sorted(edges): + if number in seen: + continue + stack = [number] + cluster_numbers = set() + while stack: + current = stack.pop() + if current in cluster_numbers: + continue + cluster_numbers.add(current) + stack.extend(edges[current] - cluster_numbers) + seen.update(cluster_numbers) + clusters.append([by_number[n] for n in sorted(cluster_numbers) if n in by_number]) + return sorted(clusters, key=lambda cluster: (-len(cluster), [pr.number for pr in cluster])) + + +def score_prs(prs: list[PullRequest], now: datetime | None = None) -> list[ScoredPullRequest]: + now = now or reference_time(prs) + file_counts = Counter(path for pr in prs for path in pr.files) + scored = [score_pr(pr, file_counts, now) for pr in prs] + return sorted(scored, key=lambda item: (-item.score, item.pr.number)) + + +def score_pr(pr: PullRequest, file_counts: Counter, now: datetime) -> ScoredPullRequest: + score = 0 + reasons = [] + text = f"{pr.title} {' '.join(pr.files)}".lower() + + # Heuristic, not a truth model: weights favor direct auth/token + # lifecycle fixes first, then confidentiality/persistence/memory risk, + # overlap pressure, review state, and actionability. Merge conflicts are + # caution signals only; they do not prove importance. + if direct_auth_token_signal(pr): + score += 45 + reasons.append("direct auth/token lifecycle signal") + elif any(word in text for word in ("security", "secret", "privilege", "permission")): + score += 22 + reasons.append("security keyword") + + if any(word in text for word in ("leak", "leaks", "exposure", "cross-user", "cross user", "privacy")): + score += 18 + reasons.append("data exposure keyword") + if any(word in text for word in ("data-loss", "persistence", "migration", "database", "sqlite", "postgres")): + score += 20 + reasons.append("persistence/migration keyword") + if any(word in text for word in ("memory", "vector", "rag", "embedding", "retrieval")): + score += 15 + reasons.append("memory/RAG keyword") + + overlap_count = sum(1 for path in pr.files if file_counts[path] > 1) + if overlap_count: + points = min(overlap_count * 3, 30) + score += points + reasons.append(f"{overlap_count} overlapping file(s)") + + merge_state = pr.merge_state.lower() + if merge_state in {"clean", "has_hooks"}: + score += 3 + reasons.append("clean/actionable merge state") + elif merge_state in {"dirty", "blocked", "conflicting", "unstable"}: + reasons.append(f"caution: merge state {pr.merge_state}") + elif merge_state in {"unknown", ""}: + reasons.append("caution: merge state unknown") + + review_decision = pr.review_decision.lower() + if review_decision == "approved": + score -= 8 + reasons.append("already approved") + elif review_decision == "changes_requested": + score += 10 + reasons.append("changes requested") + elif review_decision == "review_required": + score += 6 + reasons.append("review required") + elif review_decision in {"unknown", "", "none"}: + score += 4 + reasons.append("review state unknown") + + age_days = days_since(pr.updated_at, now) + if age_days is not None and age_days <= 7: + score += 8 + reasons.append("updated in last 7 days") + elif age_days is not None and age_days <= 30: + score += 4 + reasons.append("updated in last 30 days") + + return ScoredPullRequest(pr=pr, score=score, reasons=tuple(reasons or ["low overlap / low signal"])) + + +def direct_auth_token_signal(pr: PullRequest) -> bool: + file_text = " ".join(pr.files).lower() + title = pr.title.lower() + path_hit = any( + keyword in file_text + for keyword in ("auth", "token", "api_key", "api-key", "apikey", "key_manager", "security") + ) + title_hit = any( + phrase in title + for phrase in ("bearer token", "api token", "api key", "auth", "login", "privilege", "permission") + ) + lifecycle_hit = any(word in title for word in ("deleted", "revoked", "expired", "disabled", "removed")) + return path_hit and (title_hit or lifecycle_hit) + + +def days_since(value: str, now: datetime) -> int | None: + parsed = parse_datetime(value) + if parsed is None: + return None + return max((now - parsed).days, 0) + + +def reference_time(prs: list[PullRequest]) -> datetime: + parsed = [value for value in (parse_datetime(pr.updated_at) for pr in prs) if value is not None] + if parsed: + return max(parsed) + return datetime.now(timezone.utc) + + +def parse_datetime(value: str) -> datetime | None: + if not value: + return None + try: + parsed = datetime.fromisoformat(value.replace("Z", "+00:00")) + except ValueError: + return None + if parsed.tzinfo is None: + parsed = parsed.replace(tzinfo=timezone.utc) + return parsed + + +def duplicate_candidates(prs: list[PullRequest]) -> list[list[PullRequest]]: + matches: dict[int, set[int]] = defaultdict(set) + by_number = {pr.number: pr for pr in prs} + for index, left in enumerate(prs): + for right in prs[index + 1 :]: + if _looks_similar(left, right): + matches[left.number].add(right.number) + matches[right.number].add(left.number) + return _groups_from_matches(matches, by_number) + + +def _looks_similar(left: PullRequest, right: PullRequest) -> bool: + left_files = set(left.files) + right_files = set(right.files) + if not left_files or not right_files: + return False + file_similarity = len(left_files & right_files) / len(left_files | right_files) + shared_title = title_keywords(left.title) & title_keywords(right.title) + return file_similarity >= 0.5 and len(shared_title) >= 2 + + +def _groups_from_matches(matches: dict[int, set[int]], by_number: dict[int, PullRequest]) -> list[list[PullRequest]]: + seen = set() + groups = [] + for number in sorted(matches): + if number in seen: + continue + stack = [number] + group = set() + while stack: + current = stack.pop() + if current in group: + continue + group.add(current) + stack.extend(matches[current] - group) + seen.update(group) + groups.append([by_number[n] for n in sorted(group) if n in by_number]) + return sorted(groups, key=lambda group: (-len(group), [pr.number for pr in group])) + + +def words(value: str) -> list[str]: + return WORD_RE.findall(value.lower()) + + +def title_keywords(title: str) -> set[str]: + return {word for word in words(title) if len(word) > 2 and word not in STOP_WORDS} + + +def locked_areas(prs: list[PullRequest], scored: list[ScoredPullRequest]) -> list[dict[str, object]]: + score_by_number = {item.pr.number: item.score for item in scored} + rows = [] + for area in ALL_AREAS: + area_prs = [pr for pr in prs if area in pr.areas] + if not area_prs: + continue + area_files = Counter(path for pr in area_prs for path in pr.files) + overlapping = [path for path, count in area_files.items() if count > 1] + max_score = max(score_by_number.get(pr.number, 0) for pr in area_prs) + missing_files = sum(1 for pr in area_prs if not pr.files) + priority = _locked_area_priority(area, area_prs, max_score) + why = _locked_area_why(area, missing_files, len(area_prs), bool(overlapping)) + if missing_files and area != "Other": + why += "; some PRs have no file metadata" + rows.append( + { + "area": "Other / unclassified" if area == "Other" else area, + "files": _summarize_files(area_files), + "prs": [pr.number for pr in sorted(area_prs, key=lambda item: item.number)], + "why": why, + "priority": priority, + "is_other": area == "Other", + } + ) + return sorted(rows, key=lambda row: (bool(row["is_other"]), _priority_rank(str(row["priority"])), -len(row["prs"]), str(row["area"]))) + + +def _locked_area_priority(area: str, prs: list[PullRequest], max_score: int) -> str: + if area == "Other" and all(not pr.files for pr in prs): + return "watch" + return "critical" if len(prs) >= 4 or max_score >= 45 else "high" if len(prs) >= 2 or max_score >= 30 else "watch" + + +def _locked_area_why(area: str, missing_files: int, total_prs: int, has_overlap: bool) -> str: + if area == "Other" and missing_files > total_prs / 2: + return f"{total_prs} PRs, mostly missing changed-file metadata" + return "shared file overlap" if has_overlap else "active open PRs in area" + + +def _summarize_files(counts: Counter) -> str: + if not counts: + return "No changed-file metadata" + top = [path for path, _count in counts.most_common(5)] + return ", ".join(top) + + +def _priority_rank(priority: str) -> int: + return {"critical": 0, "high": 1, "watch": 2}.get(priority, 3) + + +def safer_areas(prs: list[PullRequest]) -> list[str]: + area_counts = Counter(area for pr in prs for area in pr.areas) + suggestions = [] + for area in ALL_AREAS: + count = area_counts.get(area, 0) + if count == 0: + suggestions.append(f"{area}: no open PRs in this input matched the area mapping") + elif area == "Docs / tooling / tests" and count <= 2: + suggestions.append(f"{area}: low overlap; good candidate for docs, tests, or maintenance-only work") + if not suggestions: + suggestions.append("No clearly quiet area found; prefer narrow docs, tests, or tooling work after checking current PRs.") + return suggestions[:6] + + +def build_structured_report(prs: list[PullRequest], top: int = 15) -> dict: + top = max(top, 1) + scored = score_prs(prs) + hot = hot_files(prs) + locked = locked_areas(prs, scored) + duplicates = duplicate_candidates(prs) + unique_files = len({path for pr in prs for path in pr.files}) + missing_files = missing_file_metadata_count(prs) + target = scored[0] if scored else None + + return { + "summary": { + "highest_risk_areas": _risk_summary(locked), + "main_overlap_drivers": _overlap_driver_summary(hot), + "prs_missing_changed_file_metadata": missing_files, + "recommended_first_review_target": _target_summary(target), + "total_prs_analyzed": len(prs), + "unique_files_touched": unique_files, + }, + "locked_areas": [ + { + "area": row["area"], + "files": row["files"], + "priority": row["priority"], + "prs": row["prs"], + "why": row["why"], + } + for row in locked + ], + "hot_files": [ + { + "file": path, + "pr_count": len(numbers), + "pr_numbers": numbers, + } + for path, numbers in hot[:top] + ], + "review_priorities": [ + { + "merge_state": item.pr.merge_state, + "number": item.pr.number, + "rank": index, + "reasons": list(item.reasons), + "review_decision": item.pr.review_decision, + "score": item.score, + "title": item.pr.title or "untitled", + "url": item.pr.url, + } + for index, item in enumerate(scored[:top], start=1) + ], + "duplicate_candidates": [ + { + "pr_numbers": [pr.number for pr in group], + "titles": [pr.title or "untitled" for pr in group], + } + for group in duplicates + ], + "safer_areas": safer_areas(prs), + } + + +def render_json(prs: list[PullRequest], top: int = 15) -> str: + return json.dumps(build_structured_report(prs, top), indent=2, sort_keys=True) + "\n" + + +def render_markdown(prs: list[PullRequest], top: int = 15) -> str: + top = max(top, 1) + scored = score_prs(prs) + hot = hot_files(prs) + locked = locked_areas(prs, scored) + duplicates = duplicate_candidates(prs) + unique_files = len({path for pr in prs for path in pr.files}) + missing_files = missing_file_metadata_count(prs) + target = scored[0] if scored else None + + lines = ["# PR Blocker Audit", "", "## Executive summary", ""] + lines.append(f"- Total PRs analyzed: {len(prs)}") + lines.append(f"- Unique files touched: {unique_files}") + lines.append(f"- PRs missing changed-file metadata: {missing_files}") + lines.append(f"- Main overlap drivers: {_overlap_driver_summary(hot)}") + lines.append(f"- Highest-risk areas: {_risk_summary(locked)}") + lines.append(f"- Recommended first review target: {_target_summary(target)}") + lines.extend(["", "## Locked code areas", ""]) + lines.extend(_table(["area", "files/directories", "PRs", "why locked", "priority"], _locked_rows(locked))) + lines.extend(["", "## Hot files", ""]) + lines.extend(_table(["file", "PR count", "PR numbers"], _hot_rows(hot, top))) + lines.extend(["", "## Review / blocker priorities", ""]) + lines.append("Heuristic score only; inspect these earlier, do not merge without validation.") + lines.append("") + lines.extend(_review_rows(scored, top)) + lines.extend(["", "## Duplicate candidates", ""]) + lines.extend(_duplicate_rows(duplicates)) + lines.extend(["", "## Safer areas for new work", ""]) + lines.extend(f"- {item}" for item in safer_areas(prs)) + lines.append("") + return "\n".join(lines) + + +def render_terminal(prs: list[PullRequest], top: int = 15, use_color: bool = False) -> str: + top = max(top, 1) + scored = score_prs(prs) + hot = hot_files(prs) + locked = locked_areas(prs, scored) + duplicates = duplicate_candidates(prs) + unique_files = len({path for pr in prs for path in pr.files}) + missing_files = missing_file_metadata_count(prs) + target = scored[0] if scored else None + + lines = [colorize("PR Blocker Audit", "bold_cyan", use_color), ""] + lines.append(f"PRs analyzed: {len(prs)}") + lines.append(f"Unique files touched: {unique_files}") + lines.append(f"PRs missing changed-file metadata: {missing_files}") + lines.append(f"Main overlap drivers: {_overlap_driver_summary(hot)}") + lines.append(f"Recommended first review target: {_target_summary(target, truncate=True)}") + lines.extend(["", colorize("Locked areas", "bold_cyan", use_color)]) + if locked: + for row in locked[:top]: + priority = str(row["priority"]) + label = colorize(priority.upper(), priority_color(priority), use_color) + prs_text = _format_pr_numbers(row["prs"]) + lines.append(f"- {label} {row['area']}: {prs_text} ({row['why']})") + lines.append(colorize(f" {row['files']}", "dim", use_color)) + else: + lines.append("- none") + + lines.extend(["", colorize("Hot files", "bold_cyan", use_color)]) + lines.extend(_terminal_hot_rows(hot, top, use_color)) + lines.extend(["", colorize("Review / blocker priorities", "bold_cyan", use_color)]) + lines.append(colorize("Heuristic score only; inspect these first, do not merge without validation.", "dim", use_color)) + if scored: + for item in scored[:top]: + pr = item.pr + state = colorize(pr.merge_state or "unknown", merge_state_color(pr.merge_state), use_color) + reasons = "; ".join(item.reasons[:3]) + title = shorten_text(pr.title or "untitled") + lines.append(f"- {item.score:>3} #{pr.number:<5} {state:<18} {title}") + lines.append(colorize(f" {reasons}", "dim", use_color)) + else: + lines.append("- none") + + lines.extend(["", colorize("Possible duplicates", "bold_cyan", use_color)]) + lines.extend(_terminal_duplicate_rows(duplicates)) + lines.extend(["", colorize("Safer areas", "bold_cyan", use_color)]) + lines.extend(f"- {item}" for item in safer_areas(prs)) + lines.append("") + return "\n".join(lines) + + +def _terminal_hot_rows(hot: list[tuple[str, list[int]]], top: int, use_color: bool) -> list[str]: + if not hot: + return ["- none"] + rows = [] + for path, numbers in hot[:top]: + count_label = f"{len(numbers)} PRs" + rows.append(f"- {path:<28} {colorize(count_label, hot_count_color(len(numbers)), use_color)} {_format_pr_numbers(numbers)}") + return rows + + +def _terminal_duplicate_rows(groups: list[list[PullRequest]]) -> list[str]: + if not groups: + return ["- none detected"] + rows = [] + for group in groups: + numbers = _format_pr_numbers(pr.number for pr in group) + titles = "; ".join(shorten_text(pr.title or "untitled", 80) for pr in group) + rows.append(f"- Possible duplicate / needs human review: {numbers} - {titles}") + return rows + + +def colorize(text: object, style: str, use_color: bool) -> str: + value = str(text) + if not use_color: + return value + return f"{ANSI[style]}{value}{ANSI['reset']}" + + +def priority_color(priority: str) -> str: + return {"critical": "bold_red", "high": "yellow", "watch": "cyan"}.get(priority.lower(), "blue") + + +def hot_count_color(count: int) -> str: + return "bold_red" if count >= 4 else "yellow" if count >= 2 else "dim" + + +def merge_state_color(state: str) -> str: + normalized = (state or "unknown").lower() + if normalized == "clean": + return "green" + if normalized in {"dirty", "blocked", "conflicting", "unstable"}: + return "red" + return "yellow" + + +def should_use_color(args: argparse.Namespace) -> bool: + if args.format != "terminal": + return False + if args.color == "always": + if os.name == "nt": + enable_windows_vt_mode() + return True + if args.color == "never" or args.output: + return False + if not sys.stdout.isatty() or "NO_COLOR" in os.environ or os.environ.get("TERM") == "dumb": + return False + if os.name == "nt": + return enable_windows_vt_mode() + return bool(os.environ.get("TERM") or os.environ.get("COLORTERM")) + + +def should_show_progress(args: argparse.Namespace) -> bool: + if args.quiet or args.input or args.no_fetch_files: + return False + if args.progress == "always": + return True + if args.progress == "never": + return False + return sys.stderr.isatty() + + +def enable_windows_vt_mode() -> bool: + if os.name != "nt": + return True + try: + import ctypes + + kernel32 = ctypes.windll.kernel32 + handle = kernel32.GetStdHandle(-11) + mode = ctypes.c_uint32() + if not kernel32.GetConsoleMode(handle, ctypes.byref(mode)): + return False + return bool(kernel32.SetConsoleMode(handle, mode.value | 0x0004)) + except Exception: + return False + + +def _cluster_summary(clusters: list[list[PullRequest]]) -> str: + if not clusters: + return "none detected" + summary = [] + for cluster in clusters[:3]: + summary.append(f"{len(cluster)} PRs ({_format_pr_numbers(pr.number for pr in cluster)})") + return "; ".join(summary) + + +def _overlap_driver_summary(hot: list[tuple[str, list[int]]], limit: int = 3) -> str: + if not hot: + return "none detected" + return ", ".join(f"{path} ({len(numbers)} PRs)" for path, numbers in hot[:limit]) + + +def _risk_summary(locked: list[dict[str, object]]) -> str: + if not locked: + return "none detected" + return ", ".join(f"{row['area']} ({row['priority']})" for row in locked[:3]) + + +def _target_summary(target: ScoredPullRequest | None, truncate: bool = False) -> str: + if target is None: + return "none; no PRs in input" + title = target.pr.title or "untitled" + if truncate: + title = shorten_text(title) + return f"PR #{target.pr.number} ({target.score}) - {title}" + + +def _locked_rows(locked: list[dict[str, object]]) -> list[list[str]]: + if not locked: + return [["none", "none", "none", "none", "none"]] + return [ + [ + str(row["area"]), + str(row["files"]), + _format_pr_numbers(row["prs"]), + str(row["why"]), + str(row["priority"]), + ] + for row in locked + ] + + +def _hot_rows(hot: list[tuple[str, list[int]]], top: int) -> list[list[str]]: + if not hot: + return [["none", "0", "none"]] + return [[path, str(len(numbers)), _format_pr_numbers(numbers)] for path, numbers in hot[:top]] + + +def _review_rows(scored: list[ScoredPullRequest], top: int) -> list[str]: + if not scored: + return ["No PRs to rank."] + lines = [] + for index, item in enumerate(scored[:top], start=1): + pr = item.pr + link = f"[#{pr.number}]({pr.url})" if pr.url else f"#{pr.number}" + reasons = "; ".join(item.reasons) + lines.append(f"{index}. {link} score {item.score}: {pr.title or 'untitled'} ({reasons})") + return lines + + +def _duplicate_rows(groups: list[list[PullRequest]]) -> list[str]: + if not groups: + return ["No possible duplicate groups detected from title/file overlap."] + lines = [] + for group in groups: + numbers = _format_pr_numbers(pr.number for pr in group) + titles = "; ".join(f"#{pr.number} {pr.title or 'untitled'}" for pr in group) + lines.append(f"- Possible duplicate / needs human review: {numbers} - {titles}") + return lines + + +def _table(headers: list[str], rows: list[list[str]]) -> list[str]: + escaped_headers = [_escape_cell(item) for item in headers] + lines = ["| " + " | ".join(escaped_headers) + " |"] + lines.append("| " + " | ".join("---" for _ in headers) + " |") + for row in rows: + lines.append("| " + " | ".join(_escape_cell(item) for item in row) + " |") + return lines + + +def _escape_cell(value: object) -> str: + return str(value).replace("|", "\\|").replace("\n", " ") + + +def _format_pr_numbers(numbers: Iterable[int], limit: int = 12) -> str: + raw_values = [number for number in numbers if number] + values = [f"#{number}" for number in raw_values[:limit]] + if len(raw_values) > limit: + values.append(f"... (+{len(raw_values) - limit} more)") + return ", ".join(values) if values else "unknown" + + +def shorten_text(text: str, max_len: int = 110) -> str: + if len(text) <= max_len: + return text + if max_len <= 1: + return "..." + return text[: max_len - 3].rstrip() + "..." + + +def positive_int(value: str) -> int: + try: + parsed = int(value) + except ValueError as exc: + raise argparse.ArgumentTypeError("must be a positive integer") from exc + if parsed <= 0: + raise argparse.ArgumentTypeError("must be a positive integer") + return parsed + + +def write_output(report: str, path: str | None) -> None: + if path: + Path(path).write_text(ANSI_RE.sub("", report), encoding="utf-8") + return + sys.stdout.write(report) + + +def build_parser() -> argparse.ArgumentParser: + parser = argparse.ArgumentParser(description="Read-only audit of open PR file overlap and blocker risk.") + source = parser.add_mutually_exclusive_group(required=True) + source.add_argument("--input", help="Path to JSON from gh pr list --json ... or REST-ish PR payloads") + source.add_argument("--repo", help="GitHub repository in owner/name form; uses read-only gh commands") + parser.add_argument("--output", help="Write report to this path instead of stdout") + parser.add_argument("--limit", type=positive_int, default=1000, help="Live mode: max open PRs to fetch/analyze") + parser.add_argument("--top", type=positive_int, default=15, help="Rows to show in ranked sections") + parser.add_argument("--color", choices=["auto", "always", "never"], default="auto", help="Terminal color mode") + parser.add_argument("--no-color", action="store_const", const="never", dest="color", help="Alias for --color never") + parser.add_argument("--format", choices=["markdown", "terminal", "json"], default="markdown", help="Output format") + parser.add_argument("--no-fetch-files", action="store_true", help="Skip per-PR changed-file API calls in live mode") + parser.add_argument("--progress", choices=["auto", "always", "never"], default="auto", help="Live file-fetch progress mode") + parser.add_argument("--quiet", action="store_true", help="Suppress progress and non-fatal warning output") + return parser + + +def main(argv: list[str] | None = None) -> int: + parser = build_parser() + args = parser.parse_args(argv) + try: + if args.input: + payload = load_json_file(Path(args.input)) + else: + progress = ProgressReporter(should_show_progress(args)) + payload = fetch_live_prs(args.repo, fetch_files=not args.no_fetch_files, progress=progress, limit=args.limit) + prs = normalize_prs(payload) + missing_files = missing_file_metadata_count(prs) + if args.repo and not args.no_fetch_files and not args.quiet and missing_files: + sys.stderr.write(f"{missing_metadata_warning(missing_files)}\n") + if args.format == "terminal": + report = render_terminal(prs, top=args.top, use_color=should_use_color(args)) + elif args.format == "json": + report = render_json(prs, top=args.top) + else: + report = render_markdown(prs, top=args.top) + write_output(report, args.output) + except (RuntimeError, ValueError) as exc: + sys.stderr.write(f"error: {exc}\n") + return 1 + return 0 + + +if __name__ == "__main__": + raise SystemExit(main()) diff --git a/tests/test_pr_blocker_audit.py b/tests/test_pr_blocker_audit.py new file mode 100644 index 0000000..b5b2a88 --- /dev/null +++ b/tests/test_pr_blocker_audit.py @@ -0,0 +1,964 @@ +import importlib.util +import json +import pytest +import re +import sys +from datetime import datetime, timezone +from pathlib import Path + + +ROOT = Path(__file__).resolve().parents[1] +SCRIPT_PATH = ROOT / "scripts" / "pr_blocker_audit.py" + + +def load_module(): + spec = importlib.util.spec_from_file_location("pr_blocker_audit", SCRIPT_PATH) + module = importlib.util.module_from_spec(spec) + sys.modules[spec.name] = module + spec.loader.exec_module(module) + return module + + +def test_parses_graphql_style_pr_json(): + audit = load_module() + prs = audit.normalize_prs( + [ + { + "number": 7, + "title": "Fix auth token rotation", + "author": {"login": "alice"}, + "url": "https://example.test/pr/7", + "mergeStateStatus": "CLEAN", + "reviewDecision": "REVIEW_REQUIRED", + "updatedAt": "2026-05-30T12:00:00Z", + "files": [{"path": "core/auth/tokens.py"}], + } + ] + ) + + assert prs[0].number == 7 + assert prs[0].author == "alice" + assert prs[0].url.endswith("/7") + assert prs[0].files == ("core/auth/tokens.py",) + assert "Auth / users / API tokens" in prs[0].areas + + +def test_parses_rest_style_pr_json(): + audit = load_module() + prs = audit.normalize_prs( + [ + { + "number": 8, + "title": "Improve uploads", + "user": {"login": "bob"}, + "html_url": "https://example.test/pr/8", + "mergeable_state": "dirty", + "files": [{"filename": "app/documents/upload.py"}], + } + ] + ) + + assert prs[0].author == "bob" + assert prs[0].url.endswith("/8") + assert prs[0].merge_state == "dirty" + assert prs[0].files == ("app/documents/upload.py",) + + +def test_parses_file_lists_as_dicts_and_strings(): + audit = load_module() + prs = audit.normalize_prs( + [ + { + "number": 1, + "title": "Memory update", + "files": ["core/memory.py", {"path": "tests/test_memory.py"}, {"filename": "docs/memory.md"}], + } + ] + ) + + assert prs[0].files == ("core/memory.py", "docs/memory.md", "tests/test_memory.py") + + +def test_missing_files_is_handled(): + audit = load_module() + prs = audit.normalize_prs([{"number": 2, "title": "No file metadata"}]) + + assert prs[0].files == () + assert prs[0].author == "unknown" + + +def test_fetch_live_prs_fills_missing_files(monkeypatch): + audit = load_module() + calls = [] + + def fake_run(cmd): + calls.append(cmd) + if cmd[:3] == ["gh", "pr", "list"]: + return [ + {"number": 1, "title": "Has files", "files": [{"path": "core/auth.py"}]}, + {"number": 2, "title": "Needs files", "files": []}, + ] + return [{"filename": "core/search.py"}, {"filename": "tests/test_search.py"}] + + monkeypatch.setattr(audit, "_run_gh_json", fake_run) + + payload = audit.fetch_live_prs("owner/repo") + prs = audit.normalize_prs(payload) + + assert [pr.files for pr in prs] == [("core/auth.py",), ("core/search.py", "tests/test_search.py")] + assert calls[-1] == ["gh", "api", "--paginate", "repos/owner/repo/pulls/2/files?per_page=100"] + + +def test_fetch_live_prs_keeps_missing_files_when_per_pr_fetch_fails(monkeypatch): + audit = load_module() + + def fake_run(cmd): + if cmd[:3] == ["gh", "pr", "list"]: + return [{"number": 3, "title": "Needs files", "files": []}] + raise RuntimeError("rate limit") + + monkeypatch.setattr(audit, "_run_gh_json", fake_run) + + payload = audit.fetch_live_prs("owner/repo") + prs = audit.normalize_prs(payload) + + assert prs[0].files == () + assert "PR #3: could not fetch changed files: rate limit" in payload["warnings"] + + +def test_fetch_live_prs_no_fetch_files_skips_per_pr_calls(monkeypatch): + audit = load_module() + calls = [] + + def fake_run(cmd): + calls.append(cmd) + return [{"number": 4, "title": "Metadata only", "files": []}] + + monkeypatch.setattr(audit, "_run_gh_json", fake_run) + + payload = audit.fetch_live_prs("owner/repo", fetch_files=False) + + assert payload == [{"number": 4, "title": "Metadata only", "files": []}] + assert len(calls) == 1 + + +def test_fetch_live_prs_passes_limit_to_gh_pr_list(monkeypatch): + audit = load_module() + calls = [] + + def fake_run(cmd): + calls.append(cmd) + return [] + + monkeypatch.setattr(audit, "_run_gh_json", fake_run) + + audit.fetch_live_prs("owner/repo", fetch_files=True, limit=50) + + assert calls[0] == [ + "gh", + "pr", + "list", + "--repo", + "owner/repo", + "--state", + "open", + "--limit", + "50", + "--json", + "number,title,author,files,mergeStateStatus,reviewDecision,updatedAt,url", + ] + + +def test_no_fetch_files_omits_files_from_gh_pr_list(monkeypatch): + audit = load_module() + calls = [] + + def fake_run(cmd): + calls.append(cmd) + return [] + + monkeypatch.setattr(audit, "_run_gh_json", fake_run) + + audit.fetch_live_prs("owner/repo", fetch_files=False, limit=50) + + assert calls[0][-1] == "number,title,author,mergeStateStatus,reviewDecision,updatedAt,url" + + +def test_fetch_live_prs_caps_rest_fallback_by_limit(monkeypatch): + audit = load_module() + + def fake_run(cmd): + if cmd[:3] == ["gh", "pr", "list"]: + raise RuntimeError("graphql unavailable") + return [ + {"number": 1, "title": "A", "files": []}, + {"number": 2, "title": "B", "files": []}, + {"number": 3, "title": "C", "files": []}, + ] + + monkeypatch.setattr(audit, "_run_gh_json", fake_run) + + payload = audit.fetch_live_prs("owner/repo", fetch_files=False, limit=2) + + assert [item["number"] for item in payload] == [1, 2] + + +def test_offline_input_ignores_limit(tmp_path, capsys): + audit = load_module() + path = tmp_path / "prs.json" + path.write_text( + json.dumps( + [ + {"number": 1, "title": "A", "files": []}, + {"number": 2, "title": "B", "files": []}, + ] + ), + encoding="utf-8", + ) + + exit_code = audit.main(["--input", str(path), "--limit", "1"]) + output = capsys.readouterr().out + + assert exit_code == 0 + assert "Total PRs analyzed: 2" in output + + +def test_invalid_limit_exits_cleanly(capsys): + audit = load_module() + + with pytest.raises(SystemExit) as exc: + audit.main(["--repo", "owner/repo", "--limit", "0"]) + + assert exc.value.code == 2 + assert "must be a positive integer" in capsys.readouterr().err + + +def test_help_includes_limit(): + audit = load_module() + + help_text = audit.build_parser().format_help() + + assert "--limit LIMIT" in help_text + assert "Live mode: max open PRs to fetch/analyze" in help_text + + +def test_progress_goes_to_stderr_not_stdout(monkeypatch, capsys): + audit = load_module() + + def fake_run(cmd): + if cmd[:3] == ["gh", "pr", "list"]: + return [{"number": 5, "title": "Needs files", "files": []}] + return [{"filename": "core/search.py"}] + + monkeypatch.setattr(audit, "_run_gh_json", fake_run) + + exit_code = audit.main(["--repo", "owner/repo", "--format", "terminal", "--progress", "always"]) + captured = capsys.readouterr() + + assert exit_code == 0 + assert "PR Blocker Audit" in captured.out + assert "Fetching open PR list..." not in captured.out + assert "Fetching open PR list..." in captured.err + assert "Fetching changed files:" in captured.err + + +def test_progress_not_shown_for_offline_input(tmp_path, capsys): + audit = load_module() + path = tmp_path / "prs.json" + path.write_text(json.dumps([{"number": 6, "title": "Offline", "files": []}]), encoding="utf-8") + + exit_code = audit.main(["--input", str(path), "--progress", "always"]) + captured = capsys.readouterr() + + assert exit_code == 0 + assert "PR Blocker Audit" in captured.out + assert "Fetching open PR list..." not in captured.err + + +def test_progress_auto_hidden_when_stderr_is_not_tty(monkeypatch, capsys): + audit = load_module() + + def fake_run(cmd): + if cmd[:3] == ["gh", "pr", "list"]: + return [{"number": 7, "title": "Needs files", "files": []}] + return [{"filename": "core/search.py"}] + + monkeypatch.setattr(audit, "_run_gh_json", fake_run) + monkeypatch.setattr(audit.sys.stderr, "isatty", lambda: False) + + exit_code = audit.main(["--repo", "owner/repo", "--progress", "auto"]) + captured = capsys.readouterr() + + assert exit_code == 0 + assert "Fetching open PR list..." not in captured.err + + +def test_progress_always_shown_when_stderr_is_not_tty(monkeypatch, capsys): + audit = load_module() + + def fake_run(cmd): + if cmd[:3] == ["gh", "pr", "list"]: + return [{"number": 8, "title": "Needs files", "files": []}] + return [{"filename": "core/search.py"}] + + monkeypatch.setattr(audit, "_run_gh_json", fake_run) + monkeypatch.setattr(audit.sys.stderr, "isatty", lambda: False) + + exit_code = audit.main(["--repo", "owner/repo", "--progress", "always"]) + captured = capsys.readouterr() + + assert exit_code == 0 + assert "Fetching open PR list..." in captured.err + + +def test_quiet_suppresses_progress_and_warning(monkeypatch, capsys): + audit = load_module() + + def fake_run(cmd): + if cmd[:3] == ["gh", "pr", "list"]: + return [{"number": 9, "title": "Needs files", "files": []}] + raise RuntimeError("rate limit") + + monkeypatch.setattr(audit, "_run_gh_json", fake_run) + + exit_code = audit.main(["--repo", "owner/repo", "--progress", "always", "--quiet"]) + captured = capsys.readouterr() + + assert exit_code == 0 + assert "PRs missing changed-file metadata: 1" in captured.out + assert captured.err == "" + + +def test_report_output_remains_clean_with_progress(monkeypatch, capsys): + audit = load_module() + + def fake_run(cmd): + if cmd[:3] == ["gh", "pr", "list"]: + return [{"number": 10, "title": "Needs files", "files": []}] + return [{"filename": "core/search.py"}] + + monkeypatch.setattr(audit, "_run_gh_json", fake_run) + + exit_code = audit.main(["--repo", "owner/repo", "--format", "terminal", "--progress", "always"]) + captured = capsys.readouterr() + + assert exit_code == 0 + assert "Fetching changed files:" not in captured.out + assert "Fetched changed files" not in captured.out + assert "core/search.py" in captured.out + + +def test_markdown_output_file_has_no_progress_or_ansi(monkeypatch, tmp_path, capsys): + audit = load_module() + output_path = tmp_path / "report.md" + + def fake_run(cmd): + if cmd[:3] == ["gh", "pr", "list"]: + return [{"number": 11, "title": "Needs files", "files": []}] + return [{"filename": "core/search.py"}] + + monkeypatch.setattr(audit, "_run_gh_json", fake_run) + + exit_code = audit.main(["--repo", "owner/repo", "--output", str(output_path), "--progress", "always"]) + captured = capsys.readouterr() + report = output_path.read_text(encoding="utf-8") + + assert exit_code == 0 + assert captured.out == "" + assert "Fetching changed files:" in captured.err + assert "Fetching changed files:" not in report + assert not re.search(r"\x1b\[[0-9;]*m", report) + + +def test_no_fetch_files_skips_progress(monkeypatch, capsys): + audit = load_module() + calls = [] + + def fake_run(cmd): + calls.append(cmd) + return [{"number": 12, "title": "Metadata only", "files": []}] + + monkeypatch.setattr(audit, "_run_gh_json", fake_run) + + exit_code = audit.main(["--repo", "owner/repo", "--no-fetch-files", "--progress", "always"]) + captured = capsys.readouterr() + + assert exit_code == 0 + assert len(calls) == 1 + assert "Fetching changed files" not in captured.err + + +def test_area_classification(): + audit = load_module() + + areas = audit.classify_areas(["scripts/odysseus-mail", "tests/test_email.py"], "CalDAV sync") + + assert "Email / CalDAV" in areas + assert "Docs / tooling / tests" in areas + + +def test_runtime_plus_test_file_is_not_docs_tooling(): + audit = load_module() + + areas = audit.classify_areas(["routes/memory_routes.py", "tests/test_memory_routes.py"], "Fix memory route") + + assert "Memory / RAG / vector store" in areas + assert "Docs / tooling / tests" not in areas + + +def test_docs_only_pr_is_docs_tooling(): + audit = load_module() + + areas = audit.classify_areas(["docs/pr-blocker-audit.md"], "Update docs") + + assert "Docs / tooling / tests" in areas + + +def test_script_tooling_only_pr_is_docs_tooling(): + audit = load_module() + + areas = audit.classify_areas(["scripts/pr_blocker_audit.py"], "Tooling script update") + + assert "Docs / tooling / tests" in areas + + +def test_readme_only_pr_is_docs_tooling(): + audit = load_module() + + areas = audit.classify_areas(["README.md"], "README update") + + assert "Docs / tooling / tests" in areas + + +def test_memory_owner_scope_leak_is_not_classified_as_auth(): + audit = load_module() + + areas = audit.classify_areas( + ["routes/memory_routes.py", "services/memory/store.py"], + "fix: memory route leaks another user's session", + ) + + assert "Memory / RAG / vector store" in areas + assert "Auth / users / API tokens" not in areas + + +def test_bearer_token_auth_path_is_classified_as_auth(): + audit = load_module() + + areas = audit.classify_areas( + ["core/auth.py", "routes/auth_routes.py"], + "fix: deleted users keep API access through bearer tokens", + ) + + assert "Auth / users / API tokens" in areas + + +def test_generic_security_file_is_not_classified_as_auth(): + audit = load_module() + + areas = audit.classify_areas( + ["tests/test_email_linkify_security_js.py"], + "Harden email HTML URL sanitization", + ) + + assert "Email / CalDAV" in areas + assert "Auth / users / API tokens" not in areas + + +def test_hot_file_overlap_detection(): + audit = load_module() + prs = audit.normalize_prs( + [ + {"number": 1, "title": "A", "files": ["core/search.py"]}, + {"number": 2, "title": "B", "files": ["core/search.py", "tests/test_search.py"]}, + {"number": 3, "title": "C", "files": ["core/other.py"]}, + ] + ) + + assert audit.hot_files(prs) == [("core/search.py", [1, 2])] + + +def test_possible_duplicate_grouping(): + audit = load_module() + prs = audit.normalize_prs( + [ + {"number": 1, "title": "Fix auth token refresh", "files": ["core/auth.py", "tests/test_auth.py"]}, + {"number": 2, "title": "Repair auth token refresh", "files": ["core/auth.py", "tests/test_auth.py"]}, + {"number": 3, "title": "Improve gallery preview", "files": ["core/gallery.py"]}, + ] + ) + + groups = audit.duplicate_candidates(prs) + + assert [[pr.number for pr in group] for group in groups] == [[1, 2]] + + +def test_score_ranking_is_deterministic(): + audit = load_module() + prs = audit.normalize_prs( + [ + { + "number": 2, + "title": "Gallery polish", + "reviewDecision": "APPROVED", + "updatedAt": "2026-05-20T00:00:00Z", + "files": ["core/gallery.py"], + }, + { + "number": 1, + "title": "Fix auth token owner permission", + "mergeStateStatus": "DIRTY", + "reviewDecision": "REVIEW_REQUIRED", + "updatedAt": "2026-06-01T00:00:00Z", + "files": ["core/auth.py", "tests/test_auth.py"], + }, + ] + ) + + scored = audit.score_prs(prs, now=datetime(2026, 6, 3, tzinfo=timezone.utc)) + + assert [item.pr.number for item in scored] == [1, 2] + assert scored[0].score > scored[1].score + + +def test_direct_bearer_token_issue_ranks_above_dirty_memory_leak(): + audit = load_module() + prs = audit.normalize_prs( + [ + { + "number": 1, + "title": "fix: deleted users keep API access through bearer tokens", + "mergeStateStatus": "CLEAN", + "files": ["core/auth.py", "routes/auth_routes.py"], + }, + { + "number": 2, + "title": "fix: memory route leaks another user's session", + "mergeStateStatus": "DIRTY", + "files": ["routes/memory_routes.py", "services/memory/store.py"], + }, + ] + ) + + scored = audit.score_prs(prs, now=datetime(2026, 6, 3, tzinfo=timezone.utc)) + + assert [item.pr.number for item in scored] == [1, 2] + assert scored[0].score > scored[1].score + + +def test_dirty_state_is_caution_text_not_priority_boost(): + audit = load_module() + dirty_memory = audit.normalize_prs( + [ + { + "number": 2, + "title": "fix: memory route leaks another user's session", + "mergeStateStatus": "DIRTY", + "files": ["routes/memory_routes.py", "services/memory/store.py"], + } + ] + )[0] + clean_auth = audit.normalize_prs( + [ + { + "number": 1, + "title": "fix: deleted users keep API access through bearer tokens", + "mergeStateStatus": "CLEAN", + "files": ["core/auth.py", "routes/auth_routes.py"], + } + ] + )[0] + + dirty_score = audit.score_pr(dirty_memory, audit.Counter(), datetime(2026, 6, 3, tzinfo=timezone.utc)) + clean_auth_score = audit.score_pr(clean_auth, audit.Counter(), datetime(2026, 6, 3, tzinfo=timezone.utc)) + + assert dirty_score.score < clean_auth_score.score + assert any("caution: merge state DIRTY" == reason for reason in dirty_score.reasons) + + +def test_markdown_contains_expected_sections_and_no_ansi(): + audit = load_module() + prs = audit.normalize_prs([{"number": 1, "title": "Fix search", "files": ["core/search.py"]}]) + + report = audit.render_markdown(prs) + + assert "# PR Blocker Audit" in report + assert "## Executive summary" in report + assert "## Locked code areas" in report + assert "## Hot files" in report + assert "## Review / blocker priorities" in report + assert "## Duplicate candidates" in report + assert "## Safer areas for new work" in report + assert not re.search(r"\x1b\[[0-9;]*m", report) + + +def test_report_includes_missing_file_metadata_count(): + audit = load_module() + prs = audit.normalize_prs( + [ + {"number": 1, "title": "Fix search", "files": ["core/search.py"]}, + {"number": 2, "title": "No files"}, + ] + ) + + markdown = audit.render_markdown(prs) + terminal = audit.render_terminal(prs, use_color=False) + + assert "- PRs missing changed-file metadata: 1" in markdown + assert "PRs missing changed-file metadata: 1" in terminal + + +def test_overlap_summary_uses_hot_files_not_huge_clusters(): + audit = load_module() + prs = audit.normalize_prs( + [{"number": number, "title": f"PR {number}", "files": ["common.py"]} for number in range(1, 25)] + ) + + report = audit.render_terminal(prs, use_color=False) + + assert "Main overlap drivers: common.py (24 PRs)" in report + assert "Largest overlap clusters" not in report + assert "24 PRs (#1, #2" not in report + + +def test_long_pr_number_lists_are_truncated(): + audit = load_module() + + assert audit._format_pr_numbers(range(1, 16), limit=4) == "#1, #2, #3, #4, ... (+11 more)" + + +def test_other_locked_area_sorts_after_classified_critical_area(): + audit = load_module() + payload = [ + {"number": 1, "title": "Fix auth token", "files": ["core/auth.py"]}, + {"number": 2, "title": "Fix auth login", "files": ["routes/auth.py"]}, + {"number": 3, "title": "Fix auth permission", "files": ["tests/test_auth.py"]}, + {"number": 4, "title": "Fix auth security", "files": ["docs/auth.md"]}, + ] + payload.extend({"number": number, "title": f"Unclassified {number}"} for number in range(5, 25)) + prs = audit.normalize_prs(payload) + + locked = audit.locked_areas(prs, audit.score_prs(prs)) + + assert locked[0]["area"] == "Auth / users / API tokens" + assert locked[-1]["area"] == "Other / unclassified" + assert locked[-1]["why"] == "20 PRs, mostly missing changed-file metadata" + + +def test_terminal_render_color_modes(): + audit = load_module() + prs = audit.normalize_prs( + [ + {"number": 1, "title": "Fix search", "mergeStateStatus": "CLEAN", "files": ["core/search.py"]}, + {"number": 2, "title": "Search follow-up", "mergeStateStatus": "DIRTY", "files": ["core/search.py"]}, + ] + ) + + colored = audit.render_terminal(prs, use_color=True) + plain = audit.render_terminal(prs, use_color=False) + + assert "Hot files" in plain + assert "core/search.py" in plain + assert "Review / blocker priorities" in plain + assert "Heuristic score only; inspect these first, do not merge without validation." in plain + assert re.search(r"\x1b\[[0-9;]*m", colored) + assert not re.search(r"\x1b\[[0-9;]*m", plain) + + +def test_terminal_hot_files_respects_top(): + audit = load_module() + prs = audit.normalize_prs( + [ + {"number": 1, "title": "A", "files": ["a.py", "b.py"]}, + {"number": 2, "title": "B", "files": ["a.py", "b.py"]}, + {"number": 3, "title": "C", "files": ["b.py"]}, + ] + ) + + report = audit.render_terminal(prs, top=1, use_color=False) + + assert "Hot files" in report + assert "- b.py" in report + assert "- a.py" not in report + + +def test_terminal_truncates_long_title_but_markdown_keeps_it(): + audit = load_module() + long_title = "Fix search " + "very-long-detail " * 12 + prs = audit.normalize_prs([{"number": 1, "title": long_title, "files": ["core/search.py"]}]) + + terminal = audit.render_terminal(prs, use_color=False) + markdown = audit.render_markdown(prs) + short_title = audit.shorten_text(long_title) + + assert short_title in terminal + assert long_title not in terminal + assert long_title in markdown + + +def test_cli_terminal_color_always_outputs_ansi(tmp_path, capsys): + audit = load_module() + path = tmp_path / "prs.json" + path.write_text(json.dumps([{"number": 1, "title": "Fix search", "files": ["core/search.py"]}]), encoding="utf-8") + + exit_code = audit.main(["--format", "terminal", "--color", "always", "--input", str(path)]) + output = capsys.readouterr().out + + assert exit_code == 0 + assert re.search(r"\x1b\[[0-9;]*m", output) + + +def test_cli_terminal_no_color_outputs_no_ansi(tmp_path, capsys): + audit = load_module() + path = tmp_path / "prs.json" + path.write_text(json.dumps([{"number": 1, "title": "Fix search", "files": ["core/search.py"]}]), encoding="utf-8") + + exit_code = audit.main(["--format", "terminal", "--no-color", "--input", str(path)]) + output = capsys.readouterr().out + + assert exit_code == 0 + assert not re.search(r"\x1b\[[0-9;]*m", output) + + +def test_color_auto_requires_terminal_and_support(monkeypatch): + audit = load_module() + args = audit.argparse.Namespace(format="terminal", color="auto", output=None) + + monkeypatch.setattr(audit.sys.stdout, "isatty", lambda: True) + monkeypatch.delenv("NO_COLOR", raising=False) + monkeypatch.setitem(audit.os.environ, "TERM", "xterm-256color") + assert audit.should_use_color(args) + + monkeypatch.setitem(audit.os.environ, "NO_COLOR", "1") + assert not audit.should_use_color(args) + + +def test_color_output_file_and_markdown_disable_ansi(monkeypatch): + audit = load_module() + monkeypatch.setattr(audit.sys.stdout, "isatty", lambda: True) + monkeypatch.setitem(audit.os.environ, "TERM", "xterm-256color") + + output_args = audit.argparse.Namespace(format="terminal", color="auto", output="report.txt") + markdown_args = audit.argparse.Namespace(format="markdown", color="always", output=None) + + assert not audit.should_use_color(output_args) + assert not audit.should_use_color(markdown_args) + + +def test_invalid_json_handled_cleanly(tmp_path): + audit = load_module() + path = tmp_path / "bad.json" + path.write_text("{bad json", encoding="utf-8") + + exit_code = audit.main(["--input", str(path)]) + + assert exit_code == 1 + + +def test_empty_input_handled_cleanly(tmp_path, capsys): + audit = load_module() + path = tmp_path / "prs.json" + path.write_text(json.dumps([]), encoding="utf-8") + + exit_code = audit.main(["--input", str(path)]) + output = capsys.readouterr().out + + assert exit_code == 0 + assert "Total PRs analyzed: 0" in output + assert "No PRs to rank." in output + + +# --- JSON format tests --- + +JSON_PRS = [ + { + "number": 1, + "title": "Fix auth token rotation", + "author": {"login": "alice"}, + "url": "https://example.test/pr/1", + "mergeStateStatus": "CLEAN", + "reviewDecision": "REVIEW_REQUIRED", + "updatedAt": "2026-05-30T12:00:00Z", + "files": [{"path": "core/auth.py"}, {"path": "tests/test_auth.py"}], + }, + { + "number": 2, + "title": "Fix auth login flow", + "author": {"login": "bob"}, + "url": "https://example.test/pr/2", + "mergeStateStatus": "DIRTY", + "reviewDecision": "CHANGES_REQUESTED", + "updatedAt": "2026-05-28T10:00:00Z", + "files": [{"path": "core/auth.py"}, {"path": "routes/auth_routes.py"}], + }, +] + + +def test_json_output_parses_with_json_loads(tmp_path, capsys): + audit = load_module() + path = tmp_path / "prs.json" + path.write_text(json.dumps(JSON_PRS), encoding="utf-8") + + exit_code = audit.main(["--input", str(path), "--format", "json"]) + output = capsys.readouterr().out + + assert exit_code == 0 + parsed = json.loads(output) + assert isinstance(parsed, dict) + + +def test_json_output_includes_expected_top_level_keys(tmp_path, capsys): + audit = load_module() + path = tmp_path / "prs.json" + path.write_text(json.dumps(JSON_PRS), encoding="utf-8") + + exit_code = audit.main(["--input", str(path), "--format", "json"]) + output = capsys.readouterr().out + + assert exit_code == 0 + parsed = json.loads(output) + assert set(parsed.keys()) == { + "summary", + "locked_areas", + "hot_files", + "review_priorities", + "duplicate_candidates", + "safer_areas", + } + + +def test_json_summary_fields(tmp_path, capsys): + audit = load_module() + path = tmp_path / "prs.json" + path.write_text(json.dumps(JSON_PRS), encoding="utf-8") + + exit_code = audit.main(["--input", str(path), "--format", "json"]) + output = capsys.readouterr().out + + assert exit_code == 0 + summary = json.loads(output)["summary"] + assert summary["total_prs_analyzed"] == 2 + assert "unique_files_touched" in summary + assert "prs_missing_changed_file_metadata" in summary + assert "main_overlap_drivers" in summary + assert "highest_risk_areas" in summary + assert "recommended_first_review_target" in summary + + +def test_json_review_priorities_structure(tmp_path, capsys): + audit = load_module() + path = tmp_path / "prs.json" + path.write_text(json.dumps(JSON_PRS), encoding="utf-8") + + exit_code = audit.main(["--input", str(path), "--format", "json"]) + output = capsys.readouterr().out + + assert exit_code == 0 + priorities = json.loads(output)["review_priorities"] + assert len(priorities) >= 1 + first = priorities[0] + assert set(first.keys()) >= {"rank", "number", "score", "title", "url", "merge_state", "review_decision", "reasons"} + assert first["rank"] == 1 + assert isinstance(first["reasons"], list) + + +def test_json_hot_files_structure(tmp_path, capsys): + audit = load_module() + path = tmp_path / "prs.json" + path.write_text(json.dumps(JSON_PRS), encoding="utf-8") + + exit_code = audit.main(["--input", str(path), "--format", "json"]) + output = capsys.readouterr().out + + assert exit_code == 0 + hot = json.loads(output)["hot_files"] + assert len(hot) >= 1 + assert hot[0]["file"] == "core/auth.py" + assert hot[0]["pr_count"] == 2 + assert set(hot[0]["pr_numbers"]) == {1, 2} + + +def test_json_output_file_excludes_progress_and_ansi_in_live_output_file(monkeypatch, tmp_path, capsys): + audit = load_module() + output_path = tmp_path / "report.json" + + def fake_run(cmd): + if cmd[:3] == ["gh", "pr", "list"]: + return JSON_PRS + return [] + + monkeypatch.setattr(audit, "_run_gh_json", fake_run) + + exit_code = audit.main( + ["--repo", "owner/repo", "--format", "json", "--output", str(output_path), "--progress", "always"] + ) + captured = capsys.readouterr() + report = output_path.read_text(encoding="utf-8") + + assert exit_code == 0 + assert captured.out == "" + assert "Fetching open PR list..." in captured.err or "Fetching changed files" in captured.err + parsed = json.loads(report) + assert set(parsed.keys()) == { + "summary", + "locked_areas", + "hot_files", + "review_priorities", + "duplicate_candidates", + "safer_areas", + } + assert not re.search(r"\x1b\[[0-9;]*m", report) + assert "Fetching" not in report + + +def test_json_format_with_color_always_emits_no_ansi(tmp_path, capsys): + audit = load_module() + path = tmp_path / "prs.json" + path.write_text(json.dumps(JSON_PRS), encoding="utf-8") + + exit_code = audit.main(["--input", str(path), "--format", "json", "--color", "always"]) + output = capsys.readouterr().out + + assert exit_code == 0 + assert not re.search(r"\x1b\[[0-9;]*m", output) + parsed = json.loads(output) + assert isinstance(parsed, dict) + + +def test_json_output_is_deterministic(tmp_path): + audit = load_module() + path = tmp_path / "prs.json" + path.write_text(json.dumps(JSON_PRS), encoding="utf-8") + + prs = audit.normalize_prs(JSON_PRS) + first = audit.render_json(prs) + second = audit.render_json(prs) + + assert first == second + parsed = json.loads(first) + assert isinstance(parsed, dict) + + +def test_json_empty_input_handled_cleanly(tmp_path, capsys): + audit = load_module() + path = tmp_path / "prs.json" + path.write_text(json.dumps([]), encoding="utf-8") + + exit_code = audit.main(["--input", str(path), "--format", "json"]) + output = capsys.readouterr().out + + assert exit_code == 0 + parsed = json.loads(output) + assert parsed["summary"]["total_prs_analyzed"] == 0 + assert parsed["hot_files"] == [] + assert parsed["review_priorities"] == [] + + +def test_help_includes_json_format_choice(): + audit = load_module() + + help_text = audit.build_parser().format_help() + + assert "markdown" in help_text + assert "terminal" in help_text + assert "json" in help_text