fix: moderation locking fixes (#5843)

* fix: moderation locking fixes

* fix: lint

* wip: override always available

* fix: newmodal base z

* fix: cargo fmt
This commit is contained in:
Calum H.
2026-04-18 19:55:33 +01:00
committed by GitHub
parent 3a44def301
commit 2236dd8ade
19 changed files with 1630 additions and 251 deletions

View File

@@ -1,10 +1,10 @@
use crate::database::PgPool;
use chrono::{DateTime, Utc};
use chrono::{DateTime, Duration, Utc};
use serde::{Deserialize, Serialize};
use crate::database::models::{DBProjectId, DBUserId};
const LOCK_EXPIRY_MINUTES: i64 = 15;
pub const LOCK_EXPIRY_MINUTES: i64 = 15;
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DBModerationLock {
@@ -20,69 +20,102 @@ pub struct ModerationLockWithUser {
pub moderator_username: String,
pub moderator_avatar_url: Option<String>,
pub locked_at: DateTime<Utc>,
pub expires_at: DateTime<Utc>,
pub expired: bool,
}
impl DBModerationLock {
/// Check if a lock is expired (older than 15 minutes)
pub fn is_expired(&self) -> bool {
Utc::now()
.signed_duration_since(self.locked_at)
.num_minutes()
>= LOCK_EXPIRY_MINUTES
}
/// Try to acquire or refresh a lock for a project.
/// Try to acquire or refresh a lock for a project atomically.
/// Returns Ok(Ok(())) if lock acquired/refreshed, Ok(Err(lock)) if blocked by another moderator.
pub async fn acquire(
project_id: DBProjectId,
moderator_id: DBUserId,
pool: &PgPool,
) -> Result<Result<(), ModerationLockWithUser>, sqlx::Error> {
// First check if there's an existing lock
let existing = Self::get_with_user(project_id, pool).await?;
if let Some(lock) = existing {
// Same moderator - refresh the lock
if lock.moderator_id == moderator_id {
sqlx::query!(
"UPDATE moderation_locks SET locked_at = NOW() WHERE project_id = $1",
project_id as DBProjectId
)
.execute(pool)
.await?;
return Ok(Ok(()));
}
// Different moderator but lock expired - take over
if lock.expired {
sqlx::query!(
"UPDATE moderation_locks SET moderator_id = $1, locked_at = NOW() WHERE project_id = $2",
moderator_id as DBUserId,
project_id as DBProjectId
)
.execute(pool)
.await?;
return Ok(Ok(()));
}
// Different moderator, not expired - blocked
return Ok(Err(lock));
}
// No existing lock - create new one
sqlx::query!(
"INSERT INTO moderation_locks (project_id, moderator_id, locked_at)
VALUES ($1, $2, NOW())
ON CONFLICT (project_id) DO UPDATE
SET moderator_id = EXCLUDED.moderator_id, locked_at = EXCLUDED.locked_at",
// Atomic upsert that always returns the post-operation row. When the lock is held by
// another moderator and is still valid, the CASE branches write the existing values
// back (a harmless self-update), so `RETURNING` always yields a row describing the
// current holder. We cannot rely on a bare `DO UPDATE ... WHERE` because:
// * `WHERE` that evaluates false suppresses the update *and* `RETURNING`, and
// * data-modifying CTEs share a snapshot with the enclosing SELECT, so a plain
// `SELECT ... FROM moderation_locks` in the same statement cannot see a row
// inserted by the CTE above it.
let row = sqlx::query!(
r#"
WITH upsert AS (
INSERT INTO moderation_locks (project_id, moderator_id, locked_at)
VALUES ($1, $2, NOW())
ON CONFLICT (project_id) DO UPDATE SET
moderator_id = CASE
WHEN moderation_locks.moderator_id = EXCLUDED.moderator_id
OR moderation_locks.locked_at < NOW() - ($3::bigint * INTERVAL '1 minute')
THEN EXCLUDED.moderator_id
ELSE moderation_locks.moderator_id
END,
locked_at = CASE
WHEN moderation_locks.moderator_id = EXCLUDED.moderator_id
OR moderation_locks.locked_at < NOW() - ($3::bigint * INTERVAL '1 minute')
THEN EXCLUDED.locked_at
ELSE moderation_locks.locked_at
END
RETURNING moderator_id, locked_at
)
SELECT
upsert.moderator_id,
upsert.locked_at,
u.username AS moderator_username,
u.avatar_url AS moderator_avatar_url
FROM upsert
INNER JOIN users u ON u.id = upsert.moderator_id
"#,
project_id as DBProjectId,
moderator_id as DBUserId
moderator_id as DBUserId,
LOCK_EXPIRY_MINUTES,
)
.fetch_one(pool)
.await?;
let locked_at: DateTime<Utc> = row.locked_at;
let expires_at = locked_at + Duration::minutes(LOCK_EXPIRY_MINUTES);
let expired = Utc::now() >= expires_at;
if row.moderator_id == moderator_id.0 {
Ok(Ok(()))
} else {
Ok(Err(ModerationLockWithUser {
project_id,
moderator_id: DBUserId(row.moderator_id),
moderator_username: row.moderator_username,
moderator_avatar_url: row.moderator_avatar_url,
locked_at,
expires_at,
expired,
}))
}
}
/// Reassign the lock to `moderator_id`, even when another moderator holds an active lock.
/// Used only after explicit client confirmation (override flow).
pub async fn force_acquire(
project_id: DBProjectId,
moderator_id: DBUserId,
pool: &PgPool,
) -> Result<(), sqlx::Error> {
sqlx::query(
r#"
INSERT INTO moderation_locks (project_id, moderator_id, locked_at)
VALUES ($1, $2, NOW())
ON CONFLICT (project_id) DO UPDATE SET
moderator_id = EXCLUDED.moderator_id,
locked_at = EXCLUDED.locked_at
"#,
)
.bind(project_id)
.bind(moderator_id)
.execute(pool)
.await?;
Ok(Ok(()))
Ok(())
}
/// Get lock status for a project, including moderator username
@@ -109,9 +142,8 @@ impl DBModerationLock {
Ok(row.map(|r| {
let locked_at: DateTime<Utc> = r.locked_at;
let expired =
Utc::now().signed_duration_since(locked_at).num_minutes()
>= LOCK_EXPIRY_MINUTES;
let expires_at = locked_at + Duration::minutes(LOCK_EXPIRY_MINUTES);
let expired = Utc::now() >= expires_at;
ModerationLockWithUser {
project_id: DBProjectId(r.project_id),
@@ -119,6 +151,7 @@ impl DBModerationLock {
moderator_username: r.moderator_username,
moderator_avatar_url: r.moderator_avatar_url,
locked_at,
expires_at,
expired,
}
}))
@@ -144,10 +177,11 @@ impl DBModerationLock {
/// Clean up expired locks (can be called periodically)
pub async fn cleanup_expired(pool: &PgPool) -> Result<u64, sqlx::Error> {
let result = sqlx::query!(
"DELETE FROM moderation_locks WHERE locked_at < NOW() - INTERVAL '15 minutes'"
)
.execute(pool)
.await?;
"DELETE FROM moderation_locks WHERE locked_at < NOW() - ($1::bigint * INTERVAL '1 minute')",
LOCK_EXPIRY_MINUTES,
)
.execute(pool)
.await?;
Ok(result.rows_affected())
}