From 678f8049e3f2791b10d123ffa987e40573c8e169 Mon Sep 17 00:00:00 2001 From: "Michael H." Date: Sun, 3 May 2026 20:01:56 +0200 Subject: [PATCH] fix: labrinth memory leaks (#5980) --- apps/labrinth/src/database/redis/mod.rs | 98 ++++++++++++++----------- apps/labrinth/src/util/sentry.rs | 17 +++-- 2 files changed, 63 insertions(+), 52 deletions(-) diff --git a/apps/labrinth/src/database/redis/mod.rs b/apps/labrinth/src/database/redis/mod.rs index 51e94cfb1..022da0083 100644 --- a/apps/labrinth/src/database/redis/mod.rs +++ b/apps/labrinth/src/database/redis/mod.rs @@ -26,6 +26,19 @@ pub mod util; const DEFAULT_EXPIRY: i64 = 60 * 60 * 12; // 12 hours const ACTUAL_EXPIRY: i64 = 60 * 30; // 30 minutes +// Bound how many commands we send in a single Redis pipeline. The multiplexed +// connection's BytesMut write buffer keeps its peak capacity for the life of +// the connection, so larger pipelines cause higher steady-state RSS. +const PIPELINE_CHUNK_SIZE: usize = 25; +// Bound how many keys we send in a single MGET. Each MGET response must fit +// into the connection's read buffer, which also retains its peak capacity. At +// ~1 MB per cached value, 32 keys caps any single response at ~32 MB. +const MGET_CHUNK_SIZE: usize = 32; +// How long a pooled Redis connection lives before being recycled, regardless +// of activity. Forced recycling is the only way to release the per-connection +// BytesMut peak capacity that builds up under steady load. +const REDIS_MAX_CONN_AGE: Duration = Duration::from_secs(120); + #[derive(Clone)] pub struct RedisPool { pub url: String, @@ -85,14 +98,19 @@ impl RedisPool { }); let interval = Duration::from_secs(30); - let max_age = Duration::from_secs(5 * 60); // 5 minutes + let max_idle = Duration::from_secs(5 * 60); // 5 minutes let pool_ref = pool.clone(); tokio::spawn(async move { loop { tokio::time::sleep(interval).await; - pool_ref - .pool - .retain(|_, metrics| metrics.last_used() < max_age); + pool_ref.pool.retain(|_, metrics| { + // Drop connections that have been idle too long, OR that + // are older than REDIS_MAX_CONN_AGE regardless of use. + // The age-based recycle is what releases the per-connection + // BytesMut peak capacity under steady traffic. + metrics.last_used() < max_idle + && metrics.created.elapsed() < REDIS_MAX_CONN_AGE + }); } }); @@ -303,13 +321,16 @@ impl RedisPool { }) .collect::>(); - let v = cmd("MGET") - .arg(&args) - .query_async::>>(&mut connection) - .await? - .into_iter() - .flatten() - .collect::>(); + let mut v = Vec::new(); + for chunk in args.chunks(MGET_CHUNK_SIZE) { + let part = cmd("MGET") + .arg(chunk) + .query_async::>>( + &mut connection, + ) + .await?; + v.extend(part.into_iter().flatten()); + } Ok::<_, DatabaseError>(v) } .instrument(info_span!("get slug ids")) @@ -331,19 +352,20 @@ impl RedisPool { .map(|x| format!("{}_{namespace}:{x}", self.meta_namespace)) .collect::>(); - let cached_values = cmd("MGET") - .arg(&args) - .query_async::>>(&mut connection) - .await? - .into_iter() - .filter_map(|x| { + let mut cached_values = HashMap::new(); + for chunk in args.chunks(MGET_CHUNK_SIZE) { + let part = cmd("MGET") + .arg(chunk) + .query_async::>>(&mut connection) + .await?; + cached_values.extend(part.into_iter().filter_map(|x| { x.and_then(|val| { serde_json::from_str::>(&val) .ok() }) .map(|val| (val.key.clone(), val)) - }) - .collect::>(); + })); + } Ok::<_, DatabaseError>((cached_values, ids)) } @@ -440,6 +462,8 @@ impl RedisPool { let mut return_values = HashMap::new(); let mut pipe = redis_pipe(); + let mut pipe_cmds: usize = 0; + let mut connection = self.pool.get().await?; // Doesn't need to be atomic if !vals.is_empty() { @@ -459,6 +483,7 @@ impl RedisPool { serde_json::to_string(&value)?, DEFAULT_EXPIRY as u64, ); + pipe_cmds += 1; if let Some(slug) = slug { ids.remove(&slug.to_string()); @@ -478,46 +503,31 @@ impl RedisPool { key.to_string(), DEFAULT_EXPIRY as u64, ); - - /* - if let Some(_sentinel) = - cache_writers.remove(&actual_slug) - { - // drop it - } - */ + pipe_cmds += 1; } } let key_str = key.to_string(); ids.remove(&key_str); - /* - if let Some(_sentinel) = cache_writers.remove(&key_str) - { - // drop it - } - */ - if let Ok(value) = key_str.parse::() { let base62 = to_base62(value); ids.remove(&base62); - - /* - if let Some(_sentinel) = - cache_writers.remove(&base62) - { - // drop it - } - */ } return_values.insert(key, value); + + if pipe_cmds >= PIPELINE_CHUNK_SIZE { + pipe.query_async::<()>(&mut connection).await?; + pipe = redis_pipe(); + pipe_cmds = 0; + } } } - let mut connection = self.pool.get().await?; - pipe.query_async::<()>(&mut connection).await?; + if pipe_cmds > 0 { + pipe.query_async::<()>(&mut connection).await?; + } drop(cache_writers); diff --git a/apps/labrinth/src/util/sentry.rs b/apps/labrinth/src/util/sentry.rs index dfd233bd5..e0e89a0cf 100644 --- a/apps/labrinth/src/util/sentry.rs +++ b/apps/labrinth/src/util/sentry.rs @@ -3,7 +3,7 @@ // // TODO: PR something into sentry_actix to let us customize this -use std::{borrow::Cow, pin::Pin, rc::Rc}; +use std::{borrow::Cow, pin::Pin, rc::Rc, sync::Arc}; use actix_http::{ StatusCode, @@ -83,7 +83,11 @@ where } fn call(&self, req: ServiceRequest) -> Self::Future { - let hub = Hub::current(); + // Fork a Hub per request so the scope mutations below (event processor + // capturing the request, span attachment) live only for this request + // and are dropped when the future completes. Mutating the shared + // thread-local hub instead would leak one event processor per request. + let hub = Arc::new(Hub::new_from_top(Hub::main())); let client = hub.client(); let max_request_body_size = client @@ -110,7 +114,6 @@ where ); let transaction = hub.start_transaction(ctx); - transaction.set_request(sentry_req.clone()); transaction.set_origin("auto.http.actix"); transaction }; @@ -127,13 +130,13 @@ where sentry_req.data = Some(capture_request_body(&mut req).await); } - let parent_span = hub.configure_scope(|scope| { - let parent_span = scope.get_span(); + transaction.set_request(sentry_req.clone()); + + hub.configure_scope(|scope| { scope.set_span(Some(transaction.clone().into())); scope.add_event_processor(move |event| { Some(process_event(event, &sentry_req)) }); - parent_span }); let fut = @@ -150,7 +153,6 @@ where transaction.set_status(status); } transaction.finish(); - hub.configure_scope(|scope| scope.set_span(parent_span)); return Err(actix_err); } }; @@ -167,7 +169,6 @@ where transaction.set_status(status); } transaction.finish(); - hub.configure_scope(|scope| scope.set_span(parent_span)); Ok(res) }