Adds a standalone read-only PR blocker audit helper with Markdown, terminal, and JSON output plus focused tests and documentation.
1052 lines
38 KiB
Python
1052 lines
38 KiB
Python
#!/usr/bin/env python3
|
|
"""Read-only pull request overlap audit helper.
|
|
|
|
This script intentionally does not import the Odysseus application package.
|
|
It only reads local JSON input or invokes read-only `gh` list/API commands.
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
import argparse
|
|
import json
|
|
import os
|
|
import re
|
|
import subprocess
|
|
import sys
|
|
from collections import Counter, defaultdict
|
|
from dataclasses import dataclass
|
|
from datetime import datetime, timezone
|
|
from pathlib import Path
|
|
from typing import Iterable
|
|
|
|
|
|
# Area classification table.  Each entry is
# (area label, path keywords, title keywords): path keywords are looked up as
# lowercase substrings of the joined changed-file paths, title keywords are
# looked up in the PR title.
AREA_RULES = [
    (
        "Auth / users / API tokens",
        ("auth", "token", "api_key", "api-key", "apikey", "login", "totp"),
        ("auth", "bearer token", "api token", "api key", "login", "privilege", "permission"),
    ),
    (
        "Memory / RAG / vector store",
        ("memory", "rag", "vector", "embedding", "faiss", "chroma"),
        ("memory", "rag", "vector", "embedding", "retrieval"),
    ),
    (
        "Search / web search",
        ("search", "ddg", "web_search"),
        ("search", "ddg", "web"),
    ),
    (
        "Model routing / endpoint discovery",
        ("model", "llm", "endpoint", "lmstudio", "ollama"),
        ("model", "routing", "endpoint", "discovery", "llm"),
    ),
    (
        "Agent loop / tools",
        ("agent", "tool", "function_call", "mcp", "shell"),
        ("agent", "tool", "function", "mcp"),
    ),
    (
        "Cookbook / runners",
        ("cookbook", "runner", "preset"),
        ("cookbook", "runner", "preset"),
    ),
    (
        "Email / CalDAV",
        ("mail", "email", "imap", "caldav", "calendar"),
        ("email", "mail", "caldav", "calendar"),
    ),
    (
        "Documents / uploads",
        ("document", "upload", "attachment", "processor", "markitdown"),
        ("document", "upload", "attachment"),
    ),
    (
        "Gallery / visual report",
        ("gallery", "image", "vision", "preview"),
        ("gallery", "visual", "image"),
    ),
    (
        "CI / repo process",
        (".github", "docker", "compose", "workflow", "ci", "pytest"),
        ("ci", "workflow", "docker", "compose"),
    ),
    (
        "Docs / tooling / tests",
        ("docs/", "scripts/", "tests/", "README", "tooling"),
        ("docs", "test", "tooling", "script"),
    ),
]
|
|
|
|
# Every area label from AREA_RULES, plus the "Other" fallback bucket.
ALL_AREAS = [label for label, _path_keywords, _title_keywords in AREA_RULES] + ["Other"]
# Runs of lowercase ASCII letters/digits; used to tokenize lowercased text.
WORD_RE = re.compile(r"[a-z0-9]+")
# ANSI SGR (colour/style) escape sequences.
ANSI_RE = re.compile(r"\x1b\[[0-9;]*m")
|
|
# Style name -> ANSI SGR escape sequence used by the terminal renderer.
ANSI = {
    "bold": "\033[1m",
    "bold_red": "\033[1;31m",
    "bold_cyan": "\033[1;36m",
    "red": "\033[31m",
    "yellow": "\033[33m",
    "green": "\033[32m",
    "cyan": "\033[36m",
    "blue": "\033[34m",
    "dim": "\033[2m",
    "reset": "\033[0m",
}
# Common title words ignored when comparing PR titles for similarity.
STOP_WORDS = {
    "a", "add", "and", "bug", "fix", "for", "in",
    "new", "of", "pr", "the", "to", "update",
}
|
|
|
|
|
|
@dataclass(frozen=True)
class PullRequest:
    """Immutable, normalized record describing one pull request."""

    number: int  # PR number
    title: str  # PR title
    author: str  # author login
    url: str  # link to the PR
    files: tuple[str, ...]  # changed file paths
    merge_state: str  # merge state label
    review_decision: str  # review decision label
    updated_at: str  # last-update timestamp, kept as text
    areas: tuple[str, ...]  # area labels assigned to this PR
|
|
|
|
|
|
@dataclass(frozen=True)
class ScoredPullRequest:
    """Immutable pairing of a pull request with its heuristic score."""

    pr: PullRequest  # the pull request that was scored
    score: int  # heuristic review-priority score
    reasons: tuple[str, ...]  # human-readable explanations for the score
|
|
|
|
|
|
class ProgressReporter:
    """Optional progress output written to a text stream.

    When ``enabled`` is false every method is a no-op.  ``stream`` defaults to
    ``sys.stderr``.  ``last_len`` remembers the length of the last in-place
    status line so it can be overwritten or erased.
    """

    def __init__(self, enabled: bool, stream=None):
        self.enabled = enabled
        self.stream = stream or sys.stderr
        self.last_len = 0

    def _emit(self, text: str) -> None:
        """Write ``text`` to the stream and flush immediately."""
        self.stream.write(text)
        self.stream.flush()

    def phase(self, message: str) -> None:
        """Print ``message`` on its own line."""
        if not self.enabled:
            return
        self._emit(f"{message}\n")

    def update(self, done: int, total: int, files_count: int, missing_count: int, number: int) -> None:
        """Redraw the single-line fetch status in place using a carriage return."""
        if not self.enabled:
            return
        # A zero total is reported as complete rather than dividing by zero.
        percent = int(done * 100 / total) if total else 100
        status = (
            f"Fetching changed files: {done}/{total} PRs ({percent}%) | "
            f"files {files_count} | missing {missing_count} | #{number}"
        )[:140]
        # Pad with spaces so a shorter line fully covers the previous one.
        filler = " " * max(self.last_len - len(status), 0)
        self._emit(f"\r{status}{filler}")
        self.last_len = len(status)

    def finish_line(self) -> None:
        """Blank out the in-place status line, if one was drawn."""
        if not (self.enabled and self.last_len):
            return
        self._emit(f"\r{' ' * self.last_len}\r")
        self.last_len = 0

    def summary(self, message: str) -> None:
        """Erase any status line, then print ``message`` on its own line."""
        if not self.enabled:
            return
        self.finish_line()
        self._emit(f"{message}\n")
|
|
|
|
|
|
def load_json_file(path: Path):
    """Read and parse a JSON document from ``path``.

    Args:
        path: Location of the JSON file.

    Returns:
        The decoded JSON value.

    Raises:
        ValueError: If the file cannot be read, is not valid UTF-8, or does
            not contain valid JSON.  The message names the offending path.
    """
    try:
        # "utf-8-sig" reads plain UTF-8 and additionally skips a leading BOM,
        # which some Windows tools prepend and which json would otherwise
        # reject as invalid input.
        with path.open("r", encoding="utf-8-sig") as handle:
            return json.load(handle)
    except json.JSONDecodeError as exc:
        raise ValueError(f"invalid JSON in {path}: {exc.msg} at line {exc.lineno}, column {exc.colno}") from exc
    except UnicodeDecodeError as exc:
        # Still a ValueError for callers, but now with the file path attached.
        raise ValueError(f"could not decode {path} as UTF-8: {exc}") from exc
    except OSError as exc:
        raise ValueError(f"could not read {path}: {exc}") from exc
|
|
|
|
|
|
def fetch_live_prs(repo: str, fetch_files: bool = True, progress: ProgressReporter | None = None, limit: int = 1000):
    """Fetch open pull requests for ``repo`` through the ``gh`` CLI.

    ``gh pr list`` is tried first; if it raises ``RuntimeError`` the function
    falls back to the paginated REST listing via ``gh api``.  The result is
    trimmed to ``limit`` entries and, when ``fetch_files`` is true, passed on
    to ``_fill_missing_live_files`` so PRs without file data get it fetched.

    Returns:
        The raw payload (a list, or a dict carrying ``items``).
    """
    progress = progress or ProgressReporter(False)
    if fetch_files:
        fields = "number,title,author,files,mergeStateStatus,reviewDecision,updatedAt,url"
    else:
        fields = "number,title,author,mergeStateStatus,reviewDecision,updatedAt,url"
    list_cmd = ["gh", "pr", "list", "--repo", repo, "--state", "open", "--limit", str(limit), "--json", fields]

    progress.phase("Fetching open PR list...")
    try:
        payload = _run_gh_json(list_cmd)
    except RuntimeError:
        # Fall back to the REST endpoint when the list command fails.
        rest_path = f"repos/{repo}/pulls?state=open&per_page=100"
        payload = _run_gh_json(["gh", "api", "--paginate", rest_path])

    payload = _limit_payload(payload, limit)
    if fetch_files:
        return _fill_missing_live_files(repo, payload, progress)
    return payload
|
|
|
|
|
|
def _limit_payload(payload, limit: int):
    """Return ``payload`` truncated to its first ``limit`` pull requests.

    Lists are sliced directly.  Dicts get a shallow copy whose ``items`` list
    is sliced (a missing ``items`` key becomes an empty list).  Anything else,
    including a dict whose ``items`` is not a list, is returned unchanged.
    """
    if isinstance(payload, list):
        return payload[:limit]
    if not isinstance(payload, dict):
        return payload
    entries = payload.get("items", [])
    if not isinstance(entries, list):
        return payload
    return {**payload, "items": entries[:limit]}
|
|
|
|
|
|
def _fill_missing_live_files(repo: str, payload, progress: ProgressReporter | None = None):
    """Fetch changed-file lists for PR entries that do not already have one.

    Dict entries of the payload are updated in place: entries that already
    carry files are left alone, the others get a ``files`` list fetched via
    ``_fetch_live_pr_files``.  Problems are collected as warning strings.

    Returns:
        ``payload`` itself when it is a dict (with warnings appended under
        ``"warnings"``), a new ``{"items": ..., "warnings": ...}`` dict when a
        list payload produced warnings, otherwise the payload unchanged.
    """
    progress = progress or ProgressReporter(False)
    raw_prs = payload.get("items", []) if isinstance(payload, dict) else payload
    if not isinstance(raw_prs, list):
        return payload

    warnings = []
    targets = [entry for entry in raw_prs if isinstance(entry, dict)]
    total = len(targets)
    progress.phase(f"Fetching changed files for {total} PRs...")

    fetched_count = 0
    files_count = 0
    missing_count = 0
    for position, entry in enumerate(targets, start=1):
        number = _safe_int(entry.get("number"))
        known_files = _extract_files(entry.get("files", []))
        if not number:
            # Without a PR number there is nothing to query.
            warnings.append("PR with missing number has no changed-file metadata")
            missing_count += 1
        elif known_files:
            # File data is already present; count it and move on.
            fetched_count += 1
            files_count += len(known_files)
        else:
            try:
                files = _fetch_live_pr_files(repo, number)
            except RuntimeError as exc:
                warnings.append(f"PR #{number}: could not fetch changed files: {exc}")
                missing_count += 1
            else:
                entry["files"] = [{"path": path} for path in files]
                files_count += len(files)
                if files:
                    fetched_count += 1
                else:
                    missing_count += 1
        progress.update(position, total, files_count, missing_count, number)

    progress.summary(f"Fetched changed files for {fetched_count}/{total} PRs; {missing_count} missing metadata.")

    if isinstance(payload, dict):
        if warnings:
            payload["warnings"] = [*payload.get("warnings", []), *warnings]
        return payload
    if warnings:
        return {"items": payload, "warnings": warnings}
    return payload
|
|
|
|
|
|
def _fetch_live_pr_files(repo: str, number: int) -> list[str]:
    """Return the changed file paths of PR ``number`` via the paginated ``gh api`` files endpoint."""
    endpoint = f"repos/{repo}/pulls/{number}/files?per_page=100"
    response = _run_gh_json(["gh", "api", "--paginate", endpoint])
    return _extract_files(response)
|
|
|
|
|
|
def _run_gh_json(cmd: list[str]):
    """Run ``cmd`` and return its standard output parsed as JSON.

    Args:
        cmd: Argument vector to execute (no shell is involved).

    Returns:
        The decoded JSON value; empty output is treated as ``[]``.

    Raises:
        RuntimeError: If the program cannot be started (for example it is not
            installed), exits with a non-zero status, or prints invalid JSON.
    """
    try:
        result = subprocess.run(cmd, text=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, check=False)
    except OSError as exc:
        # A missing or non-executable binary raises OSError; convert it so
        # callers that only handle RuntimeError get a clean error, not a
        # traceback.
        raise RuntimeError(f"could not run {cmd[0]}: {exc}") from exc
    if result.returncode != 0:
        raise RuntimeError(result.stderr.strip() or f"{cmd[0]} exited with {result.returncode}")
    try:
        return json.loads(result.stdout or "[]")
    except json.JSONDecodeError as exc:
        raise RuntimeError(f"gh returned invalid JSON: {exc}") from exc
|
|
|
|
|
|
def normalize_prs(payload) -> list[PullRequest]:
    """Convert a raw payload into ``PullRequest`` records.

    ``payload`` may be a list of PR dicts or a dict holding them under
    ``"items"``.  Non-dict entries are skipped.

    Raises:
        ValueError: If the PR collection is neither a list nor ``None``.
    """
    if isinstance(payload, dict):
        entries = payload.get("items", [])
    else:
        entries = payload
    if entries is None:
        entries = []
    if not isinstance(entries, list):
        raise ValueError("expected input JSON to be a list of pull requests or an object with an items list")

    normalized = []
    for entry in entries:
        if isinstance(entry, dict):
            normalized.append(normalize_pr(entry))
    return normalized
|
|
|
|
|
|
def missing_file_metadata_count(prs: list[PullRequest]) -> int:
    """Count the pull requests whose ``files`` collection is empty."""
    without_files = [pr for pr in prs if not pr.files]
    return len(without_files)
|
|
|
|
|
|
def missing_metadata_warning(count: int) -> str:
    """Build the warning sentence for ``count`` PRs lacking changed-file metadata."""
    if count == 1:
        noun = "PR"
    else:
        noun = "PRs"
    return f"Warning: {count} {noun} still missing changed-file metadata."
|
|
|
|
|
|
def normalize_pr(item: dict) -> PullRequest:
    """Build a ``PullRequest`` from one raw PR dict.

    Several key spellings are accepted per field (for example
    ``mergeStateStatus``, ``merge_state_status`` and ``mergeable_state``); the
    first truthy value wins.  File paths are de-duplicated and sorted, and
    areas are derived from the files and title.
    """

    def pick(*keys: str, default: str = "") -> str:
        # First truthy value among ``keys``, rendered as text.
        for key in keys:
            value = item.get(key)
            if value:
                return str(value)
        return str(default)

    unique_files = set(_extract_files(item.get("files", [])))
    files = tuple(sorted(unique_files))
    title = pick("title")
    areas = tuple(sorted(classify_areas(files, title)))
    return PullRequest(
        number=_safe_int(item.get("number")),
        title=title,
        author=_extract_author(item),
        url=pick("url", "html_url"),
        files=files,
        merge_state=pick("mergeStateStatus", "merge_state_status", "mergeable_state", default="unknown"),
        review_decision=pick("reviewDecision", "review_decision", default="unknown"),
        updated_at=pick("updatedAt", "updated_at"),
        areas=areas,
    )
|
|
|
|
|
|
def _extract_files(files) -> list[str]:
    """Pull file paths out of a raw ``files`` value.

    String entries are kept as-is.  Dict entries contribute the first truthy
    value among ``path``, ``filename`` and ``name`` (converted to ``str``).
    Anything that is not a list yields an empty list; other entry types are
    ignored.
    """
    if not isinstance(files, list):
        return []
    collected = []
    for entry in files:
        if isinstance(entry, str):
            collected.append(entry)
            continue
        if not isinstance(entry, dict):
            continue
        candidate = entry.get("path") or entry.get("filename") or entry.get("name")
        if candidate:
            collected.append(str(candidate))
    return collected
|
|
|
|
|
|
def _extract_author(item: dict) -> str:
    """Return the author login from a raw PR dict, or ``"unknown"``.

    The author is read from ``author`` and then ``user``.  A dict value
    contributes its ``login``; any other value is used as text directly.
    """
    raw_author = item.get("author") or item.get("user") or {}
    if not isinstance(raw_author, dict):
        return str(raw_author or "unknown")
    login = raw_author.get("login")
    return str(login or "unknown")
|
|
|
|
|
|
def _safe_int(value) -> int:
    """Convert ``value`` to ``int``, returning 0 when that is not possible.

    Args:
        value: Any object, typically a number or numeric string from JSON.

    Returns:
        ``int(value)``, or 0 if the conversion fails.
    """
    try:
        return int(value)
    except (TypeError, ValueError, OverflowError):
        # OverflowError covers infinite floats, which the json module can
        # produce from the non-standard ``Infinity`` literal.
        return 0
|
|
|
|
|
|
def classify_areas(files: Iterable[str], title: str = "") -> set[str]:
    """Return the area labels matching a PR's changed files and title.

    For ordinary rules a path keyword found as a substring of the joined,
    lowercased file paths matches; otherwise a title keyword match is tried.
    The "Docs / tooling / tests" rule is special: it applies only when every
    file is docs/tooling or the title strongly indicates docs/tooling work.
    ``{"Other"}`` is returned when nothing matches.
    """
    paths = tuple(files)
    haystack = " ".join(paths).lower()
    lowered_title = title.lower()

    matched = set()
    for area, path_keywords, title_keywords in AREA_RULES:
        if area == "Docs / tooling / tests":
            hit = is_docs_tooling_only(paths) or title_strongly_indicates_docs_tooling(lowered_title)
        else:
            hit = any(keyword.lower() in haystack for keyword in path_keywords) or any(
                title_has_keyword(lowered_title, keyword) for keyword in title_keywords
            )
        if hit:
            matched.add(area)
    if matched:
        return matched
    return {"Other"}
|
|
|
|
|
|
def is_docs_tooling_only(files: Iterable[str]) -> bool:
    """Return True when there is at least one file and all of them are docs/tooling paths."""
    lowered = [path.lower() for path in files]
    if not lowered:
        return False
    return all(is_docs_tooling_path(path) for path in lowered)
|
|
|
|
|
|
def is_docs_tooling_path(path: str) -> bool:
    """Return True when ``path`` looks like documentation, tests, scripts or tool config.

    Matching is case-sensitive against lowercase patterns, so callers are
    expected to pass an already lowercased path.
    """
    if path.startswith(("docs/", "scripts/", "tests/", ".github/")):
        return True
    if "tooling" in path:
        return True
    # Remaining checks look at the final path component only.
    basename = path.rsplit("/", 1)[-1]
    if basename.startswith("readme"):
        return True
    return basename in {"pytest.ini", "tox.ini", "mypy.ini", "ruff.toml"}
|
|
|
|
|
|
def title_strongly_indicates_docs_tooling(title: str) -> bool:
    """Return True when the title clearly describes docs/tests/tooling work.

    An explicit "... only" phrase always qualifies.  Otherwise the title must
    contain a docs/tooling word and none of the runtime-related words.
    """
    only_phrases = (
        "docs only",
        "documentation only",
        "test only",
        "tests only",
        "tooling only",
        "script only",
        "scripts only",
    )
    if any(phrase in title for phrase in only_phrases):
        return True

    tokens = set(words(title))
    docs_words = {"docs", "documentation", "readme", "tests", "tooling", "scripts"}
    runtime_words = {"api", "auth", "route", "runtime", "server", "ui", "memory", "model", "email"}
    mentions_docs = bool(tokens & docs_words)
    mentions_runtime = bool(tokens & runtime_words)
    return mentions_docs and not mentions_runtime
|
|
|
|
|
|
def title_has_keyword(title: str, keyword: str) -> bool:
    """Check whether ``title`` contains ``keyword``.

    Multi-word keywords are matched as substrings; single words must match a
    whole token of the title.
    """
    needle = keyword.lower()
    if " " not in needle:
        return needle in set(words(title))
    return needle in title
|
|
|
|
|
|
def hot_files(prs: list[PullRequest]) -> list[tuple[str, list[int]]]:
    """List files touched by more than one PR.

    Returns:
        ``(path, sorted PR numbers)`` rows, most-shared first, ties broken by
        path.
    """
    touched_by: dict[str, list[int]] = defaultdict(list)
    for pr in prs:
        for path in pr.files:
            touched_by[path].append(pr.number)

    shared = []
    for path, numbers in touched_by.items():
        if len(numbers) > 1:
            shared.append((path, sorted(numbers)))
    shared.sort(key=lambda row: (-len(row[1]), row[0]))
    return shared
|
|
|
|
|
|
def overlap_clusters(prs: list[PullRequest]) -> list[list[PullRequest]]:
    """Group PRs into clusters connected through shared files.

    Two PRs are linked when they touch the same file; clusters are the
    connected components of that graph.  PRs sharing no file with any other
    PR are not reported.

    Returns:
        Clusters ordered largest first (ties by PR numbers), each sorted by
        PR number.
    """
    lookup = {pr.number: pr for pr in prs}
    sharers: dict[str, list[int]] = defaultdict(list)
    for pr in prs:
        for path in pr.files:
            sharers[path].append(pr.number)

    # Undirected adjacency between PR numbers that share at least one file.
    neighbours: dict[int, set[int]] = defaultdict(set)
    for group in sharers.values():
        if len(group) >= 2:
            for member in group:
                neighbours[member].update(other for other in group if other != member)

    visited = set()
    components = []
    for start in sorted(neighbours):
        if start in visited:
            continue
        # Depth-first walk collecting everything reachable from ``start``.
        component = set()
        pending = [start]
        while pending:
            node = pending.pop()
            if node not in component:
                component.add(node)
                pending.extend(neighbours[node] - component)
        visited |= component
        components.append([lookup[n] for n in sorted(component) if n in lookup])

    components.sort(key=lambda members: (-len(members), [pr.number for pr in members]))
    return components
|
|
|
|
|
|
def score_prs(prs: list[PullRequest], now: datetime | None = None) -> list[ScoredPullRequest]:
    """Score every PR and return them highest score first (ties by PR number).

    When ``now`` is not given, the reference time is derived from the PRs
    themselves via ``reference_time``.
    """
    moment = now or reference_time(prs)
    file_counts = Counter()
    for pr in prs:
        file_counts.update(pr.files)
    ranked = [score_pr(pr, file_counts, moment) for pr in prs]
    ranked.sort(key=lambda entry: (-entry.score, entry.pr.number))
    return ranked
|
|
|
|
|
|
def score_pr(pr: PullRequest, file_counts: Counter, now: datetime) -> ScoredPullRequest:
    """Compute the heuristic review-priority score for one PR.

    Args:
        pr: The pull request to score.
        file_counts: How many PRs touch each file path.
        now: Reference time used to judge how recently the PR was updated.

    Returns:
        The PR with its score and the reasons that contributed to it.
    """
    score = 0
    reasons = []
    text = f"{pr.title} {' '.join(pr.files)}".lower()

    def bump(points: int, reason: str) -> None:
        # Apply a score adjustment and record why.
        nonlocal score
        score += points
        reasons.append(reason)

    # Heuristic, not a truth model: weights favor direct auth/token
    # lifecycle fixes first, then confidentiality/persistence/memory risk,
    # overlap pressure, review state, and actionability. Merge conflicts are
    # caution signals only; they do not prove importance.
    if direct_auth_token_signal(pr):
        bump(45, "direct auth/token lifecycle signal")
    elif any(word in text for word in ("security", "secret", "privilege", "permission")):
        bump(22, "security keyword")

    # Independent keyword groups, checked as substrings of title + paths.
    keyword_signals = (
        (("leak", "leaks", "exposure", "cross-user", "cross user", "privacy"), 18, "data exposure keyword"),
        (
            ("data-loss", "persistence", "migration", "database", "sqlite", "postgres"),
            20,
            "persistence/migration keyword",
        ),
        (("memory", "vector", "rag", "embedding", "retrieval"), 15, "memory/RAG keyword"),
    )
    for needles, points, reason in keyword_signals:
        if any(needle in text for needle in needles):
            bump(points, reason)

    overlap_count = sum(1 for path in pr.files if file_counts[path] > 1)
    if overlap_count:
        # 3 points per shared file, capped at 30.
        bump(min(overlap_count * 3, 30), f"{overlap_count} overlapping file(s)")

    merge_state = pr.merge_state.lower()
    if merge_state in {"clean", "has_hooks"}:
        bump(3, "clean/actionable merge state")
    elif merge_state in {"dirty", "blocked", "conflicting", "unstable"}:
        reasons.append(f"caution: merge state {pr.merge_state}")
    elif merge_state in {"unknown", ""}:
        reasons.append("caution: merge state unknown")

    review_signals = {
        "approved": (-8, "already approved"),
        "changes_requested": (10, "changes requested"),
        "review_required": (6, "review required"),
        "unknown": (4, "review state unknown"),
        "": (4, "review state unknown"),
        "none": (4, "review state unknown"),
    }
    review_signal = review_signals.get(pr.review_decision.lower())
    if review_signal is not None:
        bump(*review_signal)

    age_days = days_since(pr.updated_at, now)
    if age_days is not None:
        if age_days <= 7:
            bump(8, "updated in last 7 days")
        elif age_days <= 30:
            bump(4, "updated in last 30 days")

    return ScoredPullRequest(pr=pr, score=score, reasons=tuple(reasons or ["low overlap / low signal"]))
|
|
|
|
|
|
def direct_auth_token_signal(pr: PullRequest) -> bool:
    """Detect PRs that look like direct auth/token lifecycle changes.

    Requires an auth-related keyword in the changed file paths plus either an
    auth-related phrase or a lifecycle word (deleted, revoked, ...) in the
    title.  All matching is lowercase substring matching.
    """
    joined_paths = " ".join(pr.files).lower()
    path_keywords = ("auth", "token", "api_key", "api-key", "apikey", "key_manager", "security")
    if not any(keyword in joined_paths for keyword in path_keywords):
        return False

    lowered_title = pr.title.lower()
    title_phrases = ("bearer token", "api token", "api key", "auth", "login", "privilege", "permission")
    if any(phrase in lowered_title for phrase in title_phrases):
        return True
    lifecycle_words = ("deleted", "revoked", "expired", "disabled", "removed")
    return any(word in lowered_title for word in lifecycle_words)
|
|
|
|
|
|
def days_since(value: str, now: datetime) -> int | None:
    """Return the whole days elapsed between timestamp ``value`` and ``now``.

    Args:
        value: ISO-8601 timestamp text.
        now: Reference time.  A naive value is interpreted as UTC, matching
            how naive timestamps are treated when ``value`` is parsed.

    Returns:
        The non-negative day count, or ``None`` when ``value`` cannot be
        parsed.  Timestamps later than ``now`` yield 0.
    """
    parsed = parse_datetime(value)
    if parsed is None:
        return None
    if now.tzinfo is None:
        # ``parsed`` is always timezone-aware; subtracting it from a naive
        # datetime would raise TypeError.
        now = now.replace(tzinfo=timezone.utc)
    return max((now - parsed).days, 0)
|
|
|
|
|
|
def reference_time(prs: list[PullRequest]) -> datetime:
    """Return the latest parseable ``updated_at`` among ``prs``.

    Falls back to the current UTC time when no PR has a parseable timestamp.
    """
    latest = None
    for pr in prs:
        stamp = parse_datetime(pr.updated_at)
        if stamp is None:
            continue
        if latest is None or stamp > latest:
            latest = stamp
    if latest is not None:
        return latest
    return datetime.now(timezone.utc)
|
|
|
|
|
|
def parse_datetime(value: str) -> datetime | None:
    """Parse ISO-8601 text into a timezone-aware ``datetime``.

    ``Z`` is accepted as a UTC designator, and timestamps without an offset
    are assumed to be UTC.  Empty or unparseable input yields ``None``.
    """
    if not value:
        return None
    iso_text = value.replace("Z", "+00:00")
    try:
        moment = datetime.fromisoformat(iso_text)
    except ValueError:
        return None
    if moment.tzinfo is not None:
        return moment
    return moment.replace(tzinfo=timezone.utc)
|
|
|
|
|
|
def duplicate_candidates(prs: list[PullRequest]) -> list[list[PullRequest]]:
    """Group PRs that look like possible duplicates of one another.

    Every unordered pair is compared with ``_looks_similar``; similar pairs
    are linked and the linked PRs are merged into groups.
    """
    lookup = {pr.number: pr for pr in prs}
    links: dict[int, set[int]] = defaultdict(set)
    for offset, first in enumerate(prs, start=1):
        # Compare only with later PRs so each pair is checked once.
        for second in prs[offset:]:
            if not _looks_similar(first, second):
                continue
            links[first.number].add(second.number)
            links[second.number].add(first.number)
    return _groups_from_matches(links, lookup)
|
|
|
|
|
|
def _looks_similar(left: PullRequest, right: PullRequest) -> bool:
    """Decide whether two PRs are similar enough to flag as possible duplicates.

    Both must have files, share at least half of their combined file set, and
    share at least two title keywords.
    """
    files_a = set(left.files)
    files_b = set(right.files)
    if not (files_a and files_b):
        return False
    # Shared files divided by all files touched by either PR.
    overlap_ratio = len(files_a & files_b) / len(files_a | files_b)
    common_keywords = title_keywords(left.title) & title_keywords(right.title)
    if overlap_ratio < 0.5:
        return False
    return len(common_keywords) >= 2
|
|
|
|
|
|
def _groups_from_matches(matches: dict[int, set[int]], by_number: dict[int, PullRequest]) -> list[list[PullRequest]]:
    """Turn pairwise matches into groups of transitively linked PRs.

    Args:
        matches: PR number -> numbers of the PRs it matched.
        by_number: PR number -> PR record; numbers missing here are dropped.

    Returns:
        Groups ordered largest first (ties by PR numbers), each sorted by
        PR number.
    """
    visited = set()
    collected = []
    for start in sorted(matches):
        if start in visited:
            continue
        # Depth-first walk over everything reachable from ``start``.
        members = set()
        pending = [start]
        while pending:
            node = pending.pop()
            if node not in members:
                members.add(node)
                pending.extend(matches[node] - members)
        visited |= members
        collected.append([by_number[n] for n in sorted(members) if n in by_number])
    collected.sort(key=lambda members: (-len(members), [pr.number for pr in members]))
    return collected
|
|
|
|
|
|
def words(value: str) -> list[str]:
    """Split ``value`` into its lowercase alphanumeric tokens, in order."""
    lowered = value.lower()
    return WORD_RE.findall(lowered)
|
|
|
|
|
|
def title_keywords(title: str) -> set[str]:
    """Return the distinct title tokens longer than two characters that are not stop words."""
    keywords = set()
    for token in words(title):
        if len(token) <= 2 or token in STOP_WORDS:
            continue
        keywords.add(token)
    return keywords
|
|
|
|
|
|
def locked_areas(prs: list[PullRequest], scored: list[ScoredPullRequest]) -> list[dict[str, object]]:
    """Summarize every area that has at least one open PR.

    Each row holds the area label, a short list of its most-touched files,
    the PR numbers, a reason, a priority and an ``is_other`` flag.

    Returns:
        Rows with the "Other" bucket last, then ordered by priority, number
        of PRs (descending) and area name.
    """
    scores = {entry.pr.number: entry.score for entry in scored}
    rows = []
    for area in ALL_AREAS:
        members = [pr for pr in prs if area in pr.areas]
        if not members:
            continue
        is_other = area == "Other"
        file_counts = Counter(path for pr in members for path in pr.files)
        has_overlap = any(count > 1 for count in file_counts.values())
        top_score = max(scores.get(pr.number, 0) for pr in members)
        without_files = sum(1 for pr in members if not pr.files)

        reason = _locked_area_why(area, without_files, len(members), has_overlap)
        if without_files and not is_other:
            reason += "; some PRs have no file metadata"

        rows.append(
            {
                "area": "Other / unclassified" if is_other else area,
                "files": _summarize_files(file_counts),
                "prs": [pr.number for pr in sorted(members, key=lambda item: item.number)],
                "why": reason,
                "priority": _locked_area_priority(area, members, top_score),
                "is_other": is_other,
            }
        )

    def sort_key(row):
        return (
            bool(row["is_other"]),
            _priority_rank(str(row["priority"])),
            -len(row["prs"]),
            str(row["area"]),
        )

    return sorted(rows, key=sort_key)
|
|
|
|
|
|
def _locked_area_priority(area: str, prs: list[PullRequest], max_score: int) -> str:
    """Rate an area as ``"critical"``, ``"high"`` or ``"watch"``.

    The "Other" area is only ``"watch"`` when none of its PRs has file data.
    Otherwise the rating depends on the number of PRs and the highest score.
    """
    if area == "Other" and all(not pr.files for pr in prs):
        return "watch"
    pr_count = len(prs)
    if pr_count >= 4 or max_score >= 45:
        return "critical"
    if pr_count >= 2 or max_score >= 30:
        return "high"
    return "watch"
|
|
|
|
|
|
def _locked_area_why(area: str, missing_files: int, total_prs: int, has_overlap: bool) -> str:
    """Explain in one phrase why an area is listed.

    The "Other" area gets a dedicated message when more than half of its PRs
    lack file metadata; otherwise the phrase depends on file overlap.
    """
    mostly_missing = missing_files > total_prs / 2
    if area == "Other" and mostly_missing:
        return f"{total_prs} PRs, mostly missing changed-file metadata"
    if has_overlap:
        return "shared file overlap"
    return "active open PRs in area"
|
|
|
|
|
|
def _summarize_files(counts: Counter) -> str:
    """Return up to five of the most frequently touched paths, comma separated."""
    if not counts:
        return "No changed-file metadata"
    return ", ".join(path for path, _occurrences in counts.most_common(5))
|
|
|
|
|
|
def _priority_rank(priority: str) -> int:
    """Map a priority label to its sort position; unknown labels sort last (3)."""
    ordering = ("critical", "high", "watch")
    if priority in ordering:
        return ordering.index(priority)
    return 3
|
|
|
|
|
|
def safer_areas(prs: list[PullRequest]) -> list[str]:
    """Suggest areas where new work is less likely to collide with open PRs.

    Areas with no open PRs are listed, as is the docs/tooling/tests area when
    it has at most two PRs.  At most six suggestions are returned; a generic
    hint is returned when nothing qualifies.
    """
    per_area = Counter(area for pr in prs for area in pr.areas)
    notes = []
    for area in ALL_AREAS:
        open_count = per_area.get(area, 0)
        if open_count == 0:
            notes.append(f"{area}: no open PRs in this input matched the area mapping")
            continue
        if area == "Docs / tooling / tests" and open_count <= 2:
            notes.append(f"{area}: low overlap; good candidate for docs, tests, or maintenance-only work")
    if not notes:
        notes.append("No clearly quiet area found; prefer narrow docs, tests, or tooling work after checking current PRs.")
    return notes[:6]
|
|
|
|
|
|
def build_structured_report(prs: list[PullRequest], top: int = 15) -> dict:
    """Assemble the audit as a plain dict of JSON-serializable values.

    Args:
        prs: Normalized pull requests.
        top: Maximum number of hot files and review priorities to include
            (values below 1 are treated as 1).

    Returns:
        A dict with ``summary``, ``locked_areas``, ``hot_files``,
        ``review_priorities``, ``duplicate_candidates`` and ``safer_areas``.
    """
    top = max(top, 1)
    scored = score_prs(prs)
    hot = hot_files(prs)
    locked = locked_areas(prs, scored)
    duplicates = duplicate_candidates(prs)
    target = scored[0] if scored else None

    summary = {
        "highest_risk_areas": _risk_summary(locked),
        "main_overlap_drivers": _overlap_driver_summary(hot),
        "prs_missing_changed_file_metadata": missing_file_metadata_count(prs),
        "recommended_first_review_target": _target_summary(target),
        "total_prs_analyzed": len(prs),
        "unique_files_touched": len({path for pr in prs for path in pr.files}),
    }

    # The internal ``is_other`` flag is deliberately left out of the report.
    locked_section = []
    for row in locked:
        locked_section.append(
            {
                "area": row["area"],
                "files": row["files"],
                "priority": row["priority"],
                "prs": row["prs"],
                "why": row["why"],
            }
        )

    hot_section = []
    for path, numbers in hot[:top]:
        hot_section.append({"file": path, "pr_count": len(numbers), "pr_numbers": numbers})

    priorities_section = []
    for rank, entry in enumerate(scored[:top], start=1):
        priorities_section.append(
            {
                "merge_state": entry.pr.merge_state,
                "number": entry.pr.number,
                "rank": rank,
                "reasons": list(entry.reasons),
                "review_decision": entry.pr.review_decision,
                "score": entry.score,
                "title": entry.pr.title or "untitled",
                "url": entry.pr.url,
            }
        )

    duplicates_section = []
    for group in duplicates:
        duplicates_section.append(
            {
                "pr_numbers": [pr.number for pr in group],
                "titles": [pr.title or "untitled" for pr in group],
            }
        )

    return {
        "summary": summary,
        "locked_areas": locked_section,
        "hot_files": hot_section,
        "review_priorities": priorities_section,
        "duplicate_candidates": duplicates_section,
        "safer_areas": safer_areas(prs),
    }
|
|
|
|
|
|
def render_json(prs: list[PullRequest], top: int = 15) -> str:
    """Render the structured report as indented, key-sorted JSON ending in a newline."""
    report = build_structured_report(prs, top)
    encoded = json.dumps(report, indent=2, sort_keys=True)
    return encoded + "\n"
|
|
|
|
|
|
def render_markdown(prs: list[PullRequest], top: int = 15) -> str:
    """Render the audit as a Markdown document.

    Args:
        prs: Normalized pull requests.
        top: Maximum number of hot files and review priorities to list
            (values below 1 are treated as 1).
    """
    top = max(top, 1)
    scored = score_prs(prs)
    hot = hot_files(prs)
    locked = locked_areas(prs, scored)
    duplicates = duplicate_candidates(prs)
    unique_files = len({path for pr in prs for path in pr.files})
    missing_files = missing_file_metadata_count(prs)
    target = scored[0] if scored else None

    lines = [
        "# PR Blocker Audit",
        "",
        "## Executive summary",
        "",
        f"- Total PRs analyzed: {len(prs)}",
        f"- Unique files touched: {unique_files}",
        f"- PRs missing changed-file metadata: {missing_files}",
        f"- Main overlap drivers: {_overlap_driver_summary(hot)}",
        f"- Highest-risk areas: {_risk_summary(locked)}",
        f"- Recommended first review target: {_target_summary(target)}",
    ]

    lines += ["", "## Locked code areas", ""]
    lines += _table(["area", "files/directories", "PRs", "why locked", "priority"], _locked_rows(locked))

    lines += ["", "## Hot files", ""]
    lines += _table(["file", "PR count", "PR numbers"], _hot_rows(hot, top))

    lines += ["", "## Review / blocker priorities", ""]
    lines += ["Heuristic score only; inspect these earlier, do not merge without validation.", ""]
    lines += _review_rows(scored, top)

    lines += ["", "## Duplicate candidates", ""]
    lines += _duplicate_rows(duplicates)

    lines += ["", "## Safer areas for new work", ""]
    lines += [f"- {item}" for item in safer_areas(prs)]

    # Trailing empty entry makes the joined text end with a newline.
    lines.append("")
    return "\n".join(lines)
|
|
|
|
|
|
def render_terminal(prs: list[PullRequest], top: int = 15, use_color: bool = False) -> str:
    """Render the audit as plain text for a terminal.

    Args:
        prs: Normalized pull requests.
        top: Maximum number of rows per section (values below 1 are treated
            as 1).
        use_color: When true, headings and labels are wrapped in ANSI styles.

    Returns:
        The report text, ending with a newline.
    """
    top = max(top, 1)
    scored = score_prs(prs)
    hot = hot_files(prs)
    locked = locked_areas(prs, scored)
    duplicates = duplicate_candidates(prs)
    unique_files = len({path for pr in prs for path in pr.files})
    missing_files = missing_file_metadata_count(prs)
    target = scored[0] if scored else None

    lines = [colorize("PR Blocker Audit", "bold_cyan", use_color), ""]
    lines.append(f"PRs analyzed: {len(prs)}")
    lines.append(f"Unique files touched: {unique_files}")
    lines.append(f"PRs missing changed-file metadata: {missing_files}")
    lines.append(f"Main overlap drivers: {_overlap_driver_summary(hot)}")
    lines.append(f"Recommended first review target: {_target_summary(target, truncate=True)}")

    lines.extend(["", colorize("Locked areas", "bold_cyan", use_color)])
    if locked:
        for row in locked[:top]:
            priority = str(row["priority"])
            label = colorize(priority.upper(), priority_color(priority), use_color)
            prs_text = _format_pr_numbers(row["prs"])
            lines.append(f"- {label} {row['area']}: {prs_text} ({row['why']})")
            lines.append(colorize(f" {row['files']}", "dim", use_color))
    else:
        lines.append("- none")

    lines.extend(["", colorize("Hot files", "bold_cyan", use_color)])
    lines.extend(_terminal_hot_rows(hot, top, use_color))

    lines.extend(["", colorize("Review / blocker priorities", "bold_cyan", use_color)])
    lines.append(colorize("Heuristic score only; inspect these first, do not merge without validation.", "dim", use_color))
    if scored:
        for item in scored[:top]:
            pr = item.pr
            raw_state = pr.merge_state or "unknown"
            # Pad to 18 columns based on the visible text.  Padding the
            # colorized string with a format spec would count the ANSI escape
            # bytes and misalign the title column when colour is enabled.
            padding = " " * max(18 - len(raw_state), 0)
            state = colorize(raw_state, merge_state_color(pr.merge_state), use_color) + padding
            reasons = "; ".join(item.reasons[:3])
            title = shorten_text(pr.title or "untitled")
            lines.append(f"- {item.score:>3} #{pr.number:<5} {state} {title}")
            lines.append(colorize(f" {reasons}", "dim", use_color))
    else:
        lines.append("- none")

    lines.extend(["", colorize("Possible duplicates", "bold_cyan", use_color)])
    lines.extend(_terminal_duplicate_rows(duplicates))

    lines.extend(["", colorize("Safer areas", "bold_cyan", use_color)])
    lines.extend(f"- {item}" for item in safer_areas(prs))

    # Trailing empty entry makes the joined text end with a newline.
    lines.append("")
    return "\n".join(lines)
|
|
|
|
|
|
def _terminal_hot_rows(hot: list[tuple[str, list[int]]], top: int, use_color: bool) -> list[str]:
    """Format up to ``top`` hot-file entries as terminal bullet lines."""
    if not hot:
        return ["- none"]
    formatted = []
    for path, numbers in hot[:top]:
        pr_count = len(numbers)
        badge = colorize(f"{pr_count} PRs", hot_count_color(pr_count), use_color)
        pr_list = _format_pr_numbers(numbers)
        formatted.append(f"- {path:<28} {badge} {pr_list}")
    return formatted
|
|
|
|
|
|
def _terminal_duplicate_rows(groups: list[list[PullRequest]]) -> list[str]:
    """Format possible-duplicate groups as terminal bullet lines."""
    if not groups:
        return ["- none detected"]
    formatted = []
    for group in groups:
        pr_list = _format_pr_numbers(pr.number for pr in group)
        short_titles = [shorten_text(pr.title or "untitled", 80) for pr in group]
        joined_titles = "; ".join(short_titles)
        formatted.append(f"- Possible duplicate / needs human review: {pr_list} - {joined_titles}")
    return formatted
|
|
|
|
|
|
def colorize(text: object, style: str, use_color: bool) -> str:
    """Return ``text`` as a string, wrapped in the ANSI ``style`` when ``use_color`` is true."""
    rendered = str(text)
    if use_color:
        return f"{ANSI[style]}{rendered}{ANSI['reset']}"
    return rendered
|
|
|
|
|
|
def priority_color(priority: str) -> str:
    """Return the style name for a priority label (case-insensitive); ``"blue"`` if unrecognized."""
    styles = {"critical": "bold_red", "high": "yellow", "watch": "cyan"}
    label = priority.lower()
    if label in styles:
        return styles[label]
    return "blue"
|
|
|
|
|
|
def hot_count_color(count: int) -> str:
    """Return the style name for a hot-file PR count: 4+ is bold red, 2-3 yellow, else dim."""
    if count >= 4:
        return "bold_red"
    if count >= 2:
        return "yellow"
    return "dim"
|
|
|
|
|
|
def merge_state_color(state: str) -> str:
    """Return the style name for a merge state (case-insensitive).

    ``clean`` is green, the conflicting/blocked family is red, and everything
    else (including empty or unknown) is yellow.
    """
    label = (state or "unknown").lower()
    if label in {"dirty", "blocked", "conflicting", "unstable"}:
        return "red"
    return "green" if label == "clean" else "yellow"
|
|
|
|
|
|
def should_use_color(args: argparse.Namespace) -> bool:
    """Decide whether ANSI colour should be used for this run.

    Colour only applies to the ``terminal`` format.  ``--color always`` forces
    it on and ``never`` (or writing to an output file) forces it off.
    Otherwise it requires stdout to be a TTY, ``NO_COLOR`` to be unset,
    ``TERM`` not to be ``dumb``, and, on Windows, virtual-terminal mode to be
    enabled successfully; elsewhere ``TERM`` or ``COLORTERM`` must be set.
    """
    if args.format != "terminal":
        return False

    on_windows = os.name == "nt"
    if args.color == "always":
        if on_windows:
            # Best effort: the result is ignored because colour was forced.
            enable_windows_vt_mode()
        return True
    if args.color == "never" or args.output:
        return False

    env = os.environ
    if not sys.stdout.isatty():
        return False
    if "NO_COLOR" in env or env.get("TERM") == "dumb":
        return False
    if on_windows:
        return enable_windows_vt_mode()
    return bool(env.get("TERM") or env.get("COLORTERM"))
|
|
|
|
|
|
def should_show_progress(args: argparse.Namespace) -> bool:
    """Decide whether progress output should be shown.

    Progress is suppressed for quiet runs, file input and runs that skip file
    fetching.  Otherwise ``--progress always``/``never`` decide, and the
    default is to show progress only when stderr is a TTY.
    """
    suppressed = args.quiet or args.input or args.no_fetch_files
    if suppressed:
        return False
    mode = args.progress
    if mode == "always":
        return True
    if mode == "never":
        return False
    return sys.stderr.isatty()
|
|
|
|
|
|
def enable_windows_vt_mode() -> bool:
    """Try to switch the Windows console into virtual-terminal (ANSI) mode.

    Returns True on non-Windows systems without doing anything.  On Windows
    the result of the console-mode update is returned; any failure along the
    way yields False.
    """
    if os.name != "nt":
        return True
    try:
        import ctypes

        kernel32 = ctypes.windll.kernel32
        # -11 selects the standard output handle.
        stdout_handle = kernel32.GetStdHandle(-11)
        console_mode = ctypes.c_uint32()
        if kernel32.GetConsoleMode(stdout_handle, ctypes.byref(console_mode)):
            # 0x0004 is the virtual-terminal-processing flag.
            return bool(kernel32.SetConsoleMode(stdout_handle, console_mode.value | 0x0004))
        return False
    except Exception:
        # Deliberately best effort: colour support is optional.
        return False
|
|
|
|
|
|
def _cluster_summary(clusters: list[list[PullRequest]]) -> str:
    """Describe the first three clusters as ``"N PRs (numbers)"`` joined by ``"; "``."""
    if not clusters:
        return "none detected"
    parts = [
        f"{len(cluster)} PRs ({_format_pr_numbers(pr.number for pr in cluster)})"
        for cluster in clusters[:3]
    ]
    return "; ".join(parts)
|
|
|
|
|
|
def _overlap_driver_summary(hot: list[tuple[str, list[int]]], limit: int = 3) -> str:
|
|
if not hot:
|
|
return "none detected"
|
|
return ", ".join(f"{path} ({len(numbers)} PRs)" for path, numbers in hot[:limit])
|
|
|
|
|
|
def _risk_summary(locked: list[dict[str, object]]) -> str:
|
|
if not locked:
|
|
return "none detected"
|
|
return ", ".join(f"{row['area']} ({row['priority']})" for row in locked[:3])
|
|
|
|
|
|
def _target_summary(target: ScoredPullRequest | None, truncate: bool = False) -> str:
|
|
if target is None:
|
|
return "none; no PRs in input"
|
|
title = target.pr.title or "untitled"
|
|
if truncate:
|
|
title = shorten_text(title)
|
|
return f"PR #{target.pr.number} ({target.score}) - {title}"
|
|
|
|
|
|
def _locked_rows(locked: list[dict[str, object]]) -> list[list[str]]:
|
|
if not locked:
|
|
return [["none", "none", "none", "none", "none"]]
|
|
return [
|
|
[
|
|
str(row["area"]),
|
|
str(row["files"]),
|
|
_format_pr_numbers(row["prs"]),
|
|
str(row["why"]),
|
|
str(row["priority"]),
|
|
]
|
|
for row in locked
|
|
]
|
|
|
|
|
|
def _hot_rows(hot: list[tuple[str, list[int]]], top: int) -> list[list[str]]:
|
|
if not hot:
|
|
return [["none", "0", "none"]]
|
|
return [[path, str(len(numbers)), _format_pr_numbers(numbers)] for path, numbers in hot[:top]]
|
|
|
|
|
|
def _review_rows(scored: list[ScoredPullRequest], top: int) -> list[str]:
|
|
if not scored:
|
|
return ["No PRs to rank."]
|
|
lines = []
|
|
for index, item in enumerate(scored[:top], start=1):
|
|
pr = item.pr
|
|
link = f"[#{pr.number}]({pr.url})" if pr.url else f"#{pr.number}"
|
|
reasons = "; ".join(item.reasons)
|
|
lines.append(f"{index}. {link} score {item.score}: {pr.title or 'untitled'} ({reasons})")
|
|
return lines
|
|
|
|
|
|
def _duplicate_rows(groups: list[list[PullRequest]]) -> list[str]:
|
|
if not groups:
|
|
return ["No possible duplicate groups detected from title/file overlap."]
|
|
lines = []
|
|
for group in groups:
|
|
numbers = _format_pr_numbers(pr.number for pr in group)
|
|
titles = "; ".join(f"#{pr.number} {pr.title or 'untitled'}" for pr in group)
|
|
lines.append(f"- Possible duplicate / needs human review: {numbers} - {titles}")
|
|
return lines
|
|
|
|
|
|
def _table(headers: list[str], rows: list[list[str]]) -> list[str]:
    """Render a Markdown pipe table as a list of lines.

    The result is the header line, a ``---`` divider with one column per
    header, then one line per row. Every cell goes through ``_escape_cell``.
    """

    def render(cells: Iterable[str]) -> str:
        return "| " + " | ".join(cells) + " |"

    header_line = render(_escape_cell(cell) for cell in headers)
    divider = render("---" for _ in headers)
    body = [render(_escape_cell(cell) for cell in row) for row in rows]
    return [header_line, divider, *body]
|
|
|
|
|
|
def _escape_cell(value: object) -> str:
|
|
return str(value).replace("|", "\\|").replace("\n", " ")
|
|
|
|
|
|
def _format_pr_numbers(numbers: Iterable[int], limit: int = 12) -> str:
|
|
raw_values = [number for number in numbers if number]
|
|
values = [f"#{number}" for number in raw_values[:limit]]
|
|
if len(raw_values) > limit:
|
|
values.append(f"... (+{len(raw_values) - limit} more)")
|
|
return ", ".join(values) if values else "unknown"
|
|
|
|
|
|
def shorten_text(text: str, max_len: int = 110) -> str:
    """Truncate ``text`` to at most ``max_len`` characters with an ellipsis.

    Text that already fits is returned unchanged. Otherwise the text is cut
    to ``max_len - 3`` characters, trailing whitespace is stripped, and
    ``"..."`` is appended.

    When ``max_len`` leaves no room for any text beside the ellipsis
    (``max_len <= 3``), a bare ``"..."`` is returned; for ``max_len < 3``
    that ellipsis is itself longer than the requested budget.

    Args:
        text: The text to shorten.
        max_len: Maximum desired length of the result.

    Returns:
        The original or shortened text.
    """
    if len(text) <= max_len:
        return text
    # Guard every budget that cannot hold text plus "...". Without this,
    # max_len == 2 would slice with a negative end index (text[:-1]) and
    # return almost the entire input instead of truncating it.
    if max_len <= 3:
        return "..."
    return text[: max_len - 3].rstrip() + "..."
|
|
|
|
|
|
def positive_int(value: str) -> int:
    """argparse ``type=`` converter accepting only integers greater than zero.

    Raises:
        argparse.ArgumentTypeError: If ``value`` is not an integer literal
            or is zero or negative.
    """
    message = "must be a positive integer"
    try:
        number = int(value)
    except ValueError as exc:
        raise argparse.ArgumentTypeError(message) from exc
    if number > 0:
        return number
    raise argparse.ArgumentTypeError(message)
|
|
|
|
|
|
def write_output(report: str, path: str | None) -> None:
    """Emit the report to ``path`` if given, otherwise to stdout.

    When writing to a file, matches of ``ANSI_RE`` are removed first and the
    text is written as UTF-8. Output to stdout is written unchanged.
    """
    if not path:
        sys.stdout.write(report)
        return
    plain = ANSI_RE.sub("", report)
    Path(path).write_text(plain, encoding="utf-8")
|
|
|
|
|
|
def build_parser() -> argparse.ArgumentParser:
    """Build the command-line parser for the audit helper.

    Exactly one of ``--input`` or ``--repo`` must be supplied. All other
    options are optional and carry the defaults declared below.
    """
    cli = argparse.ArgumentParser(description="Read-only audit of open PR file overlap and blocker risk.")

    # The PR data comes from exactly one place: a JSON file or a live repo.
    origin = cli.add_mutually_exclusive_group(required=True)
    origin.add_argument(
        "--input",
        help="Path to JSON from gh pr list --json ... or REST-ish PR payloads",
    )
    origin.add_argument(
        "--repo",
        help="GitHub repository in owner/name form; uses read-only gh commands",
    )

    cli.add_argument("--output", help="Write report to this path instead of stdout")
    cli.add_argument(
        "--limit",
        type=positive_int,
        default=1000,
        help="Live mode: max open PRs to fetch/analyze",
    )
    cli.add_argument(
        "--top",
        type=positive_int,
        default=15,
        help="Rows to show in ranked sections",
    )
    cli.add_argument(
        "--color",
        choices=["auto", "always", "never"],
        default="auto",
        help="Terminal color mode",
    )
    # Shares dest with --color so either spelling sets args.color.
    cli.add_argument(
        "--no-color",
        action="store_const",
        const="never",
        dest="color",
        help="Alias for --color never",
    )
    cli.add_argument(
        "--format",
        choices=["markdown", "terminal", "json"],
        default="markdown",
        help="Output format",
    )
    cli.add_argument(
        "--no-fetch-files",
        action="store_true",
        help="Skip per-PR changed-file API calls in live mode",
    )
    cli.add_argument(
        "--progress",
        choices=["auto", "always", "never"],
        default="auto",
        help="Live file-fetch progress mode",
    )
    cli.add_argument(
        "--quiet",
        action="store_true",
        help="Suppress progress and non-fatal warning output",
    )
    return cli
|
|
|
|
|
|
def main(argv: list[str] | None = None) -> int:
    """Run the audit CLI and return a process exit code.

    Loads PR data from ``--input`` or fetches it live for ``--repo``,
    normalizes it, optionally warns on stderr about PRs with missing file
    metadata, renders the report in the requested format, and writes it to
    ``--output`` or stdout.

    Args:
        argv: Argument list to parse; ``None`` lets argparse use
            ``sys.argv``.

    Returns:
        0 on success, 1 when a ``RuntimeError``, ``ValueError`` or
        ``OSError`` is raised while loading, rendering or writing.
    """
    parser = build_parser()
    args = parser.parse_args(argv)
    try:
        if args.input:
            payload = load_json_file(Path(args.input))
        else:
            progress = ProgressReporter(should_show_progress(args))
            payload = fetch_live_prs(
                args.repo,
                fetch_files=not args.no_fetch_files,
                progress=progress,
                limit=args.limit,
            )
        prs = normalize_prs(payload)
        missing_files = missing_file_metadata_count(prs)
        # Only warn when file metadata was actually requested from a live
        # repo and the user has not asked for quiet output.
        if args.repo and not args.no_fetch_files and not args.quiet and missing_files:
            sys.stderr.write(f"{missing_metadata_warning(missing_files)}\n")
        if args.format == "terminal":
            report = render_terminal(prs, top=args.top, use_color=should_use_color(args))
        elif args.format == "json":
            report = render_json(prs, top=args.top)
        else:
            report = render_markdown(prs, top=args.top)
        write_output(report, args.output)
    except (RuntimeError, ValueError, OSError) as exc:
        # OSError covers filesystem failures such as an unwritable --output
        # path, so they produce the same one-line error as other failures
        # instead of a traceback.
        sys.stderr.write(f"error: {exc}\n")
        return 1
    return 0
|
|
|
|
|
|
# Script entry point: exit the process with the code returned by main().
if __name__ == "__main__":
    sys.exit(main())
|