Rescan tech review reports when a new version of Delphi is ran (#5433)
* Delphi rescan when version changes * Fix inserting duplicate reports when rescanning * upsert report issue details instead of deleting * fix up rescan stuff
This commit is contained in:
@@ -182,6 +182,27 @@ pub struct DelphiReportIssueResult {
|
||||
}
|
||||
|
||||
impl DBDelphiReportIssue {
|
||||
pub async fn upsert(
|
||||
&self,
|
||||
transaction: &mut PgTransaction<'_>,
|
||||
) -> Result<DelphiReportIssueId, DatabaseError> {
|
||||
Ok(DelphiReportIssueId(
|
||||
sqlx::query_scalar!(
|
||||
"
|
||||
INSERT INTO delphi_report_issues (report_id, issue_type)
|
||||
VALUES ($1, $2)
|
||||
ON CONFLICT (report_id, issue_type) DO UPDATE SET
|
||||
issue_type = EXCLUDED.issue_type
|
||||
RETURNING id
|
||||
",
|
||||
self.report_id as DelphiReportId,
|
||||
self.issue_type,
|
||||
)
|
||||
.fetch_one(&mut *transaction)
|
||||
.await?,
|
||||
))
|
||||
}
|
||||
|
||||
pub async fn insert(
|
||||
&self,
|
||||
transaction: &mut PgTransaction<'_>,
|
||||
|
||||
@@ -19,6 +19,7 @@ use crate::database::{PgPool, ReadOnlyPgPool};
|
||||
use crate::env::ENV;
|
||||
use crate::queue::billing::{index_billing, index_subscriptions};
|
||||
use crate::queue::moderation::AutomatedModerationQueue;
|
||||
use crate::routes::internal::delphi::rescan::rescan_projects_in_queue;
|
||||
use crate::util::anrok;
|
||||
use crate::util::archon::ArchonClient;
|
||||
use crate::util::ratelimit::{AsyncRateLimiter, GCRAParameters};
|
||||
@@ -102,6 +103,15 @@ pub fn app_setup(
|
||||
|
||||
let scheduler = scheduler::Scheduler::new();
|
||||
|
||||
{
|
||||
let pool_ref = pool.clone();
|
||||
actix_rt::spawn(async move {
|
||||
if let Err(err) = rescan_projects_in_queue(&pool_ref).await {
|
||||
warn!("Delphi rescan failed: {err:#}");
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
let limiter = web::Data::new(AsyncRateLimiter::new(
|
||||
redis_pool.clone(),
|
||||
GCRAParameters::new(300, 300),
|
||||
|
||||
@@ -34,6 +34,8 @@ use crate::{
|
||||
util::{error::Context, guards::admin_key_guard},
|
||||
};
|
||||
|
||||
pub mod rescan;
|
||||
|
||||
pub fn config(cfg: &mut web::ServiceConfig) {
|
||||
cfg.service(
|
||||
web::scope("delphi")
|
||||
@@ -197,7 +199,10 @@ async fn ingest_report_deserialized(
|
||||
|
||||
report.send_to_slack(&pool, &redis).await.ok();
|
||||
|
||||
let mut transaction = pool.begin().await?;
|
||||
let mut transaction = pool
|
||||
.begin()
|
||||
.await
|
||||
.wrap_internal_err("failed to begin Delphi ingest transaction")?;
|
||||
|
||||
let report_id = DBDelphiReport {
|
||||
id: DelphiReportId(0), // This will be set by the database
|
||||
@@ -208,7 +213,8 @@ async fn ingest_report_deserialized(
|
||||
severity: report.severity,
|
||||
}
|
||||
.upsert(&mut transaction)
|
||||
.await?;
|
||||
.await
|
||||
.wrap_internal_err("failed to upsert Delphi report")?;
|
||||
|
||||
info!(
|
||||
num_issues = %report.issues.len(),
|
||||
@@ -292,8 +298,9 @@ async fn ingest_report_deserialized(
|
||||
report_id,
|
||||
issue_type: "__dummy".into(),
|
||||
}
|
||||
.insert(&mut transaction)
|
||||
.await?;
|
||||
.upsert(&mut transaction)
|
||||
.await
|
||||
.wrap_internal_err("failed to upsert dummy Delphi report issue")?;
|
||||
|
||||
ReportIssueDetail {
|
||||
id: DelphiReportIssueDetailsId(0), // This will be set by the database
|
||||
@@ -307,7 +314,10 @@ async fn ingest_report_deserialized(
|
||||
status: DelphiStatus::Pending,
|
||||
}
|
||||
.insert(&mut transaction)
|
||||
.await?;
|
||||
.await
|
||||
.wrap_internal_err(
|
||||
"failed to insert dummy Delphi report issue detail",
|
||||
)?;
|
||||
}
|
||||
|
||||
for (issue_type, issue_details) in report.issues {
|
||||
@@ -316,12 +326,14 @@ async fn ingest_report_deserialized(
|
||||
report_id,
|
||||
issue_type,
|
||||
}
|
||||
.insert(&mut transaction)
|
||||
.await?;
|
||||
.upsert(&mut transaction)
|
||||
.await
|
||||
.wrap_internal_err("failed to upsert Delphi report issue")?;
|
||||
|
||||
// This is required to handle the case where the same Delphi version is re-run on the same file
|
||||
ReportIssueDetail::remove_all_by_issue_id(issue_id, &mut transaction)
|
||||
.await?;
|
||||
.await
|
||||
.wrap_internal_err("failed to remove old Delphi issue details")?;
|
||||
|
||||
for issue_detail in issue_details {
|
||||
let decompiled_source =
|
||||
@@ -339,11 +351,15 @@ async fn ingest_report_deserialized(
|
||||
status: DelphiStatus::Pending,
|
||||
}
|
||||
.insert(&mut transaction)
|
||||
.await?;
|
||||
.await
|
||||
.wrap_internal_err("failed to insert Delphi issue detail")?;
|
||||
}
|
||||
}
|
||||
|
||||
transaction.commit().await?;
|
||||
transaction
|
||||
.commit()
|
||||
.await
|
||||
.wrap_internal_err("failed to commit Delphi ingest transaction")?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
144
apps/labrinth/src/routes/internal/delphi/rescan.rs
Normal file
144
apps/labrinth/src/routes/internal/delphi/rescan.rs
Normal file
@@ -0,0 +1,144 @@
|
||||
use eyre::{Result, WrapErr, eyre};
|
||||
use futures::future::try_join_all;
|
||||
use tracing::info;
|
||||
|
||||
use super::{DELPHI_CLIENT, DelphiRunParameters};
|
||||
use crate::{database::PgPool, env::ENV, models::ids::FileId};
|
||||
|
||||
pub async fn rescan_projects_in_queue(pool: &PgPool) -> Result<()> {
|
||||
let delphi_version = fetch_delphi_version().await?;
|
||||
let old_delphi_version = fetch_stored_delphi_version(pool).await?;
|
||||
|
||||
if old_delphi_version == Some(delphi_version) {
|
||||
info!(
|
||||
?delphi_version,
|
||||
"Delphi version unchanged; skipping startup tech review rescan"
|
||||
);
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
info!(
|
||||
?old_delphi_version,
|
||||
?delphi_version,
|
||||
delphi_version,
|
||||
"Delphi version changed; rescanning tech review queue"
|
||||
);
|
||||
|
||||
let project_ids = fetch_unreviewed_tech_review_project_ids(pool).await?;
|
||||
if project_ids.is_empty() {
|
||||
info!("No fully unreviewed tech review projects found to rescan");
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
let file_ids = fetch_project_file_ids(pool, &project_ids).await?;
|
||||
if file_ids.is_empty() {
|
||||
info!(
|
||||
project_count = project_ids.len(),
|
||||
"No files found for tech review projects selected for rescan"
|
||||
);
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
let file_ids = file_ids
|
||||
.into_iter()
|
||||
.map(|file_id| FileId(file_id.cast_unsigned()));
|
||||
|
||||
try_join_all(file_ids.map(|file_id| async move {
|
||||
super::run(pool, DelphiRunParameters { file_id })
|
||||
.await
|
||||
.wrap_err_with(|| {
|
||||
eyre!("failed to submit Delphi rescan for `{file_id:?}`")
|
||||
})
|
||||
}))
|
||||
.await?;
|
||||
|
||||
info!(
|
||||
project_count = project_ids.len(),
|
||||
"Submitted Delphi rescans for all unreviewed tech review project files"
|
||||
);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn fetch_delphi_version() -> Result<i32> {
|
||||
let response = DELPHI_CLIENT
|
||||
.get(format!("{}/version", ENV.DELPHI_URL))
|
||||
.send()
|
||||
.await
|
||||
.and_then(|res| res.error_for_status())
|
||||
.wrap_err("failed to fetch Delphi version")?;
|
||||
|
||||
let version = response
|
||||
.text()
|
||||
.await
|
||||
.wrap_err("failed to read Delphi version response body")?;
|
||||
let version = version.trim().parse::<i32>().wrap_err_with(|| {
|
||||
eyre!("invalid Delphi version response body: {version}")
|
||||
})?;
|
||||
Ok(version)
|
||||
}
|
||||
|
||||
async fn fetch_stored_delphi_version(pool: &PgPool) -> Result<Option<i32>> {
|
||||
let row =
|
||||
sqlx::query_scalar!("SELECT MAX(delphi_version) FROM delphi_reports")
|
||||
.fetch_one(pool)
|
||||
.await
|
||||
.wrap_err("failed to fetch latest stored Delphi version")?;
|
||||
Ok(row)
|
||||
}
|
||||
|
||||
async fn fetch_unreviewed_tech_review_project_ids(
|
||||
pool: &PgPool,
|
||||
) -> Result<Vec<i64>> {
|
||||
sqlx::query_scalar!(
|
||||
r#"
|
||||
SELECT DISTINCT m.id
|
||||
FROM mods m
|
||||
WHERE
|
||||
EXISTS(
|
||||
SELECT 1
|
||||
FROM delphi_issue_details_with_statuses didws
|
||||
INNER JOIN delphi_report_issues dri ON dri.id = didws.issue_id
|
||||
WHERE
|
||||
didws.project_id = m.id
|
||||
AND didws.status = 'pending'
|
||||
-- see delphi.rs todo comment
|
||||
AND dri.issue_type != '__dummy'
|
||||
)
|
||||
AND NOT EXISTS(
|
||||
SELECT 1
|
||||
FROM delphi_issue_details_with_statuses didws
|
||||
INNER JOIN delphi_report_issues dri ON dri.id = didws.issue_id
|
||||
WHERE
|
||||
didws.project_id = m.id
|
||||
AND didws.status IN ('safe', 'unsafe')
|
||||
-- see delphi.rs todo comment
|
||||
AND dri.issue_type != '__dummy'
|
||||
)
|
||||
"#,
|
||||
)
|
||||
.fetch_all(pool)
|
||||
.await
|
||||
.wrap_err("failed to fetch fully unreviewed tech review project ids")
|
||||
}
|
||||
|
||||
async fn fetch_project_file_ids(
|
||||
pool: &PgPool,
|
||||
project_ids: &[i64],
|
||||
) -> Result<Vec<i64>> {
|
||||
let rows = sqlx::query_scalar!(
|
||||
r#"
|
||||
SELECT DISTINCT dr.file_id
|
||||
FROM delphi_reports dr
|
||||
INNER JOIN files f ON f.id = dr.file_id
|
||||
INNER JOIN versions v ON v.id = f.version_id
|
||||
WHERE v.mod_id = ANY($1::bigint[])
|
||||
"#,
|
||||
project_ids,
|
||||
)
|
||||
.fetch_all(pool)
|
||||
.await
|
||||
.wrap_err("failed to fetch file ids for tech review Delphi rescan")?;
|
||||
|
||||
Ok(rows.into_iter().flatten().collect())
|
||||
}
|
||||
Reference in New Issue
Block a user