From 2d7e87a4cbd11282589fd41f071b2fb9cd169302 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fran=C3=A7ois-Xavier=20Talbot?= <108630700+fetchfern@users.noreply.github.com> Date: Fri, 23 Jan 2026 18:35:46 -0500 Subject: [PATCH] Local cache lock (#5193) --- apps/labrinth/CLAUDE.md | 3 - apps/labrinth/src/database/models/mod.rs | 4 + apps/labrinth/src/database/redis/mod.rs | 254 +++++++++++------------ apps/labrinth/src/database/redis/util.rs | 86 ++++++++ apps/labrinth/src/test/permissions.rs | 4 +- 5 files changed, 215 insertions(+), 136 deletions(-) delete mode 100644 apps/labrinth/CLAUDE.md diff --git a/apps/labrinth/CLAUDE.md b/apps/labrinth/CLAUDE.md deleted file mode 100644 index 0342153c0..000000000 --- a/apps/labrinth/CLAUDE.md +++ /dev/null @@ -1,3 +0,0 @@ -If an AI agent is reading this file, and does not have any previous context from a higher-level `CLAUDE.md`, inform the developer that they are launching Claude Code from the wrong directory. Their PWD is `$PROJECT_ROOT/apps/labrinth`, however Claude Code must be launched from `$PROJECT_ROOT` to get the full context! - -If the above is true, refuse to answer any prompts unless the developer has launched Claude Code from the project root. diff --git a/apps/labrinth/src/database/models/mod.rs b/apps/labrinth/src/database/models/mod.rs index 0014c58c5..a547ec671 100644 --- a/apps/labrinth/src/database/models/mod.rs +++ b/apps/labrinth/src/database/models/mod.rs @@ -79,4 +79,8 @@ pub enum DatabaseError { time_spent_pool_wait_ms: u64, time_spent_total_ms: u64, }, + #[error( + "Timeout waiting on local cache lock ({released}/{total} released)" + )] + LocalCacheTimeout { released: usize, total: usize }, } diff --git a/apps/labrinth/src/database/redis/mod.rs b/apps/labrinth/src/database/redis/mod.rs index dd4e11e10..545e9be03 100644 --- a/apps/labrinth/src/database/redis/mod.rs +++ b/apps/labrinth/src/database/redis/mod.rs @@ -4,8 +4,9 @@ use chrono::{TimeZone, Utc}; use dashmap::DashMap; use deadpool_redis::{Config, Runtime}; use futures::future::Either; +use futures::stream::{FuturesUnordered, StreamExt}; use prometheus::{IntGauge, Registry}; -use redis::{ExistenceCheck, SetExpiry, SetOptions, ToRedisArgs}; +use redis::ToRedisArgs; use serde::de::DeserializeOwned; use serde::{Deserialize, Serialize}; use std::collections::HashMap; @@ -13,7 +14,6 @@ use std::fmt::{Debug, Display}; use std::future::Future; use std::hash::Hash; use std::time::Duration; -use std::time::Instant; use tracing::{Instrument, info_span}; use util::{cmd, redis_pipe}; @@ -26,6 +26,7 @@ const ACTUAL_EXPIRY: i64 = 60 * 30; // 30 minutes pub struct RedisPool { pub url: String, pub pool: util::InstrumentedPool, + cache_list: DashMap, meta_namespace: String, } @@ -69,6 +70,7 @@ impl RedisPool { let pool = RedisPool { url, pool: util::InstrumentedPool::new(pool), + cache_list: DashMap::with_capacity(2048), meta_namespace: meta_namespace.unwrap_or("".to_string()), }; @@ -370,59 +372,51 @@ impl RedisPool { .collect::>(); let subscribe_ids = DashMap::new(); + let mut cache_writers = HashMap::new(); if !ids.is_empty() { - let mut pipe = redis_pipe(); - let fetch_ids = ids.iter().map(|x| x.key().clone()).collect::>(); - fetch_ids.iter().for_each(|key| { - pipe.atomic().set_options( - // We store locks in lowercase because they are case insensitive - format!( - "{}_{namespace}:{}/lock", - self.meta_namespace, + fetch_ids.into_iter().for_each(|key| { + let namespaced_key = format!( + "{}_{namespace}:{}", + self.meta_namespace, + if case_sensitive { key.to_lowercase() - ), - 100, - SetOptions::default() - .get(true) - .conditional_set(ExistenceCheck::NX) - .with_expiration(SetExpiry::EX(60)), - ); - }); - let results = { - let mut connection = self.pool.get().await?; - - pipe.query_async::>>(&mut connection) - .await? - }; - - for (idx, key) in fetch_ids.into_iter().enumerate() { - if let Some(locked) = results.get(idx) - && locked.is_none() - { - continue; - } - - if let Some((key, raw_key)) = ids.remove(&key) { - if let Some(val) = expired_values.remove(&key) { - if let Some(ref alias) = val.alias { - ids.remove(&alias.to_string()); - } - - if let Ok(value) = val.key.to_string().parse::() { - let base62 = to_base62(value); - ids.remove(&base62); - } - - cached_values.insert(val.key.clone(), val); } else { - subscribe_ids.insert(key, raw_key); + key.clone() + } + ); + let either = self.acquire_lock(namespaced_key); + + match either { + Either::Left(sentinel) => { + cache_writers.insert(key, sentinel); + } + + Either::Right(subscriber) => { + if let Some((key, raw_key)) = ids.remove(&key) { + if let Some(val) = expired_values.remove(&key) { + if let Some(ref alias) = val.alias { + ids.remove(&alias.to_string()); + } + + if let Ok(value) = + val.key.to_string().parse::() + { + let base62 = to_base62(value); + ids.remove(&base62); + } + + cached_values.insert(val.key.clone(), val); + } else { + subscribe_ids.insert(raw_key, subscriber); + } + } } } - } + }); } let mut fetch_tasks = Vec::new(); @@ -436,6 +430,8 @@ impl RedisPool { let mut return_values = HashMap::new(); let mut pipe = redis_pipe(); + // Doesn't need to be atomic + if !vals.is_empty() { for (key, (slug, value)) in vals { let value = RedisValue { @@ -445,7 +441,7 @@ impl RedisPool { alias: slug.clone(), }; - pipe.atomic().set_ex( + pipe.set_ex( format!( "{}_{namespace}:{key}", self.meta_namespace @@ -464,7 +460,7 @@ impl RedisPool { slug.to_string().to_lowercase() }; - pipe.atomic().set_ex( + pipe.set_ex( format!( "{}_{slug_namespace}:{}", self.meta_namespace, actual_slug @@ -473,117 +469,75 @@ impl RedisPool { DEFAULT_EXPIRY as u64, ); - pipe.atomic().del(format!( - "{}_{namespace}:{}/lock", - // Locks are stored in lowercase - self.meta_namespace, - actual_slug.to_lowercase() - )); + /* + if let Some(_sentinel) = + cache_writers.remove(&actual_slug) + { + // drop it + } + */ } } let key_str = key.to_string(); ids.remove(&key_str); + /* + if let Some(_sentinel) = cache_writers.remove(&key_str) + { + // drop it + } + */ + if let Ok(value) = key_str.parse::() { let base62 = to_base62(value); ids.remove(&base62); - pipe.atomic().del(format!( - "{}_{namespace}:{}/lock", - self.meta_namespace, - // Locks are stored in lowercase - base62.to_lowercase() - )); + /* + if let Some(_sentinel) = + cache_writers.remove(&base62) + { + // drop it + } + */ } - pipe.atomic().del(format!( - "{}_{namespace}:{key}/lock", - self.meta_namespace - )); - return_values.insert(key, value); } } - for (key, _) in ids { - pipe.atomic().del(format!( - "{}_{namespace}:{}/lock", - self.meta_namespace, - key.to_lowercase() - )); - pipe.atomic().del(format!( - "{}_{namespace}:{key}/lock", - self.meta_namespace - )); - } - let mut connection = self.pool.get().await?; pipe.query_async::<()>(&mut connection).await?; - Ok(return_values) + drop(cache_writers); + + Result::<_, DatabaseError>::Ok(return_values) })); } if !subscribe_ids.is_empty() { - fetch_tasks.push(Either::Right(async { - let mut wait_time_ms = 50; - let start = Utc::now(); - let mut redis_budget = Duration::ZERO; + fetch_tasks.push(Either::Right(async move { + let mut futures = FuturesUnordered::new(); + let len = subscribe_ids.len(); - loop { - let results = { - let acquire_start = Instant::now(); - let mut connection = self.pool.get().await?; - let args = subscribe_ids - .iter() - .map(|x| { - format!( - "{}_{namespace}:{}/lock", - self.meta_namespace, - // We lowercase key because locks are stored in lowercase - x.key().to_lowercase() - ) - }) - .collect::>(); - redis_budget += acquire_start.elapsed(); - - cmd("MGET") - .arg(&args) - .query_async::>>(&mut connection) - .await? - }; - - let exist_count = - results.into_iter().filter(|x| x.is_some()).count(); - - // None of the locks exist anymore, we can continue - if exist_count == 0 { - break; - } - - let spinning = Utc::now() - start; - if spinning > chrono::Duration::seconds(5) { - return Err(DatabaseError::CacheTimeout { - locks_released: subscribe_ids.len() - exist_count, - locks_waiting: subscribe_ids.len(), - time_spent_pool_wait_ms: redis_budget.as_millis() - as u64, - time_spent_total_ms: spinning - .num_milliseconds() - .max(0) - as u64, - }); - } - - tokio::time::sleep(Duration::from_millis(wait_time_ms)) - .await; - wait_time_ms *= 2; // 50, 100, 200, 400, 800, 1600, 3200 + for (key, subscriber) in subscribe_ids { + futures.push(async move { + ( + key, + subscriber + .wait_timeout(Duration::from_secs(5)) + .await, + ) + }); } - let (return_values, _) = - get_cached_values(subscribe_ids).await?; + let fetch_ids = DashMap::with_capacity(len); + while let Some((key, result)) = futures.next().await { + result?; + fetch_ids.insert(key.to_string(), key); + } + let (return_values, _) = get_cached_values(fetch_ids).await?; Ok(return_values) })); } @@ -598,6 +552,42 @@ impl RedisPool { Ok(cached_values.into_iter().map(|x| (x.0, x.1.val)).collect()) } + + /// Acquire or create a cache lock onto the given key. + fn acquire_lock( + &self, + key: String, + ) -> Either, util::CacheSubscriber> { + let mut out_writer = None; + let subscriber = + self.cache_list.entry(key.clone()).or_insert_with(|| { + let (writer, subscriber) = util::cache(); + out_writer = Some(writer); + subscriber + }); + + match out_writer { + Some(writer) => Either::Left(LockSentinel { + pool: self, + key, + writer, + }), + None => Either::Right(subscriber.clone()), + } + } +} + +struct LockSentinel<'a> { + pool: &'a RedisPool, + key: String, + writer: util::CacheWriter, +} + +impl<'a> Drop for LockSentinel<'a> { + fn drop(&mut self) { + self.writer.write(); + self.pool.cache_list.remove(&self.key); + } } impl RedisConnection { diff --git a/apps/labrinth/src/database/redis/util.rs b/apps/labrinth/src/database/redis/util.rs index 1980bc7b7..7e7ef3be7 100644 --- a/apps/labrinth/src/database/redis/util.rs +++ b/apps/labrinth/src/database/redis/util.rs @@ -1,10 +1,16 @@ use std::fmt::Debug; +use std::sync::Arc; +use std::sync::atomic::{AtomicBool, Ordering}; use deadpool_redis::PoolError; use derive_more::{Deref, DerefMut}; use redis::{FromRedisValue, RedisResult, ToRedisArgs}; +use tokio::sync::Notify; +use tokio::time::{Duration, timeout}; use tracing::{Instrument, info_span}; +use crate::database::models::DatabaseError; + #[derive(Debug, Clone, Deref, DerefMut)] pub struct InstrumentedPool { inner: deadpool_redis::Pool, @@ -91,3 +97,83 @@ impl InstrumentedCmd { self.inner.query_async(con).instrument(span).await } } + +pub fn cache() -> (CacheWriter, CacheSubscriber) { + let shared = Arc::new(Shared::new()); + ( + CacheWriter { + shared: shared.clone(), + }, + CacheSubscriber { shared }, + ) +} + +pub struct CacheWriter { + shared: Arc, +} + +impl CacheWriter { + pub fn write(&self) { + self.shared.make_ready(); + } +} + +#[derive(Clone)] +pub struct CacheSubscriber { + shared: Arc, +} + +impl CacheSubscriber { + pub async fn wait_timeout( + self, + duration: Duration, + ) -> Result<(), DatabaseError> { + timeout(duration, self.shared.wait()).await.map_err(|_| { + DatabaseError::LocalCacheTimeout { + released: 0, + total: 1, + } + }) + } +} + +struct Shared { + ready: AtomicBool, + // With this implementation's intrusive linked lists, the waiters are stored inline in the future + // so there's no heap allocation per waiter. + wakers: Notify, +} + +impl Shared { + fn new() -> Self { + Self { + ready: AtomicBool::new(false), + wakers: Notify::new(), + } + } + + fn make_ready(&self) { + self.ready.store(true, Ordering::Release); + self.wakers.notify_waiters(); + } + + async fn wait(&self) { + let ready = self.ready.load(Ordering::Acquire); + + if ready { + return; + } + + let notification = self.wakers.notified(); + // Don't need to call `enable` as we use notify_waiters + + // Prevent race where the writer set the ready bit and notified waiters between the load and registering the waiter + let ready = self.ready.load(Ordering::SeqCst); + + if ready { + return; + } + + notification.await; + } +} diff --git a/apps/labrinth/src/test/permissions.rs b/apps/labrinth/src/test/permissions.rs index 651322f76..3a03f6415 100644 --- a/apps/labrinth/src/test/permissions.rs +++ b/apps/labrinth/src/test/permissions.rs @@ -1051,7 +1051,9 @@ async fn create_dummy_org(setup_api: &ApiV3) -> (String, String) { ADMIN_USER_PAT, ) .await; - assert!(resp.status().is_success()); + + println!("response: {:?}", resp.response(),); + assert_eq!(resp.status(), StatusCode::OK); let organization = setup_api .get_organization_deserialized(&slug, ADMIN_USER_PAT)