Files
Modrinth-plus/apps/labrinth/src/lib.rs
aecsocket f0224dfff7 Search backend refactor with typesense impl (#5528)
* initial elasticsearch impl

* working elastic cluster

* replace SearchError with ApiError in preparation for the search backend

* start factoring meili out to trait

* move meili to backend

* update routes to use search backend trait

* wip

* Update projects.rs

* search backend is only initialised once in config

* wip

* wip: backend agnostic

* change search internal routes to delegate to backend

* initial elasticsearch impl

* fix filtering

* elastic impl

* refactor indexing into its own module

* clean up elastic code

* fix ci

* fix tests

* fix elastic health check

* fix up env rebase

* fix compile

* dummy commit to update github pr

* Fix rebase

* Elastic basic https auth

* Fix duplicate projects showing up

* Fix up tests

* Replace search `ApiErrors` with `eyre::Reports`, propagate background task errors

* clean up agents files

* make index chunk size configurable

* make `match_phrase` in elastic case-insensitive

* use current/next indices and swap between them

* test case for error body

* Fix failing case

* da merge

* factor out common stuff from search backends

* allow fetching hit metadata from search results

* allow customising elasticsearch search config

* bit of docs

* add mappings to indices for elastic

* Implement Typesense

* wip

* fix up some sort fields stuff

* use different approach to filterable field sets

* remove a bunch of search fields which weren't used for filtering

* bucket text matches

* Bucketing by text_match for typesense

* fix tombi lint

* fix some Sentry errors and don't prioritise 2+ term matches

* tweak ts query settings

* expose some more search settings

* query sort changes

* small fixes

* should fix pagination stuff

* fix healthcheck maybe

* ragebait ci

* tests

* tests

* revert environment
2026-03-12 18:58:55 +01:00

380 lines
13 KiB
Rust

use std::sync::Arc;
use std::time::Duration;
use actix_web::web;
use database::redis::RedisPool;
use queue::{
analytics::AnalyticsQueue, email::EmailQueue, payouts::PayoutsQueue,
session::AuthQueue, socket::ActiveSockets,
};
use tracing::{debug, info, warn};
extern crate clickhouse as clickhouse_crate;
use clickhouse_crate::Client;
use util::cors::default_cors;
use util::gotenberg::GotenbergClient;
use crate::background_task::update_versions;
use crate::database::{PgPool, ReadOnlyPgPool};
use crate::env::ENV;
use crate::queue::billing::{index_billing, index_subscriptions};
use crate::queue::moderation::AutomatedModerationQueue;
use crate::routes::internal::delphi::rescan::rescan_projects_in_queue;
use crate::util::anrok;
use crate::util::archon::ArchonClient;
use crate::util::http::HttpClient;
use crate::util::ratelimit::{AsyncRateLimiter, GCRAParameters};
use sync::friends::handle_pubsub;
pub mod auth;
pub mod background_task;
pub mod clickhouse;
pub mod database;
pub mod env;
pub mod file_hosting;
pub mod models;
pub mod queue;
pub mod routes;
pub mod scheduler;
pub mod search;
pub mod sync;
pub mod util;
pub mod validate;
#[cfg(feature = "test")]
pub mod test;
/// String value used as a salt (stored as `LabrinthConfig::ip_salt`).
///
/// NOTE(review): this type intentionally has no `Debug` derive here; if one
/// is ever added and the value is meant to stay secret, consider redacting
/// `pepper` so it cannot end up in logs.
#[derive(Clone)]
pub struct Pepper {
    /// The raw salt value.
    pub pepper: String,
}
/// Shared application state.
///
/// Built by [`app_setup`] and registered as actix app data by [`app_config`].
/// Cloning is cheap for the `Arc`/`web::Data` fields; the other fields are
/// cloned via their own `Clone` impls.
#[derive(Clone)]
pub struct LabrinthConfig {
    /// Postgres connection pool.
    pub pool: PgPool,
    /// Read-only Postgres connection pool.
    pub ro_pool: ReadOnlyPgPool,
    /// Redis connection pool. [`app_setup`] also reads its `url` to open a
    /// separate Redis pub/sub client.
    pub redis_pool: RedisPool,
    /// ClickHouse client; [`app_setup`] hands clones of it to the analytics
    /// flush job and the payouts job.
    pub clickhouse: Client,
    /// File hosting backend.
    pub file_host: Arc<dyn file_hosting::FileHost + Send + Sync>,
    /// Scheduler on which [`app_setup`] registers the periodic jobs.
    pub scheduler: Arc<scheduler::Scheduler>,
    /// Per-process salt. [`app_setup`] fills it with a random 11-character
    /// base62 string. Presumably used to salt IP addresses, judging by the
    /// name — confirm at use sites.
    pub ip_salt: Pepper,
    /// Search backend, registered as app data and also used by the search
    /// indexing job.
    pub search_backend: web::Data<dyn search::SearchBackend>,
    /// Session queue; [`app_setup`] schedules it to be indexed every 30 minutes.
    pub session_queue: web::Data<AuthQueue>,
    /// Payouts queue.
    pub payouts_queue: web::Data<PayoutsQueue>,
    /// Analytics queue; [`app_setup`] schedules it to be indexed with the
    /// ClickHouse client every 15 seconds.
    pub analytics_queue: Arc<AnalyticsQueue>,
    /// Active socket connections (`queue::socket::ActiveSockets`); also
    /// handed to the Redis pub/sub listener started by [`app_setup`].
    pub active_sockets: web::Data<ActiveSockets>,
    /// Automated moderation queue; [`app_setup`] spawns its `task` loop.
    pub automated_moderation_queue: web::Data<AutomatedModerationQueue>,
    /// Redis-backed rate limiter (constructed with `GCRAParameters::new(300, 300)`
    /// in [`app_setup`]).
    pub rate_limiter: web::Data<AsyncRateLimiter>,
    /// Stripe API client.
    pub stripe_client: stripe::Client,
    /// Anrok API client.
    pub anrok_client: anrok::Client,
    /// Outgoing email queue.
    pub email_queue: web::Data<EmailQueue>,
    /// Archon API client, built from the environment in [`app_setup`].
    pub archon_client: web::Data<ArchonClient>,
    /// Gotenberg client.
    pub gotenberg_client: GotenbergClient,
    /// Shared outbound HTTP client.
    pub http_client: web::Data<HttpClient>,
}
#[allow(clippy::too_many_arguments)]
pub fn app_setup(
pool: PgPool,
ro_pool: ReadOnlyPgPool,
redis_pool: RedisPool,
search_backend: actix_web::web::Data<dyn search::SearchBackend>,
clickhouse: &mut Client,
file_host: Arc<dyn file_hosting::FileHost + Send + Sync>,
stripe_client: stripe::Client,
anrok_client: anrok::Client,
email_queue: EmailQueue,
gotenberg_client: GotenbergClient,
enable_background_tasks: bool,
) -> LabrinthConfig {
info!("Starting labrinth on {}", &ENV.BIND_ADDR);
let automated_moderation_queue =
web::Data::new(AutomatedModerationQueue::default());
{
let automated_moderation_queue_ref = automated_moderation_queue.clone();
let pool_ref = pool.clone();
let redis_pool_ref = redis_pool.clone();
actix_rt::spawn(async move {
automated_moderation_queue_ref
.task(pool_ref, redis_pool_ref)
.await;
});
}
let scheduler = scheduler::Scheduler::new();
let http_client = web::Data::new(HttpClient::new());
{
let pool_ref = pool.clone();
let http_ref = http_client.clone();
actix_rt::spawn(async move {
if let Err(err) =
rescan_projects_in_queue(&pool_ref, &http_ref).await
{
warn!("Delphi rescan failed: {err:#}");
}
});
}
let limiter = web::Data::new(AsyncRateLimiter::new(
redis_pool.clone(),
GCRAParameters::new(300, 300),
));
if enable_background_tasks {
// The interval in seconds at which the local database is indexed
// for searching. Defaults to 1 hour if unset.
let local_index_interval =
Duration::from_secs(ENV.LOCAL_INDEX_INTERVAL);
let pool_ref = pool.clone();
let redis_pool_ref = redis_pool.clone();
let search_backend_ref = search_backend.clone();
scheduler.run(local_index_interval, move || {
let pool_ref = pool_ref.clone();
let redis_pool_ref = redis_pool_ref.clone();
let search_backend = search_backend_ref.clone();
async move {
if let Err(err) = background_task::index_search(
pool_ref,
redis_pool_ref,
search_backend,
)
.await
{
warn!("Failed to index search: {err:?}");
}
}
});
// Changes statuses of scheduled projects/versions
let pool_ref = pool.clone();
// TODO: Clear cache when these are run
scheduler.run(Duration::from_secs(60 * 5), move || {
let pool_ref = pool_ref.clone();
async move {
if let Err(e) =
background_task::release_scheduled(pool_ref).await
{
warn!("Syncing scheduled releases failed: {e:#}");
}
}
});
let version_index_interval =
Duration::from_secs(ENV.VERSION_INDEX_INTERVAL);
let pool_ref = pool.clone();
let redis_pool_ref = redis_pool.clone();
scheduler.run(version_index_interval, move || {
let pool_ref = pool_ref.clone();
let redis = redis_pool_ref.clone();
async move {
if let Err(e) = update_versions(pool_ref, redis).await {
warn!("Version update failed: {e:#}");
}
}
});
let pool_ref = pool.clone();
let client_ref = clickhouse.clone();
let redis_pool_ref = redis_pool.clone();
scheduler.run(Duration::from_secs(60 * 60 * 6), move || {
let pool_ref = pool_ref.clone();
let client_ref = client_ref.clone();
let redis_ref = redis_pool_ref.clone();
async move {
if let Err(e) =
background_task::payouts(pool_ref, client_ref, redis_ref)
.await
{
warn!("Payout task failed: {e:#}");
}
}
});
let pool_ref = pool.clone();
let redis_ref = redis_pool.clone();
let stripe_client_ref = stripe_client.clone();
let anrok_client_ref = anrok_client.clone();
actix_rt::spawn(async move {
loop {
index_billing(
stripe_client_ref.clone(),
anrok_client_ref.clone(),
pool_ref.clone(),
redis_ref.clone(),
)
.await;
tokio::time::sleep(Duration::from_secs(60 * 5)).await;
}
});
let pool_ref = pool.clone();
let redis_ref = redis_pool.clone();
let stripe_client_ref = stripe_client.clone();
let anrok_client_ref = anrok_client.clone();
actix_rt::spawn(async move {
loop {
index_subscriptions(
pool_ref.clone(),
redis_ref.clone(),
stripe_client_ref.clone(),
anrok_client_ref.clone(),
)
.await;
tokio::time::sleep(Duration::from_secs(60 * 5)).await;
}
});
}
let session_queue = web::Data::new(AuthQueue::new());
let pool_ref = pool.clone();
let redis_ref = redis_pool.clone();
let session_queue_ref = session_queue.clone();
scheduler.run(Duration::from_secs(60 * 30), move || {
let pool_ref = pool_ref.clone();
let redis_ref = redis_ref.clone();
let session_queue_ref = session_queue_ref.clone();
async move {
info!("Indexing sessions queue");
let result = session_queue_ref.index(&pool_ref, &redis_ref).await;
if let Err(e) = result {
warn!("Indexing sessions queue failed: {:?}", e);
}
info!("Done indexing sessions queue");
}
});
let analytics_queue = Arc::new(AnalyticsQueue::new());
{
let client_ref = clickhouse.clone();
let analytics_queue_ref = analytics_queue.clone();
let pool_ref = pool.clone();
let redis_ref = redis_pool.clone();
scheduler.run(Duration::from_secs(15), move || {
let client_ref = client_ref.clone();
let analytics_queue_ref = analytics_queue_ref.clone();
let pool_ref = pool_ref.clone();
let redis_ref = redis_ref.clone();
async move {
debug!("Indexing analytics queue");
let result = analytics_queue_ref
.index(client_ref, &redis_ref, &pool_ref)
.await;
if let Err(e) = result {
warn!("Indexing analytics queue failed: {:?}", e);
}
debug!("Done indexing analytics queue");
}
});
}
let ip_salt = Pepper {
pepper: ariadne::ids::Base62Id(ariadne::ids::random_base62(11))
.to_string(),
};
let active_sockets = web::Data::new(ActiveSockets::default());
{
let pool = pool.clone();
let redis_client = redis::Client::open(redis_pool.url.clone()).unwrap();
let sockets = active_sockets.clone();
actix_rt::spawn(async move {
let pubsub = redis_client.get_async_pubsub().await.unwrap();
handle_pubsub(pubsub, pool, sockets).await;
});
}
LabrinthConfig {
pool,
ro_pool,
redis_pool,
clickhouse: clickhouse.clone(),
file_host,
scheduler: Arc::new(scheduler),
ip_salt,
search_backend,
session_queue,
payouts_queue: web::Data::new(PayoutsQueue::new()),
analytics_queue,
active_sockets,
automated_moderation_queue,
rate_limiter: limiter,
stripe_client,
anrok_client,
gotenberg_client,
http_client,
archon_client: web::Data::new(
ArchonClient::from_env()
.expect("ARCHON_URL and PYRO_API_KEY must be set"),
),
email_queue: web::Data::new(email_queue),
}
}
pub fn app_config(
cfg: &mut web::ServiceConfig,
labrinth_config: LabrinthConfig,
) {
cfg.app_data(web::FormConfig::default().error_handler(|err, _req| {
routes::ApiError::Validation(err.to_string()).into()
}))
.app_data(web::PathConfig::default().error_handler(|err, _req| {
routes::ApiError::Validation(err.to_string()).into()
}))
.app_data(web::QueryConfig::default().error_handler(|err, _req| {
routes::ApiError::Validation(err.to_string()).into()
}))
.app_data(web::JsonConfig::default().error_handler(|err, _req| {
routes::ApiError::Validation(err.to_string()).into()
}))
.app_data(web::Data::new(labrinth_config.redis_pool.clone()))
.app_data(web::Data::new(labrinth_config.pool.clone()))
.app_data(web::Data::new(labrinth_config.ro_pool.clone()))
.app_data(web::Data::new(labrinth_config.file_host.clone()))
.app_data(labrinth_config.search_backend.clone())
.app_data(web::Data::new(labrinth_config.gotenberg_client.clone()))
.app_data(labrinth_config.http_client.clone())
.app_data(labrinth_config.session_queue.clone())
.app_data(labrinth_config.payouts_queue.clone())
.app_data(labrinth_config.email_queue.clone())
.app_data(web::Data::new(labrinth_config.ip_salt.clone()))
.app_data(web::Data::new(labrinth_config.analytics_queue.clone()))
.app_data(web::Data::new(labrinth_config.clickhouse.clone()))
.app_data(labrinth_config.active_sockets.clone())
.app_data(labrinth_config.automated_moderation_queue.clone())
.app_data(labrinth_config.archon_client.clone())
.app_data(web::Data::new(labrinth_config.stripe_client.clone()))
.app_data(web::Data::new(labrinth_config.anrok_client.clone()))
.app_data(labrinth_config.rate_limiter.clone())
.configure(routes::v2::config)
.configure(routes::v3::config)
.configure(routes::internal::config)
.configure(routes::root_config)
.default_service(web::get().wrap(default_cors()).to(routes::not_found));
}
/// Registers the OpenAPI-documented (utoipa) routes on `cfg`.
///
/// On Linux the debug routes (`routes::debug::config`) are registered first.
/// On other targets that step is a no-op. The v3 and internal utoipa route
/// sets are registered next, in that order. `_labrinth_config` is currently
/// unused.
pub fn utoipa_app_config(
    cfg: &mut utoipa_actix_web::service_config::ServiceConfig,
    _labrinth_config: LabrinthConfig,
) {
    /// Platform-specific routes: the debug module is only built on Linux.
    #[cfg(target_os = "linux")]
    fn platform_routes(
        cfg: &mut utoipa_actix_web::service_config::ServiceConfig,
    ) {
        routes::debug::config(cfg)
    }

    /// Platform-specific routes: nothing to register off Linux.
    #[cfg(not(target_os = "linux"))]
    fn platform_routes(
        _cfg: &mut utoipa_actix_web::service_config::ServiceConfig,
    ) {
    }

    cfg.configure(platform_routes)
        .configure(routes::v3::utoipa_config)
        .configure(routes::internal::utoipa_config);
}