tools: add read-only PR blocker audit helper

Adds a standalone read-only PR blocker audit helper with Markdown, terminal, and JSON output plus focused tests and documentation.
This commit is contained in:
Alexandre Teixeira
2026-06-04 12:51:48 +01:00
committed by GitHub
parent f6a5f6592f
commit f2b11ba94e
3 changed files with 2203 additions and 0 deletions

View File

@@ -0,0 +1,964 @@
import importlib.util
import json
import pytest
import re
import sys
from datetime import datetime, timezone
from pathlib import Path
ROOT = Path(__file__).resolve().parents[1]
SCRIPT_PATH = ROOT / "scripts" / "pr_blocker_audit.py"
def load_module():
spec = importlib.util.spec_from_file_location("pr_blocker_audit", SCRIPT_PATH)
module = importlib.util.module_from_spec(spec)
sys.modules[spec.name] = module
spec.loader.exec_module(module)
return module
def test_parses_graphql_style_pr_json():
audit = load_module()
prs = audit.normalize_prs(
[
{
"number": 7,
"title": "Fix auth token rotation",
"author": {"login": "alice"},
"url": "https://example.test/pr/7",
"mergeStateStatus": "CLEAN",
"reviewDecision": "REVIEW_REQUIRED",
"updatedAt": "2026-05-30T12:00:00Z",
"files": [{"path": "core/auth/tokens.py"}],
}
]
)
assert prs[0].number == 7
assert prs[0].author == "alice"
assert prs[0].url.endswith("/7")
assert prs[0].files == ("core/auth/tokens.py",)
assert "Auth / users / API tokens" in prs[0].areas
def test_parses_rest_style_pr_json():
audit = load_module()
prs = audit.normalize_prs(
[
{
"number": 8,
"title": "Improve uploads",
"user": {"login": "bob"},
"html_url": "https://example.test/pr/8",
"mergeable_state": "dirty",
"files": [{"filename": "app/documents/upload.py"}],
}
]
)
assert prs[0].author == "bob"
assert prs[0].url.endswith("/8")
assert prs[0].merge_state == "dirty"
assert prs[0].files == ("app/documents/upload.py",)
def test_parses_file_lists_as_dicts_and_strings():
audit = load_module()
prs = audit.normalize_prs(
[
{
"number": 1,
"title": "Memory update",
"files": ["core/memory.py", {"path": "tests/test_memory.py"}, {"filename": "docs/memory.md"}],
}
]
)
assert prs[0].files == ("core/memory.py", "docs/memory.md", "tests/test_memory.py")
def test_missing_files_is_handled():
audit = load_module()
prs = audit.normalize_prs([{"number": 2, "title": "No file metadata"}])
assert prs[0].files == ()
assert prs[0].author == "unknown"
def test_fetch_live_prs_fills_missing_files(monkeypatch):
audit = load_module()
calls = []
def fake_run(cmd):
calls.append(cmd)
if cmd[:3] == ["gh", "pr", "list"]:
return [
{"number": 1, "title": "Has files", "files": [{"path": "core/auth.py"}]},
{"number": 2, "title": "Needs files", "files": []},
]
return [{"filename": "core/search.py"}, {"filename": "tests/test_search.py"}]
monkeypatch.setattr(audit, "_run_gh_json", fake_run)
payload = audit.fetch_live_prs("owner/repo")
prs = audit.normalize_prs(payload)
assert [pr.files for pr in prs] == [("core/auth.py",), ("core/search.py", "tests/test_search.py")]
assert calls[-1] == ["gh", "api", "--paginate", "repos/owner/repo/pulls/2/files?per_page=100"]
def test_fetch_live_prs_keeps_missing_files_when_per_pr_fetch_fails(monkeypatch):
audit = load_module()
def fake_run(cmd):
if cmd[:3] == ["gh", "pr", "list"]:
return [{"number": 3, "title": "Needs files", "files": []}]
raise RuntimeError("rate limit")
monkeypatch.setattr(audit, "_run_gh_json", fake_run)
payload = audit.fetch_live_prs("owner/repo")
prs = audit.normalize_prs(payload)
assert prs[0].files == ()
assert "PR #3: could not fetch changed files: rate limit" in payload["warnings"]
def test_fetch_live_prs_no_fetch_files_skips_per_pr_calls(monkeypatch):
audit = load_module()
calls = []
def fake_run(cmd):
calls.append(cmd)
return [{"number": 4, "title": "Metadata only", "files": []}]
monkeypatch.setattr(audit, "_run_gh_json", fake_run)
payload = audit.fetch_live_prs("owner/repo", fetch_files=False)
assert payload == [{"number": 4, "title": "Metadata only", "files": []}]
assert len(calls) == 1
def test_fetch_live_prs_passes_limit_to_gh_pr_list(monkeypatch):
audit = load_module()
calls = []
def fake_run(cmd):
calls.append(cmd)
return []
monkeypatch.setattr(audit, "_run_gh_json", fake_run)
audit.fetch_live_prs("owner/repo", fetch_files=True, limit=50)
assert calls[0] == [
"gh",
"pr",
"list",
"--repo",
"owner/repo",
"--state",
"open",
"--limit",
"50",
"--json",
"number,title,author,files,mergeStateStatus,reviewDecision,updatedAt,url",
]
def test_no_fetch_files_omits_files_from_gh_pr_list(monkeypatch):
audit = load_module()
calls = []
def fake_run(cmd):
calls.append(cmd)
return []
monkeypatch.setattr(audit, "_run_gh_json", fake_run)
audit.fetch_live_prs("owner/repo", fetch_files=False, limit=50)
assert calls[0][-1] == "number,title,author,mergeStateStatus,reviewDecision,updatedAt,url"
def test_fetch_live_prs_caps_rest_fallback_by_limit(monkeypatch):
audit = load_module()
def fake_run(cmd):
if cmd[:3] == ["gh", "pr", "list"]:
raise RuntimeError("graphql unavailable")
return [
{"number": 1, "title": "A", "files": []},
{"number": 2, "title": "B", "files": []},
{"number": 3, "title": "C", "files": []},
]
monkeypatch.setattr(audit, "_run_gh_json", fake_run)
payload = audit.fetch_live_prs("owner/repo", fetch_files=False, limit=2)
assert [item["number"] for item in payload] == [1, 2]
def test_offline_input_ignores_limit(tmp_path, capsys):
audit = load_module()
path = tmp_path / "prs.json"
path.write_text(
json.dumps(
[
{"number": 1, "title": "A", "files": []},
{"number": 2, "title": "B", "files": []},
]
),
encoding="utf-8",
)
exit_code = audit.main(["--input", str(path), "--limit", "1"])
output = capsys.readouterr().out
assert exit_code == 0
assert "Total PRs analyzed: 2" in output
def test_invalid_limit_exits_cleanly(capsys):
audit = load_module()
with pytest.raises(SystemExit) as exc:
audit.main(["--repo", "owner/repo", "--limit", "0"])
assert exc.value.code == 2
assert "must be a positive integer" in capsys.readouterr().err
def test_help_includes_limit():
audit = load_module()
help_text = audit.build_parser().format_help()
assert "--limit LIMIT" in help_text
assert "Live mode: max open PRs to fetch/analyze" in help_text
def test_progress_goes_to_stderr_not_stdout(monkeypatch, capsys):
audit = load_module()
def fake_run(cmd):
if cmd[:3] == ["gh", "pr", "list"]:
return [{"number": 5, "title": "Needs files", "files": []}]
return [{"filename": "core/search.py"}]
monkeypatch.setattr(audit, "_run_gh_json", fake_run)
exit_code = audit.main(["--repo", "owner/repo", "--format", "terminal", "--progress", "always"])
captured = capsys.readouterr()
assert exit_code == 0
assert "PR Blocker Audit" in captured.out
assert "Fetching open PR list..." not in captured.out
assert "Fetching open PR list..." in captured.err
assert "Fetching changed files:" in captured.err
def test_progress_not_shown_for_offline_input(tmp_path, capsys):
audit = load_module()
path = tmp_path / "prs.json"
path.write_text(json.dumps([{"number": 6, "title": "Offline", "files": []}]), encoding="utf-8")
exit_code = audit.main(["--input", str(path), "--progress", "always"])
captured = capsys.readouterr()
assert exit_code == 0
assert "PR Blocker Audit" in captured.out
assert "Fetching open PR list..." not in captured.err
def test_progress_auto_hidden_when_stderr_is_not_tty(monkeypatch, capsys):
audit = load_module()
def fake_run(cmd):
if cmd[:3] == ["gh", "pr", "list"]:
return [{"number": 7, "title": "Needs files", "files": []}]
return [{"filename": "core/search.py"}]
monkeypatch.setattr(audit, "_run_gh_json", fake_run)
monkeypatch.setattr(audit.sys.stderr, "isatty", lambda: False)
exit_code = audit.main(["--repo", "owner/repo", "--progress", "auto"])
captured = capsys.readouterr()
assert exit_code == 0
assert "Fetching open PR list..." not in captured.err
def test_progress_always_shown_when_stderr_is_not_tty(monkeypatch, capsys):
audit = load_module()
def fake_run(cmd):
if cmd[:3] == ["gh", "pr", "list"]:
return [{"number": 8, "title": "Needs files", "files": []}]
return [{"filename": "core/search.py"}]
monkeypatch.setattr(audit, "_run_gh_json", fake_run)
monkeypatch.setattr(audit.sys.stderr, "isatty", lambda: False)
exit_code = audit.main(["--repo", "owner/repo", "--progress", "always"])
captured = capsys.readouterr()
assert exit_code == 0
assert "Fetching open PR list..." in captured.err
def test_quiet_suppresses_progress_and_warning(monkeypatch, capsys):
audit = load_module()
def fake_run(cmd):
if cmd[:3] == ["gh", "pr", "list"]:
return [{"number": 9, "title": "Needs files", "files": []}]
raise RuntimeError("rate limit")
monkeypatch.setattr(audit, "_run_gh_json", fake_run)
exit_code = audit.main(["--repo", "owner/repo", "--progress", "always", "--quiet"])
captured = capsys.readouterr()
assert exit_code == 0
assert "PRs missing changed-file metadata: 1" in captured.out
assert captured.err == ""
def test_report_output_remains_clean_with_progress(monkeypatch, capsys):
audit = load_module()
def fake_run(cmd):
if cmd[:3] == ["gh", "pr", "list"]:
return [{"number": 10, "title": "Needs files", "files": []}]
return [{"filename": "core/search.py"}]
monkeypatch.setattr(audit, "_run_gh_json", fake_run)
exit_code = audit.main(["--repo", "owner/repo", "--format", "terminal", "--progress", "always"])
captured = capsys.readouterr()
assert exit_code == 0
assert "Fetching changed files:" not in captured.out
assert "Fetched changed files" not in captured.out
assert "core/search.py" in captured.out
def test_markdown_output_file_has_no_progress_or_ansi(monkeypatch, tmp_path, capsys):
audit = load_module()
output_path = tmp_path / "report.md"
def fake_run(cmd):
if cmd[:3] == ["gh", "pr", "list"]:
return [{"number": 11, "title": "Needs files", "files": []}]
return [{"filename": "core/search.py"}]
monkeypatch.setattr(audit, "_run_gh_json", fake_run)
exit_code = audit.main(["--repo", "owner/repo", "--output", str(output_path), "--progress", "always"])
captured = capsys.readouterr()
report = output_path.read_text(encoding="utf-8")
assert exit_code == 0
assert captured.out == ""
assert "Fetching changed files:" in captured.err
assert "Fetching changed files:" not in report
assert not re.search(r"\x1b\[[0-9;]*m", report)
def test_no_fetch_files_skips_progress(monkeypatch, capsys):
audit = load_module()
calls = []
def fake_run(cmd):
calls.append(cmd)
return [{"number": 12, "title": "Metadata only", "files": []}]
monkeypatch.setattr(audit, "_run_gh_json", fake_run)
exit_code = audit.main(["--repo", "owner/repo", "--no-fetch-files", "--progress", "always"])
captured = capsys.readouterr()
assert exit_code == 0
assert len(calls) == 1
assert "Fetching changed files" not in captured.err
def test_area_classification():
audit = load_module()
areas = audit.classify_areas(["scripts/odysseus-mail", "tests/test_email.py"], "CalDAV sync")
assert "Email / CalDAV" in areas
assert "Docs / tooling / tests" in areas
def test_runtime_plus_test_file_is_not_docs_tooling():
audit = load_module()
areas = audit.classify_areas(["routes/memory_routes.py", "tests/test_memory_routes.py"], "Fix memory route")
assert "Memory / RAG / vector store" in areas
assert "Docs / tooling / tests" not in areas
def test_docs_only_pr_is_docs_tooling():
audit = load_module()
areas = audit.classify_areas(["docs/pr-blocker-audit.md"], "Update docs")
assert "Docs / tooling / tests" in areas
def test_script_tooling_only_pr_is_docs_tooling():
audit = load_module()
areas = audit.classify_areas(["scripts/pr_blocker_audit.py"], "Tooling script update")
assert "Docs / tooling / tests" in areas
def test_readme_only_pr_is_docs_tooling():
audit = load_module()
areas = audit.classify_areas(["README.md"], "README update")
assert "Docs / tooling / tests" in areas
def test_memory_owner_scope_leak_is_not_classified_as_auth():
audit = load_module()
areas = audit.classify_areas(
["routes/memory_routes.py", "services/memory/store.py"],
"fix: memory route leaks another user's session",
)
assert "Memory / RAG / vector store" in areas
assert "Auth / users / API tokens" not in areas
def test_bearer_token_auth_path_is_classified_as_auth():
audit = load_module()
areas = audit.classify_areas(
["core/auth.py", "routes/auth_routes.py"],
"fix: deleted users keep API access through bearer tokens",
)
assert "Auth / users / API tokens" in areas
def test_generic_security_file_is_not_classified_as_auth():
audit = load_module()
areas = audit.classify_areas(
["tests/test_email_linkify_security_js.py"],
"Harden email HTML URL sanitization",
)
assert "Email / CalDAV" in areas
assert "Auth / users / API tokens" not in areas
def test_hot_file_overlap_detection():
audit = load_module()
prs = audit.normalize_prs(
[
{"number": 1, "title": "A", "files": ["core/search.py"]},
{"number": 2, "title": "B", "files": ["core/search.py", "tests/test_search.py"]},
{"number": 3, "title": "C", "files": ["core/other.py"]},
]
)
assert audit.hot_files(prs) == [("core/search.py", [1, 2])]
def test_possible_duplicate_grouping():
audit = load_module()
prs = audit.normalize_prs(
[
{"number": 1, "title": "Fix auth token refresh", "files": ["core/auth.py", "tests/test_auth.py"]},
{"number": 2, "title": "Repair auth token refresh", "files": ["core/auth.py", "tests/test_auth.py"]},
{"number": 3, "title": "Improve gallery preview", "files": ["core/gallery.py"]},
]
)
groups = audit.duplicate_candidates(prs)
assert [[pr.number for pr in group] for group in groups] == [[1, 2]]
def test_score_ranking_is_deterministic():
audit = load_module()
prs = audit.normalize_prs(
[
{
"number": 2,
"title": "Gallery polish",
"reviewDecision": "APPROVED",
"updatedAt": "2026-05-20T00:00:00Z",
"files": ["core/gallery.py"],
},
{
"number": 1,
"title": "Fix auth token owner permission",
"mergeStateStatus": "DIRTY",
"reviewDecision": "REVIEW_REQUIRED",
"updatedAt": "2026-06-01T00:00:00Z",
"files": ["core/auth.py", "tests/test_auth.py"],
},
]
)
scored = audit.score_prs(prs, now=datetime(2026, 6, 3, tzinfo=timezone.utc))
assert [item.pr.number for item in scored] == [1, 2]
assert scored[0].score > scored[1].score
def test_direct_bearer_token_issue_ranks_above_dirty_memory_leak():
audit = load_module()
prs = audit.normalize_prs(
[
{
"number": 1,
"title": "fix: deleted users keep API access through bearer tokens",
"mergeStateStatus": "CLEAN",
"files": ["core/auth.py", "routes/auth_routes.py"],
},
{
"number": 2,
"title": "fix: memory route leaks another user's session",
"mergeStateStatus": "DIRTY",
"files": ["routes/memory_routes.py", "services/memory/store.py"],
},
]
)
scored = audit.score_prs(prs, now=datetime(2026, 6, 3, tzinfo=timezone.utc))
assert [item.pr.number for item in scored] == [1, 2]
assert scored[0].score > scored[1].score
def test_dirty_state_is_caution_text_not_priority_boost():
audit = load_module()
dirty_memory = audit.normalize_prs(
[
{
"number": 2,
"title": "fix: memory route leaks another user's session",
"mergeStateStatus": "DIRTY",
"files": ["routes/memory_routes.py", "services/memory/store.py"],
}
]
)[0]
clean_auth = audit.normalize_prs(
[
{
"number": 1,
"title": "fix: deleted users keep API access through bearer tokens",
"mergeStateStatus": "CLEAN",
"files": ["core/auth.py", "routes/auth_routes.py"],
}
]
)[0]
dirty_score = audit.score_pr(dirty_memory, audit.Counter(), datetime(2026, 6, 3, tzinfo=timezone.utc))
clean_auth_score = audit.score_pr(clean_auth, audit.Counter(), datetime(2026, 6, 3, tzinfo=timezone.utc))
assert dirty_score.score < clean_auth_score.score
assert any("caution: merge state DIRTY" == reason for reason in dirty_score.reasons)
def test_markdown_contains_expected_sections_and_no_ansi():
audit = load_module()
prs = audit.normalize_prs([{"number": 1, "title": "Fix search", "files": ["core/search.py"]}])
report = audit.render_markdown(prs)
assert "# PR Blocker Audit" in report
assert "## Executive summary" in report
assert "## Locked code areas" in report
assert "## Hot files" in report
assert "## Review / blocker priorities" in report
assert "## Duplicate candidates" in report
assert "## Safer areas for new work" in report
assert not re.search(r"\x1b\[[0-9;]*m", report)
def test_report_includes_missing_file_metadata_count():
audit = load_module()
prs = audit.normalize_prs(
[
{"number": 1, "title": "Fix search", "files": ["core/search.py"]},
{"number": 2, "title": "No files"},
]
)
markdown = audit.render_markdown(prs)
terminal = audit.render_terminal(prs, use_color=False)
assert "- PRs missing changed-file metadata: 1" in markdown
assert "PRs missing changed-file metadata: 1" in terminal
def test_overlap_summary_uses_hot_files_not_huge_clusters():
audit = load_module()
prs = audit.normalize_prs(
[{"number": number, "title": f"PR {number}", "files": ["common.py"]} for number in range(1, 25)]
)
report = audit.render_terminal(prs, use_color=False)
assert "Main overlap drivers: common.py (24 PRs)" in report
assert "Largest overlap clusters" not in report
assert "24 PRs (#1, #2" not in report
def test_long_pr_number_lists_are_truncated():
audit = load_module()
assert audit._format_pr_numbers(range(1, 16), limit=4) == "#1, #2, #3, #4, ... (+11 more)"
def test_other_locked_area_sorts_after_classified_critical_area():
audit = load_module()
payload = [
{"number": 1, "title": "Fix auth token", "files": ["core/auth.py"]},
{"number": 2, "title": "Fix auth login", "files": ["routes/auth.py"]},
{"number": 3, "title": "Fix auth permission", "files": ["tests/test_auth.py"]},
{"number": 4, "title": "Fix auth security", "files": ["docs/auth.md"]},
]
payload.extend({"number": number, "title": f"Unclassified {number}"} for number in range(5, 25))
prs = audit.normalize_prs(payload)
locked = audit.locked_areas(prs, audit.score_prs(prs))
assert locked[0]["area"] == "Auth / users / API tokens"
assert locked[-1]["area"] == "Other / unclassified"
assert locked[-1]["why"] == "20 PRs, mostly missing changed-file metadata"
def test_terminal_render_color_modes():
audit = load_module()
prs = audit.normalize_prs(
[
{"number": 1, "title": "Fix search", "mergeStateStatus": "CLEAN", "files": ["core/search.py"]},
{"number": 2, "title": "Search follow-up", "mergeStateStatus": "DIRTY", "files": ["core/search.py"]},
]
)
colored = audit.render_terminal(prs, use_color=True)
plain = audit.render_terminal(prs, use_color=False)
assert "Hot files" in plain
assert "core/search.py" in plain
assert "Review / blocker priorities" in plain
assert "Heuristic score only; inspect these first, do not merge without validation." in plain
assert re.search(r"\x1b\[[0-9;]*m", colored)
assert not re.search(r"\x1b\[[0-9;]*m", plain)
def test_terminal_hot_files_respects_top():
audit = load_module()
prs = audit.normalize_prs(
[
{"number": 1, "title": "A", "files": ["a.py", "b.py"]},
{"number": 2, "title": "B", "files": ["a.py", "b.py"]},
{"number": 3, "title": "C", "files": ["b.py"]},
]
)
report = audit.render_terminal(prs, top=1, use_color=False)
assert "Hot files" in report
assert "- b.py" in report
assert "- a.py" not in report
def test_terminal_truncates_long_title_but_markdown_keeps_it():
audit = load_module()
long_title = "Fix search " + "very-long-detail " * 12
prs = audit.normalize_prs([{"number": 1, "title": long_title, "files": ["core/search.py"]}])
terminal = audit.render_terminal(prs, use_color=False)
markdown = audit.render_markdown(prs)
short_title = audit.shorten_text(long_title)
assert short_title in terminal
assert long_title not in terminal
assert long_title in markdown
def test_cli_terminal_color_always_outputs_ansi(tmp_path, capsys):
audit = load_module()
path = tmp_path / "prs.json"
path.write_text(json.dumps([{"number": 1, "title": "Fix search", "files": ["core/search.py"]}]), encoding="utf-8")
exit_code = audit.main(["--format", "terminal", "--color", "always", "--input", str(path)])
output = capsys.readouterr().out
assert exit_code == 0
assert re.search(r"\x1b\[[0-9;]*m", output)
def test_cli_terminal_no_color_outputs_no_ansi(tmp_path, capsys):
audit = load_module()
path = tmp_path / "prs.json"
path.write_text(json.dumps([{"number": 1, "title": "Fix search", "files": ["core/search.py"]}]), encoding="utf-8")
exit_code = audit.main(["--format", "terminal", "--no-color", "--input", str(path)])
output = capsys.readouterr().out
assert exit_code == 0
assert not re.search(r"\x1b\[[0-9;]*m", output)
def test_color_auto_requires_terminal_and_support(monkeypatch):
audit = load_module()
args = audit.argparse.Namespace(format="terminal", color="auto", output=None)
monkeypatch.setattr(audit.sys.stdout, "isatty", lambda: True)
monkeypatch.delenv("NO_COLOR", raising=False)
monkeypatch.setitem(audit.os.environ, "TERM", "xterm-256color")
assert audit.should_use_color(args)
monkeypatch.setitem(audit.os.environ, "NO_COLOR", "1")
assert not audit.should_use_color(args)
def test_color_output_file_and_markdown_disable_ansi(monkeypatch):
audit = load_module()
monkeypatch.setattr(audit.sys.stdout, "isatty", lambda: True)
monkeypatch.setitem(audit.os.environ, "TERM", "xterm-256color")
output_args = audit.argparse.Namespace(format="terminal", color="auto", output="report.txt")
markdown_args = audit.argparse.Namespace(format="markdown", color="always", output=None)
assert not audit.should_use_color(output_args)
assert not audit.should_use_color(markdown_args)
def test_invalid_json_handled_cleanly(tmp_path):
audit = load_module()
path = tmp_path / "bad.json"
path.write_text("{bad json", encoding="utf-8")
exit_code = audit.main(["--input", str(path)])
assert exit_code == 1
def test_empty_input_handled_cleanly(tmp_path, capsys):
audit = load_module()
path = tmp_path / "prs.json"
path.write_text(json.dumps([]), encoding="utf-8")
exit_code = audit.main(["--input", str(path)])
output = capsys.readouterr().out
assert exit_code == 0
assert "Total PRs analyzed: 0" in output
assert "No PRs to rank." in output
# --- JSON format tests ---
JSON_PRS = [
{
"number": 1,
"title": "Fix auth token rotation",
"author": {"login": "alice"},
"url": "https://example.test/pr/1",
"mergeStateStatus": "CLEAN",
"reviewDecision": "REVIEW_REQUIRED",
"updatedAt": "2026-05-30T12:00:00Z",
"files": [{"path": "core/auth.py"}, {"path": "tests/test_auth.py"}],
},
{
"number": 2,
"title": "Fix auth login flow",
"author": {"login": "bob"},
"url": "https://example.test/pr/2",
"mergeStateStatus": "DIRTY",
"reviewDecision": "CHANGES_REQUESTED",
"updatedAt": "2026-05-28T10:00:00Z",
"files": [{"path": "core/auth.py"}, {"path": "routes/auth_routes.py"}],
},
]
def test_json_output_parses_with_json_loads(tmp_path, capsys):
audit = load_module()
path = tmp_path / "prs.json"
path.write_text(json.dumps(JSON_PRS), encoding="utf-8")
exit_code = audit.main(["--input", str(path), "--format", "json"])
output = capsys.readouterr().out
assert exit_code == 0
parsed = json.loads(output)
assert isinstance(parsed, dict)
def test_json_output_includes_expected_top_level_keys(tmp_path, capsys):
audit = load_module()
path = tmp_path / "prs.json"
path.write_text(json.dumps(JSON_PRS), encoding="utf-8")
exit_code = audit.main(["--input", str(path), "--format", "json"])
output = capsys.readouterr().out
assert exit_code == 0
parsed = json.loads(output)
assert set(parsed.keys()) == {
"summary",
"locked_areas",
"hot_files",
"review_priorities",
"duplicate_candidates",
"safer_areas",
}
def test_json_summary_fields(tmp_path, capsys):
audit = load_module()
path = tmp_path / "prs.json"
path.write_text(json.dumps(JSON_PRS), encoding="utf-8")
exit_code = audit.main(["--input", str(path), "--format", "json"])
output = capsys.readouterr().out
assert exit_code == 0
summary = json.loads(output)["summary"]
assert summary["total_prs_analyzed"] == 2
assert "unique_files_touched" in summary
assert "prs_missing_changed_file_metadata" in summary
assert "main_overlap_drivers" in summary
assert "highest_risk_areas" in summary
assert "recommended_first_review_target" in summary
def test_json_review_priorities_structure(tmp_path, capsys):
audit = load_module()
path = tmp_path / "prs.json"
path.write_text(json.dumps(JSON_PRS), encoding="utf-8")
exit_code = audit.main(["--input", str(path), "--format", "json"])
output = capsys.readouterr().out
assert exit_code == 0
priorities = json.loads(output)["review_priorities"]
assert len(priorities) >= 1
first = priorities[0]
assert set(first.keys()) >= {"rank", "number", "score", "title", "url", "merge_state", "review_decision", "reasons"}
assert first["rank"] == 1
assert isinstance(first["reasons"], list)
def test_json_hot_files_structure(tmp_path, capsys):
audit = load_module()
path = tmp_path / "prs.json"
path.write_text(json.dumps(JSON_PRS), encoding="utf-8")
exit_code = audit.main(["--input", str(path), "--format", "json"])
output = capsys.readouterr().out
assert exit_code == 0
hot = json.loads(output)["hot_files"]
assert len(hot) >= 1
assert hot[0]["file"] == "core/auth.py"
assert hot[0]["pr_count"] == 2
assert set(hot[0]["pr_numbers"]) == {1, 2}
def test_json_output_file_excludes_progress_and_ansi_in_live_output_file(monkeypatch, tmp_path, capsys):
audit = load_module()
output_path = tmp_path / "report.json"
def fake_run(cmd):
if cmd[:3] == ["gh", "pr", "list"]:
return JSON_PRS
return []
monkeypatch.setattr(audit, "_run_gh_json", fake_run)
exit_code = audit.main(
["--repo", "owner/repo", "--format", "json", "--output", str(output_path), "--progress", "always"]
)
captured = capsys.readouterr()
report = output_path.read_text(encoding="utf-8")
assert exit_code == 0
assert captured.out == ""
assert "Fetching open PR list..." in captured.err or "Fetching changed files" in captured.err
parsed = json.loads(report)
assert set(parsed.keys()) == {
"summary",
"locked_areas",
"hot_files",
"review_priorities",
"duplicate_candidates",
"safer_areas",
}
assert not re.search(r"\x1b\[[0-9;]*m", report)
assert "Fetching" not in report
def test_json_format_with_color_always_emits_no_ansi(tmp_path, capsys):
audit = load_module()
path = tmp_path / "prs.json"
path.write_text(json.dumps(JSON_PRS), encoding="utf-8")
exit_code = audit.main(["--input", str(path), "--format", "json", "--color", "always"])
output = capsys.readouterr().out
assert exit_code == 0
assert not re.search(r"\x1b\[[0-9;]*m", output)
parsed = json.loads(output)
assert isinstance(parsed, dict)
def test_json_output_is_deterministic(tmp_path):
audit = load_module()
path = tmp_path / "prs.json"
path.write_text(json.dumps(JSON_PRS), encoding="utf-8")
prs = audit.normalize_prs(JSON_PRS)
first = audit.render_json(prs)
second = audit.render_json(prs)
assert first == second
parsed = json.loads(first)
assert isinstance(parsed, dict)
def test_json_empty_input_handled_cleanly(tmp_path, capsys):
audit = load_module()
path = tmp_path / "prs.json"
path.write_text(json.dumps([]), encoding="utf-8")
exit_code = audit.main(["--input", str(path), "--format", "json"])
output = capsys.readouterr().out
assert exit_code == 0
parsed = json.loads(output)
assert parsed["summary"]["total_prs_analyzed"] == 0
assert parsed["hot_files"] == []
assert parsed["review_priorities"] == []
def test_help_includes_json_format_choice():
audit = load_module()
help_text = audit.build_parser().format_help()
assert "markdown" in help_text
assert "terminal" in help_text
assert "json" in help_text