Add more Prometheus metrics for memory and Tokio tasks (#5282)
* Add more Prometheus metrics for memory and Tokio tasks * pr comments
This commit is contained in:
@@ -325,16 +325,6 @@ pub fn app_config(
|
||||
.app_data(web::Data::new(labrinth_config.stripe_client.clone()))
|
||||
.app_data(web::Data::new(labrinth_config.anrok_client.clone()))
|
||||
.app_data(labrinth_config.rate_limiter.clone())
|
||||
.configure({
|
||||
#[cfg(target_os = "linux")]
|
||||
{
|
||||
|cfg| routes::debug::config(cfg)
|
||||
}
|
||||
#[cfg(not(target_os = "linux"))]
|
||||
{
|
||||
|_cfg| ()
|
||||
}
|
||||
})
|
||||
.configure(routes::v2::config)
|
||||
.configure(routes::v3::config)
|
||||
.configure(routes::internal::config)
|
||||
@@ -346,8 +336,18 @@ pub fn utoipa_app_config(
|
||||
cfg: &mut utoipa_actix_web::service_config::ServiceConfig,
|
||||
_labrinth_config: LabrinthConfig,
|
||||
) {
|
||||
cfg.configure(routes::v3::utoipa_config)
|
||||
.configure(routes::internal::utoipa_config);
|
||||
cfg.configure({
|
||||
#[cfg(target_os = "linux")]
|
||||
{
|
||||
|cfg| routes::debug::config(cfg)
|
||||
}
|
||||
#[cfg(not(target_os = "linux"))]
|
||||
{
|
||||
|_cfg| ()
|
||||
}
|
||||
})
|
||||
.configure(routes::v3::utoipa_config)
|
||||
.configure(routes::internal::utoipa_config);
|
||||
}
|
||||
|
||||
// This is so that env vars not used immediately don't panic at runtime
|
||||
|
||||
@@ -206,9 +206,8 @@ async fn app() -> std::io::Result<()> {
|
||||
.await
|
||||
.expect("Failed to register redis metrics");
|
||||
|
||||
#[cfg(target_os = "linux")]
|
||||
labrinth::routes::debug::jemalloc_memory_stats(&prometheus.registry)
|
||||
.expect("Failed to register jemalloc metrics");
|
||||
labrinth::routes::debug::register_and_set_metrics(&prometheus.registry)
|
||||
.expect("Failed to register debug metrics");
|
||||
|
||||
let labrinth_config = labrinth::app_setup(
|
||||
pool.clone(),
|
||||
|
||||
@@ -1,83 +1,71 @@
|
||||
use crate::routes::ApiError;
|
||||
use crate::util::cors::default_cors;
|
||||
use crate::util::guards::admin_key_guard;
|
||||
use actix_web::{HttpResponse, get};
|
||||
use prometheus::{IntGauge, Registry};
|
||||
use std::time::Duration;
|
||||
|
||||
pub fn config(cfg: &mut actix_web::web::ServiceConfig) {
|
||||
use eyre::Context;
|
||||
use eyre::eyre;
|
||||
use prometheus::IntGauge;
|
||||
|
||||
use crate::util::cors::default_cors;
|
||||
|
||||
#[cfg(target_os = "linux")]
|
||||
mod pprof;
|
||||
|
||||
pub fn config(cfg: &mut utoipa_actix_web::service_config::ServiceConfig) {
|
||||
cfg.service(
|
||||
actix_web::web::scope("/debug")
|
||||
utoipa_actix_web::scope("/debug")
|
||||
.wrap(default_cors())
|
||||
.service(heap)
|
||||
.service(flame_graph),
|
||||
.configure({
|
||||
#[cfg(target_os = "linux")]
|
||||
{
|
||||
pprof::config
|
||||
}
|
||||
#[cfg(not(target_os = "linux"))]
|
||||
{
|
||||
|_cfg| ()
|
||||
}
|
||||
}),
|
||||
);
|
||||
}
|
||||
|
||||
#[get("pprof/heap", guard = "admin_key_guard")]
|
||||
pub async fn heap() -> Result<HttpResponse, ApiError> {
|
||||
let mut prof_ctl = jemalloc_pprof::PROF_CTL.as_ref().unwrap().lock().await;
|
||||
require_profiling_activated(&prof_ctl)?;
|
||||
let pprof = prof_ctl
|
||||
.dump_pprof()
|
||||
.map_err(|err| ApiError::InvalidInput(err.to_string()))?;
|
||||
|
||||
Ok(HttpResponse::Ok()
|
||||
.content_type("application/octet-stream")
|
||||
.body(pprof))
|
||||
}
|
||||
|
||||
#[get("pprof/heap/flamegraph", guard = "admin_key_guard")]
|
||||
pub async fn flame_graph() -> Result<HttpResponse, ApiError> {
|
||||
let mut prof_ctl = jemalloc_pprof::PROF_CTL.as_ref().unwrap().lock().await;
|
||||
require_profiling_activated(&prof_ctl)?;
|
||||
let svg = prof_ctl
|
||||
.dump_flamegraph()
|
||||
.map_err(|err| ApiError::InvalidInput(err.to_string()))?;
|
||||
|
||||
Ok(HttpResponse::Ok().content_type("image/svg+xml").body(svg))
|
||||
}
|
||||
|
||||
fn require_profiling_activated(
|
||||
prof_ctl: &jemalloc_pprof::JemallocProfCtl,
|
||||
) -> Result<(), ApiError> {
|
||||
if prof_ctl.activated() {
|
||||
Ok(())
|
||||
} else {
|
||||
Err(ApiError::InvalidInput(
|
||||
"Profiling is not activated".to_string(),
|
||||
))
|
||||
pub fn register_and_set_metrics(
|
||||
registry: &prometheus::Registry,
|
||||
) -> eyre::Result<()> {
|
||||
#[cfg(target_os = "linux")]
|
||||
{
|
||||
pprof::register_and_set_metrics(registry)
|
||||
.wrap_err("failed to register jemalloc metrics")?;
|
||||
}
|
||||
}
|
||||
|
||||
pub fn jemalloc_memory_stats(
|
||||
registry: &Registry,
|
||||
) -> Result<(), prometheus::Error> {
|
||||
let allocated_mem = IntGauge::new(
|
||||
"labrinth_memory_allocated",
|
||||
"labrinth allocated memory",
|
||||
let make_gauge = |key: &str, name: &str| {
|
||||
IntGauge::new(key, name)
|
||||
.wrap_err_with(|| eyre!("failed to create gauge for '{key}'"))
|
||||
};
|
||||
|
||||
let num_workers = make_gauge(
|
||||
"labrinth_tokio_num_workers",
|
||||
"number of Tokio worker threads, excluding Actix HTTP server threads",
|
||||
)?;
|
||||
let num_alive_tasks = make_gauge(
|
||||
"labrinth_tokio_num_alive_tasks",
|
||||
"number of alive Tokio tasks, excluding Actix HTTP server tasks",
|
||||
)?;
|
||||
let global_queue_depth = make_gauge(
|
||||
"labrinth_tokio_global_queue_depth",
|
||||
"number of tasks in the global queue, excluding Actix runtime",
|
||||
)?;
|
||||
let resident_mem =
|
||||
IntGauge::new("labrinth_resident_memory", "labrinth resident memory")?;
|
||||
|
||||
registry.register(Box::new(allocated_mem.clone()))?;
|
||||
registry.register(Box::new(resident_mem.clone()))?;
|
||||
for gauge in [&num_workers, &num_alive_tasks, &global_queue_depth] {
|
||||
registry
|
||||
.register(Box::new(gauge.clone()))
|
||||
.wrap_err("failed to register gauge")?;
|
||||
}
|
||||
|
||||
tokio::spawn(async move {
|
||||
let e = tikv_jemalloc_ctl::epoch::mib().unwrap();
|
||||
let allocated = tikv_jemalloc_ctl::stats::allocated::mib().unwrap();
|
||||
let resident = tikv_jemalloc_ctl::stats::resident::mib().unwrap();
|
||||
let metrics = tokio::runtime::Handle::current().metrics();
|
||||
|
||||
loop {
|
||||
e.advance().unwrap();
|
||||
|
||||
if let Ok(allocated) = allocated.read() {
|
||||
allocated_mem.set(allocated as i64);
|
||||
}
|
||||
|
||||
if let Ok(resident) = resident.read() {
|
||||
resident_mem.set(resident as i64);
|
||||
}
|
||||
num_workers.set(metrics.num_workers() as i64);
|
||||
num_alive_tasks.set(metrics.num_alive_tasks() as i64);
|
||||
global_queue_depth.set(metrics.global_queue_depth() as i64);
|
||||
|
||||
tokio::time::sleep(Duration::from_secs(5)).await;
|
||||
}
|
||||
|
||||
102
apps/labrinth/src/routes/debug/pprof.rs
Normal file
102
apps/labrinth/src/routes/debug/pprof.rs
Normal file
@@ -0,0 +1,102 @@
|
||||
use crate::routes::ApiError;
|
||||
use crate::util::guards::admin_key_guard;
|
||||
use actix_web::{HttpResponse, get};
|
||||
use eyre::{Context, eyre};
|
||||
use prometheus::{IntGauge, Registry};
|
||||
use std::time::Duration;
|
||||
|
||||
pub fn config(cfg: &mut utoipa_actix_web::service_config::ServiceConfig) {
|
||||
cfg.service(heap).service(flame_graph);
|
||||
}
|
||||
|
||||
#[utoipa::path]
|
||||
#[get("/pprof/heap", guard = "admin_key_guard")]
|
||||
pub async fn heap() -> Result<HttpResponse, ApiError> {
|
||||
let mut prof_ctl = jemalloc_pprof::PROF_CTL.as_ref().unwrap().lock().await;
|
||||
require_profiling_activated(&prof_ctl)?;
|
||||
let pprof = prof_ctl
|
||||
.dump_pprof()
|
||||
.map_err(|err| ApiError::InvalidInput(err.to_string()))?;
|
||||
|
||||
Ok(HttpResponse::Ok()
|
||||
.content_type("application/octet-stream")
|
||||
.body(pprof))
|
||||
}
|
||||
|
||||
#[utoipa::path]
|
||||
#[get("/pprof/heap/flamegraph", guard = "admin_key_guard")]
|
||||
pub async fn flame_graph() -> Result<HttpResponse, ApiError> {
|
||||
let mut prof_ctl = jemalloc_pprof::PROF_CTL.as_ref().unwrap().lock().await;
|
||||
require_profiling_activated(&prof_ctl)?;
|
||||
let svg = prof_ctl
|
||||
.dump_flamegraph()
|
||||
.map_err(|err| ApiError::InvalidInput(err.to_string()))?;
|
||||
|
||||
Ok(HttpResponse::Ok().content_type("image/svg+xml").body(svg))
|
||||
}
|
||||
|
||||
fn require_profiling_activated(
|
||||
prof_ctl: &jemalloc_pprof::JemallocProfCtl,
|
||||
) -> Result<(), ApiError> {
|
||||
if prof_ctl.activated() {
|
||||
Ok(())
|
||||
} else {
|
||||
Err(ApiError::InvalidInput(
|
||||
"Profiling is not activated".to_string(),
|
||||
))
|
||||
}
|
||||
}
|
||||
|
||||
pub fn register_and_set_metrics(registry: &Registry) -> eyre::Result<()> {
|
||||
let make_gauge = |key: &str, name: &str| {
|
||||
IntGauge::new(key, name)
|
||||
.wrap_err_with(|| eyre!("failed to create gauge for '{key}'"))
|
||||
};
|
||||
|
||||
let active_mem =
|
||||
make_gauge("labrinth_memory_active", "labrinth active memory")?;
|
||||
let allocated_mem =
|
||||
make_gauge("labrinth_memory_allocated", "labrinth allocated memory")?;
|
||||
let mapped_mem =
|
||||
make_gauge("labrinth_memory_mapped", "labrinth mapped memory")?;
|
||||
let metadata_mem =
|
||||
make_gauge("labrinth_memory_metadata", "labrinth metadata memory")?;
|
||||
let resident_mem =
|
||||
make_gauge("labrinth_memory_resident", "labrinth resident memory")?;
|
||||
|
||||
for gauge in [
|
||||
&active_mem,
|
||||
&allocated_mem,
|
||||
&mapped_mem,
|
||||
&metadata_mem,
|
||||
&resident_mem,
|
||||
] {
|
||||
registry
|
||||
.register(Box::new(gauge.clone()))
|
||||
.wrap_err("failed to register gauge")?;
|
||||
}
|
||||
|
||||
tokio::spawn(async move {
|
||||
let epoch =
|
||||
tikv_jemalloc_ctl::epoch::mib().expect("failed to get epoch");
|
||||
let active = tikv_jemalloc_ctl::stats::active::mib().unwrap();
|
||||
let allocated = tikv_jemalloc_ctl::stats::allocated::mib().unwrap();
|
||||
let mapped = tikv_jemalloc_ctl::stats::mapped::mib().unwrap();
|
||||
let metadata = tikv_jemalloc_ctl::stats::metadata::mib().unwrap();
|
||||
let resident = tikv_jemalloc_ctl::stats::resident::mib().unwrap();
|
||||
|
||||
loop {
|
||||
epoch.advance().unwrap();
|
||||
|
||||
_ = active.read().inspect(|x| active_mem.set(*x as i64));
|
||||
_ = allocated.read().inspect(|x| allocated_mem.set(*x as i64));
|
||||
_ = mapped.read().inspect(|x| mapped_mem.set(*x as i64));
|
||||
_ = metadata.read().inspect(|x| metadata_mem.set(*x as i64));
|
||||
_ = resident.read().inspect(|x| resident_mem.set(*x as i64));
|
||||
|
||||
tokio::time::sleep(Duration::from_secs(5)).await;
|
||||
}
|
||||
});
|
||||
|
||||
Ok(())
|
||||
}
|
||||
@@ -10,14 +10,11 @@ use actix_web::{HttpResponse, web};
|
||||
use futures::FutureExt;
|
||||
use serde_json::json;
|
||||
|
||||
pub mod debug;
|
||||
pub mod internal;
|
||||
pub mod v2;
|
||||
pub mod v3;
|
||||
|
||||
#[cfg(target_os = "linux")]
|
||||
pub mod debug;
|
||||
|
||||
pub mod v2_reroute;
|
||||
pub mod v3;
|
||||
|
||||
mod analytics;
|
||||
mod index;
|
||||
|
||||
Reference in New Issue
Block a user