Fix payouts notifications not delivering (#5430)
* fix FK violation when inserting rows into `notifications_deliveries` * add test for FK violation when inserting into notifications_deliveries * sqlx prepare * add migration to prevent stale notifications from being dequeued all at once upon fix * Revert "add migration to prevent stale notifications from being dequeued all at once upon fix" This reverts commit 446f398752bbddb632196a549501f9ce0b2da67f.
This commit is contained in:
@@ -1,8 +1,19 @@
|
|||||||
{
|
{
|
||||||
"db_name": "PostgreSQL",
|
"db_name": "PostgreSQL",
|
||||||
"query": "\n WITH\n period_payouts AS (\n SELECT\n ids.notification_id,\n ids.user_id,\n ids.date_available,\n FLOOR(COALESCE(SUM(pv.amount), 0.0) * 100) :: BIGINT sum -- Convert to cents\n FROM UNNEST($1::bigint[], $2::bigint[], $3::timestamptz[]) AS ids(notification_id, user_id, date_available)\n LEFT JOIN payouts_values pv ON pv.user_id = ids.user_id AND pv.date_available = ids.date_available\n GROUP BY ids.user_id, ids.notification_id, ids.date_available\n )\n INSERT INTO notifications (\n id, user_id, body\n )\n SELECT\n notification_id id,\n user_id,\n JSONB_BUILD_OBJECT(\n 'type', 'payout_available',\n 'date_available', to_jsonb(date_available),\n 'amount', to_jsonb(sum)\n ) body\n FROM period_payouts\n WHERE sum >= 100\n ",
|
"query": "\n WITH\n period_payouts AS (\n SELECT\n ids.notification_id,\n ids.user_id,\n ids.date_available,\n FLOOR(COALESCE(SUM(pv.amount), 0.0) * 100) :: BIGINT sum -- Convert to cents\n FROM UNNEST($1::bigint[], $2::bigint[], $3::timestamptz[]) AS ids(notification_id, user_id, date_available)\n LEFT JOIN payouts_values pv ON pv.user_id = ids.user_id AND pv.date_available = ids.date_available\n GROUP BY ids.user_id, ids.notification_id, ids.date_available\n )\n INSERT INTO notifications (\n id, user_id, body\n )\n SELECT\n notification_id id,\n user_id,\n JSONB_BUILD_OBJECT(\n 'type', 'payout_available',\n 'date_available', to_jsonb(date_available),\n 'amount', to_jsonb(sum)\n ) body\n FROM period_payouts\n WHERE sum >= 100\n RETURNING id, user_id\n ",
|
||||||
"describe": {
|
"describe": {
|
||||||
"columns": [],
|
"columns": [
|
||||||
|
{
|
||||||
|
"ordinal": 0,
|
||||||
|
"name": "id",
|
||||||
|
"type_info": "Int8"
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"ordinal": 1,
|
||||||
|
"name": "user_id",
|
||||||
|
"type_info": "Int8"
|
||||||
|
}
|
||||||
|
],
|
||||||
"parameters": {
|
"parameters": {
|
||||||
"Left": [
|
"Left": [
|
||||||
"Int8Array",
|
"Int8Array",
|
||||||
@@ -10,7 +21,10 @@
|
|||||||
"TimestamptzArray"
|
"TimestamptzArray"
|
||||||
]
|
]
|
||||||
},
|
},
|
||||||
"nullable": []
|
"nullable": [
|
||||||
|
false,
|
||||||
|
false
|
||||||
|
]
|
||||||
},
|
},
|
||||||
"hash": "807d42d4aab8312fc6dfc60c93c6cecf947c2f96b43926ce5147c4aafe9089cd"
|
"hash": "ccdee0f8f60e0ac8997eacba6993ed0c61cd80c0d9eacd3053fadd09f7c67e6d"
|
||||||
}
|
}
|
||||||
@@ -57,7 +57,13 @@ impl NotificationBuilder {
|
|||||||
let notification_ids =
|
let notification_ids =
|
||||||
notification_ids.iter().map(|x| x.0).collect::<Vec<_>>();
|
notification_ids.iter().map(|x| x.0).collect::<Vec<_>>();
|
||||||
|
|
||||||
sqlx::query!(
|
// Use RETURNING to get back only the notification_ids and user_ids
|
||||||
|
// that were actually inserted (i.e. those with sum >= 100).
|
||||||
|
// This is necessary because insert_many_deliveries references
|
||||||
|
// notifications(id) via a foreign key, passing notification_ids
|
||||||
|
// that were filtered out by the WHERE clause would cause a
|
||||||
|
// FK constraint violation and fail the entire transaction.
|
||||||
|
let inserted_rows = sqlx::query!(
|
||||||
"
|
"
|
||||||
WITH
|
WITH
|
||||||
period_payouts AS (
|
period_payouts AS (
|
||||||
@@ -83,15 +89,29 @@ impl NotificationBuilder {
|
|||||||
) body
|
) body
|
||||||
FROM period_payouts
|
FROM period_payouts
|
||||||
WHERE sum >= 100
|
WHERE sum >= 100
|
||||||
|
RETURNING id, user_id
|
||||||
",
|
",
|
||||||
¬ification_ids[..],
|
¬ification_ids[..],
|
||||||
&users_raw_ids[..],
|
&users_raw_ids[..],
|
||||||
&dates_available[..],
|
&dates_available[..],
|
||||||
)
|
)
|
||||||
.execute(&mut *transaction)
|
.fetch_all(&mut *transaction)
|
||||||
.await?;
|
.await?;
|
||||||
|
|
||||||
let notification_types = notification_ids
|
if inserted_rows.is_empty() {
|
||||||
|
return Ok(());
|
||||||
|
}
|
||||||
|
|
||||||
|
let inserted_notification_ids: Vec<i64> =
|
||||||
|
inserted_rows.iter().map(|r| r.id).collect();
|
||||||
|
let inserted_user_raw_ids: Vec<i64> =
|
||||||
|
inserted_rows.iter().map(|r| r.user_id).collect();
|
||||||
|
let inserted_users: Vec<DBUserId> = inserted_user_raw_ids
|
||||||
|
.iter()
|
||||||
|
.map(|&id| DBUserId(id))
|
||||||
|
.collect();
|
||||||
|
|
||||||
|
let notification_types = inserted_notification_ids
|
||||||
.iter()
|
.iter()
|
||||||
.map(|_| NotificationType::PayoutAvailable.as_str())
|
.map(|_| NotificationType::PayoutAvailable.as_str())
|
||||||
.collect::<Vec<_>>();
|
.collect::<Vec<_>>();
|
||||||
@@ -99,10 +119,10 @@ impl NotificationBuilder {
|
|||||||
NotificationBuilder::insert_many_deliveries(
|
NotificationBuilder::insert_many_deliveries(
|
||||||
transaction,
|
transaction,
|
||||||
redis,
|
redis,
|
||||||
¬ification_ids,
|
&inserted_notification_ids,
|
||||||
&users_raw_ids,
|
&inserted_user_raw_ids,
|
||||||
¬ification_types,
|
¬ification_types,
|
||||||
&users,
|
&inserted_users,
|
||||||
)
|
)
|
||||||
.await?;
|
.await?;
|
||||||
|
|
||||||
|
|||||||
@@ -1393,3 +1393,179 @@ async fn check_balance_with_webhook(
|
|||||||
|
|
||||||
Ok(result.ok().flatten())
|
Ok(result.ok().flatten())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[cfg(test)]
|
||||||
|
mod tests {
|
||||||
|
use super::*;
|
||||||
|
use crate::test::{
|
||||||
|
api_v3::ApiV3,
|
||||||
|
database::USER_USER_ID_PARSED,
|
||||||
|
environment::{TestEnvironment, with_test_environment},
|
||||||
|
};
|
||||||
|
use rust_decimal::dec;
|
||||||
|
|
||||||
|
async fn setup_payouts_values(
|
||||||
|
db: &PgPool,
|
||||||
|
entries: Vec<(i64, Decimal, DateTime<Utc>)>, // (user_id, amount, date_available)
|
||||||
|
) {
|
||||||
|
for (user_id, amount, date_available) in &entries {
|
||||||
|
sqlx::query!(
|
||||||
|
"INSERT INTO payouts_values (user_id, mod_id, amount, created, date_available)
|
||||||
|
VALUES ($1, NULL, $2, NOW(), $3)",
|
||||||
|
user_id,
|
||||||
|
amount,
|
||||||
|
date_available,
|
||||||
|
)
|
||||||
|
.execute(db)
|
||||||
|
.await
|
||||||
|
.unwrap();
|
||||||
|
}
|
||||||
|
|
||||||
|
for (user_id, _amount, date_available) in &entries {
|
||||||
|
sqlx::query!(
|
||||||
|
"INSERT INTO payouts_values_notifications (date_available, user_id, notified)
|
||||||
|
VALUES ($1, $2, FALSE)
|
||||||
|
ON CONFLICT (date_available, user_id) DO NOTHING",
|
||||||
|
date_available,
|
||||||
|
user_id,
|
||||||
|
)
|
||||||
|
.execute(db)
|
||||||
|
.await
|
||||||
|
.unwrap();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// When a user's payout amount is below
|
||||||
|
/// the $1.00 (100 cents) threshold, the `WHERE sum >= 100` filter in
|
||||||
|
/// insert_many_payout_notifications skips the INSERT into notifications
|
||||||
|
/// for that user, but the old code still passed ALL pre-generated
|
||||||
|
/// notification_ids to insert_many_deliveries. Since
|
||||||
|
/// notifications_deliveries.notification_id has a FK constraint on
|
||||||
|
/// notifications(id), this caused a violation that failed
|
||||||
|
/// the entire transaction. The notified flag was never set to `true`,
|
||||||
|
/// so the rows accumulated and the same failure repeated every run.
|
||||||
|
#[actix_rt::test]
|
||||||
|
async fn test_payout_notification_below_threshold_does_not_fail() {
|
||||||
|
with_test_environment(None, |env: TestEnvironment<ApiV3>| async move {
|
||||||
|
let db = &env.db.pool;
|
||||||
|
let redis = &env.db.redis_pool;
|
||||||
|
|
||||||
|
let user_id = USER_USER_ID_PARSED;
|
||||||
|
|
||||||
|
// date_available must be in the past so the notification query
|
||||||
|
// picks it up (date_available <= NOW()).
|
||||||
|
let date_available = Utc::now() - Duration::hours(1);
|
||||||
|
|
||||||
|
// Amount of $0.50 -- below the $1.00 threshold (sum < 100 cents).
|
||||||
|
setup_payouts_values(
|
||||||
|
db,
|
||||||
|
vec![(user_id, dec!(0.50), date_available)],
|
||||||
|
)
|
||||||
|
.await;
|
||||||
|
|
||||||
|
// This should succeed, NOT return an error.
|
||||||
|
// Before the fix, this would fail with a FK constraint violation.
|
||||||
|
let result = index_payouts_notifications(db, redis).await;
|
||||||
|
assert!(
|
||||||
|
result.is_ok(),
|
||||||
|
"index_payouts_notifications should succeed for below-threshold payouts, got: {:?}",
|
||||||
|
result.err()
|
||||||
|
);
|
||||||
|
|
||||||
|
// Verify the notification row was marked as notified (not stuck).
|
||||||
|
let remaining = sqlx::query_scalar!(
|
||||||
|
"SELECT COUNT(*) FROM payouts_values_notifications WHERE notified = FALSE AND user_id = $1",
|
||||||
|
user_id,
|
||||||
|
)
|
||||||
|
.fetch_one(db)
|
||||||
|
.await
|
||||||
|
.unwrap()
|
||||||
|
.unwrap_or(0);
|
||||||
|
|
||||||
|
assert_eq!(
|
||||||
|
remaining, 0,
|
||||||
|
"All payouts_values_notifications rows should be marked notified"
|
||||||
|
);
|
||||||
|
})
|
||||||
|
.await;
|
||||||
|
}
|
||||||
|
|
||||||
|
/// When there is a mix of users, some above and some below the
|
||||||
|
/// threshold, the above-threshold users should get notifications
|
||||||
|
/// while the below-threshold ones are silently skipped. The entire
|
||||||
|
/// transaction must succeed (not fail due to the below-threshold user).
|
||||||
|
#[actix_rt::test]
|
||||||
|
async fn test_payout_notification_mixed_threshold_users() {
|
||||||
|
with_test_environment(None, |env: TestEnvironment<ApiV3>| async move {
|
||||||
|
let db = &env.db.pool;
|
||||||
|
let redis = &env.db.redis_pool;
|
||||||
|
|
||||||
|
let above_user_id = USER_USER_ID_PARSED; // user 3
|
||||||
|
let below_user_id = 4i64; // FRIEND_USER_ID
|
||||||
|
let date_available = Utc::now() - Duration::hours(1);
|
||||||
|
|
||||||
|
setup_payouts_values(
|
||||||
|
db,
|
||||||
|
vec![
|
||||||
|
// Above threshold
|
||||||
|
(above_user_id, dec!(5.00), date_available),
|
||||||
|
// Below threshold
|
||||||
|
(below_user_id, dec!(0.25), date_available),
|
||||||
|
],
|
||||||
|
)
|
||||||
|
.await;
|
||||||
|
|
||||||
|
let result = index_payouts_notifications(db, redis).await;
|
||||||
|
assert!(
|
||||||
|
result.is_ok(),
|
||||||
|
"index_payouts_notifications should succeed with mixed users, got: {:?}",
|
||||||
|
result.err()
|
||||||
|
);
|
||||||
|
|
||||||
|
// Above-threshold user should have a notification.
|
||||||
|
let above_count = sqlx::query_scalar!(
|
||||||
|
"SELECT COUNT(*) FROM notifications WHERE user_id = $1 AND body->>'type' = 'payout_available'",
|
||||||
|
above_user_id,
|
||||||
|
)
|
||||||
|
.fetch_one(db)
|
||||||
|
.await
|
||||||
|
.unwrap()
|
||||||
|
.unwrap_or(0);
|
||||||
|
|
||||||
|
assert!(
|
||||||
|
above_count > 0,
|
||||||
|
"Above-threshold user should have a payout notification"
|
||||||
|
);
|
||||||
|
|
||||||
|
// Below-threshold user should NOT have a notification.
|
||||||
|
let below_count = sqlx::query_scalar!(
|
||||||
|
"SELECT COUNT(*) FROM notifications WHERE user_id = $1 AND body->>'type' = 'payout_available'",
|
||||||
|
below_user_id,
|
||||||
|
)
|
||||||
|
.fetch_one(db)
|
||||||
|
.await
|
||||||
|
.unwrap()
|
||||||
|
.unwrap_or(0);
|
||||||
|
|
||||||
|
assert_eq!(
|
||||||
|
below_count, 0,
|
||||||
|
"Below-threshold user should NOT have a payout notification"
|
||||||
|
);
|
||||||
|
|
||||||
|
// Both should be marked notified.
|
||||||
|
let remaining = sqlx::query_scalar!(
|
||||||
|
"SELECT COUNT(*) FROM payouts_values_notifications WHERE notified = FALSE",
|
||||||
|
)
|
||||||
|
.fetch_one(db)
|
||||||
|
.await
|
||||||
|
.unwrap()
|
||||||
|
.unwrap_or(0);
|
||||||
|
|
||||||
|
assert_eq!(
|
||||||
|
remaining, 0,
|
||||||
|
"All payouts_values_notifications rows should be marked notified"
|
||||||
|
);
|
||||||
|
})
|
||||||
|
.await;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user