From 6fba33d4437bbc1e85db758fa2e321c7621f9895 Mon Sep 17 00:00:00 2001 From: Xander Date: Thu, 26 Feb 2026 19:42:25 +0000 Subject: [PATCH] Fix payouts notifications not delivering (#5430) * fix FK violation when inserting rows into `notifications_deliveries` * add test for FK violation when inserting into notifications_deliveries * sqlx prepare * add migration to prevent stale notifications from being dequeued all at once upon fix * Revert "add migration to prevent stale notifications from being dequeued all at once upon fix" This reverts commit 446f398752bbddb632196a549501f9ce0b2da67f. --- ...ed0c61cd80c0d9eacd3053fadd09f7c67e6d.json} | 22 ++- .../src/database/models/notification_item.rs | 32 +++- apps/labrinth/src/queue/payouts/mod.rs | 176 ++++++++++++++++++ 3 files changed, 220 insertions(+), 10 deletions(-) rename apps/labrinth/.sqlx/{query-807d42d4aab8312fc6dfc60c93c6cecf947c2f96b43926ce5147c4aafe9089cd.json => query-ccdee0f8f60e0ac8997eacba6993ed0c61cd80c0d9eacd3053fadd09f7c67e6d.json} (73%) diff --git a/apps/labrinth/.sqlx/query-807d42d4aab8312fc6dfc60c93c6cecf947c2f96b43926ce5147c4aafe9089cd.json b/apps/labrinth/.sqlx/query-ccdee0f8f60e0ac8997eacba6993ed0c61cd80c0d9eacd3053fadd09f7c67e6d.json similarity index 73% rename from apps/labrinth/.sqlx/query-807d42d4aab8312fc6dfc60c93c6cecf947c2f96b43926ce5147c4aafe9089cd.json rename to apps/labrinth/.sqlx/query-ccdee0f8f60e0ac8997eacba6993ed0c61cd80c0d9eacd3053fadd09f7c67e6d.json index 9380c69df..ad2796d0f 100644 --- a/apps/labrinth/.sqlx/query-807d42d4aab8312fc6dfc60c93c6cecf947c2f96b43926ce5147c4aafe9089cd.json +++ b/apps/labrinth/.sqlx/query-ccdee0f8f60e0ac8997eacba6993ed0c61cd80c0d9eacd3053fadd09f7c67e6d.json @@ -1,8 +1,19 @@ { "db_name": "PostgreSQL", - "query": "\n WITH\n period_payouts AS (\n SELECT\n ids.notification_id,\n ids.user_id,\n ids.date_available,\n FLOOR(COALESCE(SUM(pv.amount), 0.0) * 100) :: BIGINT sum -- Convert to cents\n FROM UNNEST($1::bigint[], $2::bigint[], $3::timestamptz[]) AS ids(notification_id, user_id, date_available)\n LEFT JOIN payouts_values pv ON pv.user_id = ids.user_id AND pv.date_available = ids.date_available\n GROUP BY ids.user_id, ids.notification_id, ids.date_available\n )\n INSERT INTO notifications (\n id, user_id, body\n )\n SELECT\n notification_id id,\n user_id,\n JSONB_BUILD_OBJECT(\n 'type', 'payout_available',\n 'date_available', to_jsonb(date_available),\n 'amount', to_jsonb(sum)\n ) body\n FROM period_payouts\n WHERE sum >= 100\n ", + "query": "\n WITH\n period_payouts AS (\n SELECT\n ids.notification_id,\n ids.user_id,\n ids.date_available,\n FLOOR(COALESCE(SUM(pv.amount), 0.0) * 100) :: BIGINT sum -- Convert to cents\n FROM UNNEST($1::bigint[], $2::bigint[], $3::timestamptz[]) AS ids(notification_id, user_id, date_available)\n LEFT JOIN payouts_values pv ON pv.user_id = ids.user_id AND pv.date_available = ids.date_available\n GROUP BY ids.user_id, ids.notification_id, ids.date_available\n )\n INSERT INTO notifications (\n id, user_id, body\n )\n SELECT\n notification_id id,\n user_id,\n JSONB_BUILD_OBJECT(\n 'type', 'payout_available',\n 'date_available', to_jsonb(date_available),\n 'amount', to_jsonb(sum)\n ) body\n FROM period_payouts\n WHERE sum >= 100\n RETURNING id, user_id\n ", "describe": { - "columns": [], + "columns": [ + { + "ordinal": 0, + "name": "id", + "type_info": "Int8" + }, + { + "ordinal": 1, + "name": "user_id", + "type_info": "Int8" + } + ], "parameters": { "Left": [ "Int8Array", @@ -10,7 +21,10 @@ "TimestamptzArray" ] }, - "nullable": [] + "nullable": [ + false, + false + ] }, - "hash": "807d42d4aab8312fc6dfc60c93c6cecf947c2f96b43926ce5147c4aafe9089cd" + "hash": "ccdee0f8f60e0ac8997eacba6993ed0c61cd80c0d9eacd3053fadd09f7c67e6d" } diff --git a/apps/labrinth/src/database/models/notification_item.rs b/apps/labrinth/src/database/models/notification_item.rs index ae0107633..c25858ef2 100644 --- a/apps/labrinth/src/database/models/notification_item.rs +++ b/apps/labrinth/src/database/models/notification_item.rs @@ -57,7 +57,13 @@ impl NotificationBuilder { let notification_ids = notification_ids.iter().map(|x| x.0).collect::>(); - sqlx::query!( + // Use RETURNING to get back only the notification_ids and user_ids + // that were actually inserted (i.e. those with sum >= 100). + // This is necessary because insert_many_deliveries references + // notifications(id) via a foreign key, passing notification_ids + // that were filtered out by the WHERE clause would cause a + // FK constraint violation and fail the entire transaction. + let inserted_rows = sqlx::query!( " WITH period_payouts AS ( @@ -83,15 +89,29 @@ impl NotificationBuilder { ) body FROM period_payouts WHERE sum >= 100 + RETURNING id, user_id ", ¬ification_ids[..], &users_raw_ids[..], &dates_available[..], ) - .execute(&mut *transaction) + .fetch_all(&mut *transaction) .await?; - let notification_types = notification_ids + if inserted_rows.is_empty() { + return Ok(()); + } + + let inserted_notification_ids: Vec = + inserted_rows.iter().map(|r| r.id).collect(); + let inserted_user_raw_ids: Vec = + inserted_rows.iter().map(|r| r.user_id).collect(); + let inserted_users: Vec = inserted_user_raw_ids + .iter() + .map(|&id| DBUserId(id)) + .collect(); + + let notification_types = inserted_notification_ids .iter() .map(|_| NotificationType::PayoutAvailable.as_str()) .collect::>(); @@ -99,10 +119,10 @@ impl NotificationBuilder { NotificationBuilder::insert_many_deliveries( transaction, redis, - ¬ification_ids, - &users_raw_ids, + &inserted_notification_ids, + &inserted_user_raw_ids, ¬ification_types, - &users, + &inserted_users, ) .await?; diff --git a/apps/labrinth/src/queue/payouts/mod.rs b/apps/labrinth/src/queue/payouts/mod.rs index ffd2ee94a..2cb75c25d 100644 --- a/apps/labrinth/src/queue/payouts/mod.rs +++ b/apps/labrinth/src/queue/payouts/mod.rs @@ -1393,3 +1393,179 @@ async fn check_balance_with_webhook( Ok(result.ok().flatten()) } + +#[cfg(test)] +mod tests { + use super::*; + use crate::test::{ + api_v3::ApiV3, + database::USER_USER_ID_PARSED, + environment::{TestEnvironment, with_test_environment}, + }; + use rust_decimal::dec; + + async fn setup_payouts_values( + db: &PgPool, + entries: Vec<(i64, Decimal, DateTime)>, // (user_id, amount, date_available) + ) { + for (user_id, amount, date_available) in &entries { + sqlx::query!( + "INSERT INTO payouts_values (user_id, mod_id, amount, created, date_available) + VALUES ($1, NULL, $2, NOW(), $3)", + user_id, + amount, + date_available, + ) + .execute(db) + .await + .unwrap(); + } + + for (user_id, _amount, date_available) in &entries { + sqlx::query!( + "INSERT INTO payouts_values_notifications (date_available, user_id, notified) + VALUES ($1, $2, FALSE) + ON CONFLICT (date_available, user_id) DO NOTHING", + date_available, + user_id, + ) + .execute(db) + .await + .unwrap(); + } + } + + /// When a user's payout amount is below + /// the $1.00 (100 cents) threshold, the `WHERE sum >= 100` filter in + /// insert_many_payout_notifications skips the INSERT into notifications + /// for that user, but the old code still passed ALL pre-generated + /// notification_ids to insert_many_deliveries. Since + /// notifications_deliveries.notification_id has a FK constraint on + /// notifications(id), this caused a violation that failed + /// the entire transaction. The notified flag was never set to `true`, + /// so the rows accumulated and the same failure repeated every run. + #[actix_rt::test] + async fn test_payout_notification_below_threshold_does_not_fail() { + with_test_environment(None, |env: TestEnvironment| async move { + let db = &env.db.pool; + let redis = &env.db.redis_pool; + + let user_id = USER_USER_ID_PARSED; + + // date_available must be in the past so the notification query + // picks it up (date_available <= NOW()). + let date_available = Utc::now() - Duration::hours(1); + + // Amount of $0.50 -- below the $1.00 threshold (sum < 100 cents). + setup_payouts_values( + db, + vec![(user_id, dec!(0.50), date_available)], + ) + .await; + + // This should succeed, NOT return an error. + // Before the fix, this would fail with a FK constraint violation. + let result = index_payouts_notifications(db, redis).await; + assert!( + result.is_ok(), + "index_payouts_notifications should succeed for below-threshold payouts, got: {:?}", + result.err() + ); + + // Verify the notification row was marked as notified (not stuck). + let remaining = sqlx::query_scalar!( + "SELECT COUNT(*) FROM payouts_values_notifications WHERE notified = FALSE AND user_id = $1", + user_id, + ) + .fetch_one(db) + .await + .unwrap() + .unwrap_or(0); + + assert_eq!( + remaining, 0, + "All payouts_values_notifications rows should be marked notified" + ); + }) + .await; + } + + /// When there is a mix of users, some above and some below the + /// threshold, the above-threshold users should get notifications + /// while the below-threshold ones are silently skipped. The entire + /// transaction must succeed (not fail due to the below-threshold user). + #[actix_rt::test] + async fn test_payout_notification_mixed_threshold_users() { + with_test_environment(None, |env: TestEnvironment| async move { + let db = &env.db.pool; + let redis = &env.db.redis_pool; + + let above_user_id = USER_USER_ID_PARSED; // user 3 + let below_user_id = 4i64; // FRIEND_USER_ID + let date_available = Utc::now() - Duration::hours(1); + + setup_payouts_values( + db, + vec![ + // Above threshold + (above_user_id, dec!(5.00), date_available), + // Below threshold + (below_user_id, dec!(0.25), date_available), + ], + ) + .await; + + let result = index_payouts_notifications(db, redis).await; + assert!( + result.is_ok(), + "index_payouts_notifications should succeed with mixed users, got: {:?}", + result.err() + ); + + // Above-threshold user should have a notification. + let above_count = sqlx::query_scalar!( + "SELECT COUNT(*) FROM notifications WHERE user_id = $1 AND body->>'type' = 'payout_available'", + above_user_id, + ) + .fetch_one(db) + .await + .unwrap() + .unwrap_or(0); + + assert!( + above_count > 0, + "Above-threshold user should have a payout notification" + ); + + // Below-threshold user should NOT have a notification. + let below_count = sqlx::query_scalar!( + "SELECT COUNT(*) FROM notifications WHERE user_id = $1 AND body->>'type' = 'payout_available'", + below_user_id, + ) + .fetch_one(db) + .await + .unwrap() + .unwrap_or(0); + + assert_eq!( + below_count, 0, + "Below-threshold user should NOT have a payout notification" + ); + + // Both should be marked notified. + let remaining = sqlx::query_scalar!( + "SELECT COUNT(*) FROM payouts_values_notifications WHERE notified = FALSE", + ) + .fetch_one(db) + .await + .unwrap() + .unwrap_or(0); + + assert_eq!( + remaining, 0, + "All payouts_values_notifications rows should be marked notified" + ); + }) + .await; + } +}