Use deadpool fork with tracing (#5276)
* Use deadpool fork with tracing * Implement minimum Redis connections * fix typos maybe * address pr comments
This commit is contained in:
16
Cargo.lock
generated
16
Cargo.lock
generated
@@ -2247,30 +2247,28 @@ checksum = "be1e0bca6c3637f992fc1cc7cbc52a78c1ef6db076dbf1059c4323d6a2048376"
|
||||
[[package]]
|
||||
name = "deadpool"
|
||||
version = "0.12.3"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "0be2b1d1d6ec8d846f05e137292d0b89133caf95ef33695424c09568bdd39b1b"
|
||||
source = "git+https://github.com/modrinth/deadpool?rev=db5fb00b036ecc8fe5f18853c559b745ffe47bde#db5fb00b036ecc8fe5f18853c559b745ffe47bde"
|
||||
dependencies = [
|
||||
"deadpool-runtime",
|
||||
"lazy_static",
|
||||
"num_cpus",
|
||||
"tokio",
|
||||
"tracing",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "deadpool-redis"
|
||||
version = "0.22.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "c0965b977f1244bc3783bb27cd79cfcff335a8341da18f79232d00504b18eb1a"
|
||||
version = "0.22.1"
|
||||
source = "git+https://github.com/modrinth/deadpool?rev=db5fb00b036ecc8fe5f18853c559b745ffe47bde#db5fb00b036ecc8fe5f18853c559b745ffe47bde"
|
||||
dependencies = [
|
||||
"deadpool",
|
||||
"redis",
|
||||
"tracing",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "deadpool-runtime"
|
||||
version = "0.1.4"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "092966b41edc516079bdf31ec78a2e0588d1d0c08f78b91d8307215928642b2b"
|
||||
version = "0.1.5"
|
||||
source = "git+https://github.com/modrinth/deadpool?rev=db5fb00b036ecc8fe5f18853c559b745ffe47bde#db5fb00b036ecc8fe5f18853c559b745ffe47bde"
|
||||
dependencies = [
|
||||
"tokio",
|
||||
]
|
||||
|
||||
@@ -60,7 +60,7 @@ const_format = "0.2.34"
|
||||
daedalus = { path = "packages/daedalus" }
|
||||
dashmap = "6.1.0"
|
||||
data-url = "0.3.2"
|
||||
deadpool-redis = "0.22.0"
|
||||
deadpool-redis = { version = "0.22.1", git = "https://github.com/modrinth/deadpool", rev = "db5fb00b036ecc8fe5f18853c559b745ffe47bde" }
|
||||
derive_more = "2.0.1"
|
||||
directories = "6.0.0"
|
||||
dirs = "6.0.0"
|
||||
|
||||
@@ -17,3 +17,7 @@ extend-exclude = [
|
||||
tou = "tou"
|
||||
# Google Ad Manager
|
||||
gam = "gam"
|
||||
# short for "constants"
|
||||
consts = "consts"
|
||||
# short for "Copy"
|
||||
Cpy = "Cpy"
|
||||
|
||||
@@ -19,6 +19,7 @@ MEILISEARCH_WRITE_ADDRS=http://localhost:7700
|
||||
MEILISEARCH_KEY=modrinth
|
||||
|
||||
REDIS_URL=redis://labrinth-redis
|
||||
REDIS_MIN_CONNECTIONS=0
|
||||
REDIS_MAX_CONNECTIONS=10000
|
||||
|
||||
BIND_ADDR=0.0.0.0:8000
|
||||
|
||||
@@ -27,6 +27,7 @@ SEARCH_OPERATION_TIMEOUT=300000
|
||||
MEILISEARCH_KEY=modrinth
|
||||
|
||||
REDIS_URL=redis://localhost
|
||||
REDIS_MIN_CONNECTIONS=0
|
||||
REDIS_MAX_CONNECTIONS=10000
|
||||
|
||||
# Must bind to broadcast, not localhost, because some
|
||||
|
||||
@@ -3,6 +3,7 @@ use ariadne::ids::base62_impl::{parse_base62, to_base62};
|
||||
use chrono::{TimeZone, Utc};
|
||||
use dashmap::DashMap;
|
||||
use deadpool_redis::{Config, Runtime};
|
||||
use futures::TryStreamExt;
|
||||
use futures::future::Either;
|
||||
use futures::stream::{FuturesUnordered, StreamExt};
|
||||
use prometheus::{IntGauge, Registry};
|
||||
@@ -14,7 +15,7 @@ use std::fmt::{Debug, Display};
|
||||
use std::future::Future;
|
||||
use std::hash::Hash;
|
||||
use std::time::Duration;
|
||||
use tracing::{Instrument, info_span};
|
||||
use tracing::{Instrument, info, info_span};
|
||||
use util::{cmd, redis_pipe};
|
||||
|
||||
pub mod util;
|
||||
@@ -25,7 +26,7 @@ const ACTUAL_EXPIRY: i64 = 60 * 30; // 30 minutes
|
||||
#[derive(Clone)]
|
||||
pub struct RedisPool {
|
||||
pub url: String,
|
||||
pub pool: util::InstrumentedPool,
|
||||
pub pool: deadpool_redis::Pool,
|
||||
cache_list: DashMap<String, util::CacheSubscriber>,
|
||||
meta_namespace: String,
|
||||
}
|
||||
@@ -69,11 +70,35 @@ impl RedisPool {
|
||||
|
||||
let pool = RedisPool {
|
||||
url,
|
||||
pool: util::InstrumentedPool::new(pool),
|
||||
pool,
|
||||
cache_list: DashMap::with_capacity(2048),
|
||||
meta_namespace: meta_namespace.unwrap_or("".to_string()),
|
||||
};
|
||||
|
||||
let redis_min_connections = dotenvy::var("REDIS_MIN_CONNECTIONS")
|
||||
.ok()
|
||||
.and_then(|x| x.parse::<usize>().ok())
|
||||
.unwrap_or(0);
|
||||
let spawn_min_connections = (0..redis_min_connections)
|
||||
.map(|_| {
|
||||
let pool = pool.clone();
|
||||
tokio::spawn(async move { pool.pool.get().await })
|
||||
})
|
||||
.collect::<FuturesUnordered<_>>();
|
||||
tokio::spawn({
|
||||
let pool = pool.clone();
|
||||
async move {
|
||||
// collect the connections into a buffer while we're spawning them,
|
||||
// to make sure that we're not `get`ing any connections we previously took
|
||||
let _connections =
|
||||
spawn_min_connections.try_collect::<Vec<_>>().await;
|
||||
info!(
|
||||
pool_status = ?pool.pool.status(),
|
||||
"Finished getting {redis_min_connections} initial Redis connections"
|
||||
);
|
||||
}
|
||||
});
|
||||
|
||||
let interval = Duration::from_secs(30);
|
||||
let max_age = Duration::from_secs(5 * 60); // 5 minutes
|
||||
let pool_ref = pool.clone();
|
||||
|
||||
@@ -2,7 +2,6 @@ use std::fmt::Debug;
|
||||
use std::sync::Arc;
|
||||
use std::sync::atomic::{AtomicBool, Ordering};
|
||||
|
||||
use deadpool_redis::PoolError;
|
||||
use derive_more::{Deref, DerefMut};
|
||||
use redis::{FromRedisValue, RedisResult, ToRedisArgs};
|
||||
use tokio::sync::Notify;
|
||||
@@ -11,24 +10,6 @@ use tracing::{Instrument, info_span};
|
||||
|
||||
use crate::database::models::DatabaseError;
|
||||
|
||||
#[derive(Debug, Clone, Deref, DerefMut)]
|
||||
pub struct InstrumentedPool {
|
||||
inner: deadpool_redis::Pool,
|
||||
}
|
||||
|
||||
impl InstrumentedPool {
|
||||
pub fn new(inner: deadpool_redis::Pool) -> Self {
|
||||
Self { inner }
|
||||
}
|
||||
|
||||
pub async fn get(&self) -> Result<deadpool_redis::Connection, PoolError> {
|
||||
self.inner
|
||||
.get()
|
||||
.instrument(info_span!("get redis connection"))
|
||||
.await
|
||||
}
|
||||
}
|
||||
|
||||
pub fn redis_pipe() -> InstrumentedPipeline {
|
||||
InstrumentedPipeline {
|
||||
inner: redis::pipe(),
|
||||
@@ -55,7 +36,7 @@ impl InstrumentedPipeline {
|
||||
) -> RedisResult<T> {
|
||||
self.inner
|
||||
.query_async(con)
|
||||
.instrument(info_span!("execute redis pipeline"))
|
||||
.instrument(info_span!("pipeline.query_async"))
|
||||
.await
|
||||
}
|
||||
}
|
||||
@@ -88,7 +69,7 @@ impl InstrumentedCmd {
|
||||
con: &mut impl redis::aio::ConnectionLike,
|
||||
) -> RedisResult<T> {
|
||||
let span = info_span!(
|
||||
"query_async",
|
||||
"cmd.query_async",
|
||||
// <https://opentelemetry.io/docs/specs/semconv/db/redis/>
|
||||
db.system.name = "redis",
|
||||
db.operation.name = self.name,
|
||||
|
||||
Reference in New Issue
Block a user