Remove non-Typesense search backends, add default env vars (#6082)

* Remove non-Typesense search backends, add default env vars

* shear

* remove some default keys
This commit is contained in:
aecsocket
2026-05-13 18:15:37 +01:00
committed by GitHub
parent 83dddfd512
commit e5bbd9d409
9 changed files with 98 additions and 2728 deletions

24
Cargo.lock generated
View File

@@ -3059,29 +3059,6 @@ dependencies = [
"serde",
]
[[package]]
name = "elasticsearch"
version = "9.1.0-alpha.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "12bb303aa6e1d28c0c86b6fbfe484fd0fd3f512629aeed1ac4f6b85f81d9834a"
dependencies = [
"base64 0.22.1",
"bytes",
"dyn-clone",
"flate2",
"lazy_static",
"parking_lot",
"percent-encoding",
"reqwest 0.12.24",
"rustc_version",
"serde",
"serde_json",
"serde_with",
"tokio",
"url",
"void",
]
[[package]]
name = "elliptic-curve"
version = "0.13.8"
@@ -5275,7 +5252,6 @@ dependencies = [
"dotenv-build",
"dotenvy",
"either",
"elasticsearch",
"eyre",
"futures",
"futures-util",

View File

@@ -77,7 +77,6 @@ dotenv-build = "0.1.1"
dotenvy = "0.15.7"
dunce = "1.0.5"
either = "1.15.0"
elasticsearch = "9.1.0-alpha.1"
encoding_rs = "0.8.35"
enumset = "1.1.10"
eyre = "0.6.12"

View File

@@ -45,7 +45,6 @@ deadpool-redis.workspace = true
derive_more = { workspace = true, features = ["deref", "deref_mut"] }
dotenvy = { workspace = true }
either = { workspace = true }
elasticsearch = { workspace = true, features = ["experimental-apis"] }
eyre = { workspace = true }
futures = { workspace = true }
futures-util = { workspace = true }

View File

@@ -120,42 +120,50 @@ impl FromStr for StringCsv {
}
vars! {
SENTRY_ENVIRONMENT: String;
SENTRY_TRACES_SAMPLE_RATE: f32;
SITE_URL: String;
CDN_URL: String;
LABRINTH_ADMIN_KEY: String;
LABRINTH_MEDAL_KEY: String;
LABRINTH_EXTERNAL_NOTIFICATION_KEY: String;
RATE_LIMIT_IGNORE_KEY: String;
DATABASE_URL: String;
REDIS_URL: String;
BIND_ADDR: String;
SELF_ADDR: String;
SENTRY_ENVIRONMENT: String = "development";
SENTRY_TRACES_SAMPLE_RATE: f32 = 0.1f32;
SITE_URL: String = "http://localhost:3000";
CDN_URL: String = "file:///tmp/modrinth";
LABRINTH_ADMIN_KEY: String = "";
LABRINTH_MEDAL_KEY: String = "";
LABRINTH_EXTERNAL_NOTIFICATION_KEY: String = "";
RATE_LIMIT_IGNORE_KEY: String = "";
DATABASE_URL: String = "postgresql://labrinth:labrinth@localhost/labrinth";
REDIS_URL: String = "redis://localhost";
BIND_ADDR: String = "";
SELF_ADDR: String = "";
LOCAL_INDEX_INTERVAL: u64;
VERSION_INDEX_INTERVAL: u64;
LOCAL_INDEX_INTERVAL: u64 = 3600u64;
VERSION_INDEX_INTERVAL: u64 = 1800u64;
WHITELISTED_MODPACK_DOMAINS: Json<Vec<String>>;
ALLOWED_CALLBACK_URLS: Json<Vec<String>>;
ANALYTICS_ALLOWED_ORIGINS: Json<Vec<String>>;
WHITELISTED_MODPACK_DOMAINS: Json<Vec<String>> = Json(vec![
"cdn.modrinth.com".into(),
"github.com".into(),
"raw.githubusercontent.com".into(),
]);
ALLOWED_CALLBACK_URLS: Json<Vec<String>> = Json(vec![
"localhost".into(),
".modrinth.com".into(),
"127.0.0.1".into(),
"[::1]".into(),
]);
ANALYTICS_ALLOWED_ORIGINS: Json<Vec<String>> = Json(vec![
"http://127.0.0.1:3000".into(),
"http://localhost:3000".into(),
"https://modrinth.com".into(),
"https://www.modrinth.com".into(),
"*".into(),
]);
// search
SEARCH_BACKEND: crate::search::SearchBackendKind = crate::search::SearchBackendKind::Typesense;
MEILISEARCH_READ_ADDR: String;
MEILISEARCH_WRITE_ADDRS: StringCsv;
MEILISEARCH_KEY: String;
ELASTICSEARCH_URL: String;
ELASTICSEARCH_INDEX_PREFIX: String;
ELASTICSEARCH_USERNAME: String = "";
ELASTICSEARCH_PASSWORD: String = "";
SEARCH_INDEX_CHUNK_SIZE: i64 = 5000i64;
TYPESENSE_URL: String = "http://localhost:8108";
TYPESENSE_API_KEY: String = "modrinth";
TYPESENSE_INDEX_PREFIX: String = "labrinth";
// storage
STORAGE_BACKEND: crate::file_hosting::FileHostKind;
STORAGE_BACKEND: crate::file_hosting::FileHostKind = crate::file_hosting::FileHostKind::Local;
// s3
S3_PUBLIC_BUCKET_NAME: String = "";
@@ -173,98 +181,98 @@ vars! {
S3_PRIVATE_SECRET: String = "";
// local
MOCK_FILE_PATH: String = "";
MOCK_FILE_PATH: String = "/tmp/modrinth";
GITHUB_CLIENT_ID: String;
GITHUB_CLIENT_SECRET: String;
GITLAB_CLIENT_ID: String;
GITLAB_CLIENT_SECRET: String;
DISCORD_CLIENT_ID: String;
DISCORD_CLIENT_SECRET: String;
MICROSOFT_CLIENT_ID: String;
MICROSOFT_CLIENT_SECRET: String;
GOOGLE_CLIENT_ID: String;
GOOGLE_CLIENT_SECRET: String;
STEAM_API_KEY: String;
GITHUB_CLIENT_ID: String = "none";
GITHUB_CLIENT_SECRET: String = "none";
GITLAB_CLIENT_ID: String = "none";
GITLAB_CLIENT_SECRET: String = "none";
DISCORD_CLIENT_ID: String = "none";
DISCORD_CLIENT_SECRET: String = "none";
MICROSOFT_CLIENT_ID: String = "none";
MICROSOFT_CLIENT_SECRET: String = "none";
GOOGLE_CLIENT_ID: String = "none";
GOOGLE_CLIENT_SECRET: String = "none";
STEAM_API_KEY: String = "none";
TREMENDOUS_API_URL: String;
TREMENDOUS_API_KEY: String;
TREMENDOUS_PRIVATE_KEY: String;
TREMENDOUS_API_URL: String = "https://testflight.tremendous.com/api/v2/";
TREMENDOUS_API_KEY: String = "none";
TREMENDOUS_PRIVATE_KEY: String = "none";
PAYPAL_API_URL: String;
PAYPAL_WEBHOOK_ID: String;
PAYPAL_CLIENT_ID: String;
PAYPAL_CLIENT_SECRET: String;
PAYPAL_NVP_USERNAME: String;
PAYPAL_NVP_PASSWORD: String;
PAYPAL_NVP_SIGNATURE: String;
PAYPAL_API_URL: String = "https://api-m.sandbox.paypal.com/v1/";
PAYPAL_WEBHOOK_ID: String = "none";
PAYPAL_CLIENT_ID: String = "none";
PAYPAL_CLIENT_SECRET: String = "none";
PAYPAL_NVP_USERNAME: String = "none";
PAYPAL_NVP_PASSWORD: String = "none";
PAYPAL_NVP_SIGNATURE: String = "none";
PAYPAL_BALANCE_ALERT_THRESHOLD: u64 = 0u64;
BREX_BALANCE_ALERT_THRESHOLD: u64 = 0u64;
TREMENDOUS_BALANCE_ALERT_THRESHOLD: u64 = 0u64;
MURAL_BALANCE_ALERT_THRESHOLD: u64 = 0u64;
HCAPTCHA_SECRET: String;
HCAPTCHA_SECRET: String = "none";
SMTP_USERNAME: String;
SMTP_PASSWORD: String;
SMTP_HOST: String;
SMTP_PORT: u16;
SMTP_TLS: String;
SMTP_FROM_NAME: String;
SMTP_FROM_ADDRESS: String;
SMTP_USERNAME: String = "";
SMTP_PASSWORD: String = "";
SMTP_HOST: String = "localhost";
SMTP_PORT: u16 = 1025u16;
SMTP_TLS: String = "none";
SMTP_FROM_NAME: String = "Modrinth";
SMTP_FROM_ADDRESS: String = "no-reply@mail.modrinth.com";
SITE_VERIFY_EMAIL_PATH: String;
SITE_RESET_PASSWORD_PATH: String;
SITE_BILLING_PATH: String;
SITE_VERIFY_EMAIL_PATH: String = "auth/verify-email";
SITE_RESET_PASSWORD_PATH: String = "auth/reset-password";
SITE_BILLING_PATH: String = "none";
SENDY_URL: String;
SENDY_LIST_ID: String;
SENDY_API_KEY: String;
SENDY_URL: String = "none";
SENDY_LIST_ID: String = "none";
SENDY_API_KEY: String = "none";
CLICKHOUSE_REPLICATED: bool;
CLICKHOUSE_URL: String;
CLICKHOUSE_USER: String;
CLICKHOUSE_PASSWORD: String;
CLICKHOUSE_DATABASE: String;
CLICKHOUSE_REPLICATED: bool = false;
CLICKHOUSE_URL: String = "http://localhost:8123";
CLICKHOUSE_USER: String = "default";
CLICKHOUSE_PASSWORD: String = "default";
CLICKHOUSE_DATABASE: String = "staging_ariadne";
FLAME_ANVIL_URL: String;
FLAME_ANVIL_URL: String = "none";
GOTENBERG_URL: String;
GOTENBERG_CALLBACK_BASE: String;
GOTENBERG_TIMEOUT: u64;
GOTENBERG_URL: String = "http://localhost:13000";
GOTENBERG_CALLBACK_BASE: String = "http://host.docker.internal:8000/_internal/gotenberg";
GOTENBERG_TIMEOUT: u64 = 30000u64;
STRIPE_API_KEY: String;
STRIPE_WEBHOOK_SECRET: String;
STRIPE_API_KEY: String = "none";
STRIPE_WEBHOOK_SECRET: String = "none";
ADITUDE_API_KEY: String;
ADITUDE_API_KEY: String = "none";
PYRO_API_KEY: String;
PYRO_API_KEY: String = "none";
BREX_API_URL: String;
BREX_API_KEY: String;
BREX_API_URL: String = "https://platform.brexapis.com/v2/";
BREX_API_KEY: String = "none";
DELPHI_URL: String;
DELPHI_URL: String = "";
AVALARA_1099_API_URL: String;
AVALARA_1099_API_KEY: String;
AVALARA_1099_API_TEAM_ID: String;
AVALARA_1099_COMPANY_ID: String;
AVALARA_1099_API_URL: String = "https://www.track1099.com/api";
AVALARA_1099_API_KEY: String = "none";
AVALARA_1099_API_TEAM_ID: String = "none";
AVALARA_1099_COMPANY_ID: String = "207337084";
ANROK_API_URL: String;
ANROK_API_KEY: String;
ANROK_API_URL: String = "";
ANROK_API_KEY: String = "";
PAYOUT_ALERT_SLACK_WEBHOOK: String;
PAYOUT_ALERT_SLACK_WEBHOOK: String = "none";
CLOUDFLARE_INTEGRATION: bool = false;
ARCHON_URL: String;
ARCHON_URL: String = "";
MURALPAY_API_URL: String;
MURALPAY_API_KEY: String;
MURALPAY_TRANSFER_API_KEY: String;
MURALPAY_API_URL: String = "https://api-staging.muralpay.com";
MURALPAY_API_KEY: String = "none";
MURALPAY_TRANSFER_API_KEY: String = "none";
MURALPAY_SOURCE_ACCOUNT_ID: muralpay::AccountId = muralpay::AccountId(uuid::Uuid::nil());
DEFAULT_AFFILIATE_REVENUE_SPLIT: Decimal;
DEFAULT_AFFILIATE_REVENUE_SPLIT: Decimal = Decimal::new(1, 1);
DATABASE_ACQUIRE_TIMEOUT_MS: u64 = 30000u64;
DATABASE_MIN_CONNECTIONS: u32 = 0u32;
@@ -286,7 +294,7 @@ vars! {
MODERATION_SLACK_WEBHOOK: String = "";
DELPHI_SLACK_WEBHOOK: String = "";
TREMENDOUS_CAMPAIGN_ID: String = "";
TREMENDOUS_CAMPAIGN_ID: String = "none";
// server pinging
SERVER_PING_MAX_CONCURRENT: usize = 16usize;

File diff suppressed because it is too large Load Diff

View File

@@ -1,701 +0,0 @@
use std::sync::LazyLock;
use std::time::Duration;
use crate::database::PgPool;
use crate::database::redis::RedisPool;
use crate::env::ENV;
use crate::search::backend::meilisearch::MeilisearchConfig;
use crate::search::indexing::index_local;
use crate::search::{SearchField, UploadSearchProject};
use crate::util::error::Context;
use ariadne::ids::base62_impl::to_base62;
use eyre::{Result, eyre};
use futures::StreamExt;
use futures::stream::FuturesOrdered;
use meilisearch_sdk::client::{Client, SwapIndexes};
use meilisearch_sdk::indexes::Index;
use meilisearch_sdk::settings::{PaginationSetting, Settings};
use meilisearch_sdk::task_info::TaskInfo;
use tracing::{Instrument, error, info, info_span, instrument};
// // The chunk size for adding projects to the indexing database. If the request size
// // is too large (>10MiB) then the request fails with an error. This chunk size
// // assumes a max average size of 4KiB per project to avoid this cap.
//
// Set this to 50k for better observability
const MEILISEARCH_CHUNK_SIZE: usize = 50000; // 10_000_000
fn search_operation_timeout() -> std::time::Duration {
std::time::Duration::from_millis(ENV.SEARCH_OPERATION_TIMEOUT)
}
pub async fn remove_documents(
ids: &[crate::models::ids::VersionId],
config: &MeilisearchConfig,
) -> Result<()> {
let mut indexes = get_indexes_for_indexing(config, false, false)
.await
.wrap_err("failed to get current indexes")?;
let indexes_next = get_indexes_for_indexing(config, true, false)
.await
.wrap_err("failed to get next indexes")?;
for list in &mut indexes {
for alt_list in &indexes_next {
list.extend(alt_list.iter().cloned());
}
}
let client = config
.make_batch_client()
.wrap_err("failed to create batch client")?;
let client = &client;
let ids_base62 = ids.iter().map(|x| to_base62(x.0)).collect::<Vec<_>>();
let mut deletion_tasks = FuturesOrdered::new();
client.across_all(indexes, |index_list, client| {
for index in index_list {
let owned_client = client.clone();
let ids_base62_ref = &ids_base62;
deletion_tasks.push_back(async move {
index
.delete_documents(ids_base62_ref)
.await
.wrap_err_with(|| {
eyre!("failed to request to delete documents {ids_base62_ref:?}")
})?
.wait_for_completion(
&owned_client,
None,
Some(Duration::from_secs(15)),
)
.await
.wrap_err_with(|| {
eyre!("failed to delete documents {ids_base62_ref:?}")
})
});
}
});
while let Some(result) = deletion_tasks.next().await {
result?;
}
Ok(())
}
pub async fn index_projects(
ro_pool: PgPool,
redis: RedisPool,
config: &MeilisearchConfig,
) -> Result<()> {
info!("Indexing projects.");
info!("Ensuring current indexes exists");
// First, ensure current index exists (so no error happens- current index should be worst-case empty, not missing)
get_indexes_for_indexing(config, false, false)
.await
.wrap_err("failed to get indexes for indexing")?;
info!("Deleting surplus indexes");
// Then, delete the next index if it still exists
let indices = get_indexes_for_indexing(config, true, false)
.await
.wrap_err("failed to get next indexes to delete")?;
for client_indices in indices {
for index in client_indices {
index.delete().await.wrap_err("failed to delete an index")?;
}
}
info!("Recreating next index");
// Recreate the next index for indexing
let indices = get_indexes_for_indexing(config, true, true)
.await
.wrap_internal_err("failed to recreate next index")?;
let all_loader_fields =
crate::database::models::loader_fields::LoaderField::get_fields_all(
&ro_pool, &redis,
)
.await
.wrap_internal_err("failed to get all loader fields")?
.into_iter()
.map(|x| x.field)
.collect::<Vec<_>>();
info!("Gathering local projects");
let mut cursor = 0;
let mut idx = 0;
let mut total = 0;
loop {
info!("Gathering index data chunk {idx}");
idx += 1;
let (uploads, next_cursor) =
index_local(&ro_pool, &redis, cursor, 10000).await?;
total += uploads.len();
if uploads.is_empty() {
info!(
"No more projects to index, indexed {total} projects after {idx} chunks"
);
break;
}
cursor = next_cursor;
add_projects_batch_client(
&indices,
uploads,
all_loader_fields.clone(),
config,
)
.await?;
}
info!("Swapping indexes");
// Swap the index
swap_index(config, "projects").await?;
swap_index(config, "projects_filtered").await?;
info!("Deleting old indexes");
// Delete the now-old index
for index_list in indices {
for index in index_list {
index.delete().await?;
}
}
info!("Done adding projects.");
Ok(())
}
pub async fn swap_index(
config: &MeilisearchConfig,
index_name: &str,
) -> Result<()> {
let client = config.make_batch_client()?;
let index_name_next = config.get_index_name(index_name, true);
let index_name = config.get_index_name(index_name, false);
let swap_indices = SwapIndexes {
indexes: (index_name_next, index_name),
rename: None,
};
let swap_indices_ref = &swap_indices;
// is it "indexes" or "indices"? who knows! roll a die!
client
.with_all_clients("swap_indexes", |client| async move {
let task = client
.swap_indexes([swap_indices_ref])
.await
.wrap_err("failed to swap indices")?;
monitor_task(
client,
task,
Duration::from_secs(60 * 10), // 10 minutes
Some(Duration::from_secs(1)),
)
.await?;
Ok(())
})
.await?;
Ok(())
}
#[instrument(skip(config))]
pub async fn get_indexes_for_indexing(
config: &MeilisearchConfig,
next: bool, // Get the 'next' one
update_settings: bool,
) -> Result<Vec<Vec<Index>>> {
let client = config.make_batch_client()?;
let project_name = config.get_index_name("projects", next);
let project_filtered_name =
config.get_index_name("projects_filtered", next);
let project_name_ref = &project_name;
let project_filtered_name_ref = &project_filtered_name;
let results = client
.with_all_clients("get_indexes_for_indexing", |client| async move {
let projects_index = create_or_update_index(
client,
project_name_ref,
Some(&[
"words",
"typo",
"proximity",
"attribute",
"exactness",
"sort",
]),
update_settings,
)
.await?;
let projects_filtered_index = create_or_update_index(
client,
project_filtered_name_ref,
Some(&[
"sort",
"words",
"typo",
"proximity",
"attribute",
"exactness",
]),
update_settings,
)
.await?;
Ok(vec![projects_index, projects_filtered_index])
})
.await?;
Ok(results)
}
#[instrument(skip_all, fields(name))]
async fn create_or_update_index(
client: &Client,
name: &str,
custom_rules: Option<&'static [&'static str]>,
update_settings: bool,
) -> Result<Index, meilisearch_sdk::errors::Error> {
info!("Updating/creating index");
match client.get_index(name).await {
Ok(index) => {
info!("Updating index settings.");
let mut settings = default_settings();
if let Some(custom_rules) = custom_rules {
settings = settings.with_ranking_rules(custom_rules);
}
if update_settings {
info!("Updating index settings");
index
.set_settings(&settings)
.await
.inspect_err(|e| {
error!("Error setting index settings: {e:?}")
})?
.wait_for_completion(
client,
None,
Some(search_operation_timeout()),
)
.await
.inspect_err(|e| {
error!(
"Error setting index settings while waiting: {e:?}"
)
})?;
}
info!("Done performing index settings set.");
Ok(index)
}
_ => {
info!("Creating index.");
// Only create index and set settings if the index doesn't already exist
let task = client.create_index(name, Some("version_id")).await?;
let task = task
.wait_for_completion(
client,
None,
Some(search_operation_timeout()),
)
.await
.inspect_err(|e| {
error!("Error creating index while waiting: {e:?}")
})?;
let index = task
.try_make_index(client)
.map_err(|x| x.unwrap_failure())?;
let mut settings = default_settings();
if let Some(custom_rules) = custom_rules {
settings = settings.with_ranking_rules(custom_rules);
}
if update_settings {
index
.set_settings(&settings)
.await
.inspect_err(|e| {
error!("Error setting index settings: {e:?}")
})?
.wait_for_completion(
client,
None,
Some(search_operation_timeout()),
)
.await
.inspect_err(|e| {
error!(
"Error setting index settings while waiting: {e:?}"
)
})?;
}
Ok(index)
}
}
}
#[instrument(skip_all, fields(%index.uid, mods.len = mods.len()))]
async fn add_to_index(
client: &Client,
index: &Index,
mods: &[UploadSearchProject],
) -> Result<()> {
for chunk in mods.chunks(MEILISEARCH_CHUNK_SIZE) {
info!(
"Adding chunk of {} versions starting with version id {}",
chunk.len(),
chunk[0].version_id
);
let now = std::time::Instant::now();
let task = index
.add_or_replace(chunk, Some("version_id"))
.await
.inspect_err(|e| error!("Error adding chunk to index: {e:?}"))?;
monitor_task(
client,
task,
Duration::from_secs(60 * 5), // Timeout after 10 minutes
Some(Duration::from_secs(1)), // Poll once every second
)
.await?;
info!(
"Added chunk of {} projects to index in {:.2} seconds",
chunk.len(),
now.elapsed().as_secs_f64()
);
}
Ok(())
}
async fn monitor_task(
client: &Client,
task: TaskInfo,
timeout: Duration,
poll: Option<Duration>,
) -> Result<()> {
let now = std::time::Instant::now();
let id = task.get_task_uid();
let mut interval = tokio::time::interval(Duration::from_secs(30));
interval.reset();
let wait = task.wait_for_completion(client, poll, Some(timeout));
tokio::select! {
biased;
result = wait => {
info!("Task {id} completed in {:.2} seconds: {result:?}", now.elapsed().as_secs_f64());
result?;
}
_ = interval.tick() => {
struct Id(u32);
impl AsRef<u32> for Id {
fn as_ref(&self) -> &u32 {
&self.0
}
}
// it takes an AsRef<u32> but u32 itself doesn't impl it lol
if let Ok(task) = client.get_task(Id(id)).await {
if task.is_pending() {
info!("Task {id} is still pending after {:.2} seconds", now.elapsed().as_secs_f64());
}
} else {
error!("Error getting task {id}");
}
}
};
Ok(())
}
#[instrument(skip_all, fields(index.uid = %index.uid))]
async fn update_and_add_to_index(
client: &Client,
index: &Index,
projects: &[UploadSearchProject],
_additional_fields: &[String],
) -> Result<()> {
// TODO: Uncomment this- hardcoding loader_fields is a band-aid fix, and will be fixed soon
// let mut new_filterable_attributes: Vec<String> = index.get_filterable_attributes().await?;
// let mut new_displayed_attributes = index.get_displayed_attributes().await?;
// // Check if any 'additional_fields' are not already in the index
// // Only add if they are not already in the index
// let new_fields = additional_fields
// .iter()
// .filter(|x| !new_filterable_attributes.contains(x))
// .collect::<Vec<_>>();
// if !new_fields.is_empty() {
// info!("Adding new fields to index: {:?}", new_fields);
// new_filterable_attributes.extend(new_fields.iter().map(|s: &&String| s.to_string()));
// new_displayed_attributes.extend(new_fields.iter().map(|s| s.to_string()));
// // Adds new fields to the index
// let filterable_task = index
// .set_filterable_attributes(new_filterable_attributes)
// .await?;
// let displayable_task = index
// .set_displayed_attributes(new_displayed_attributes)
// .await?;
// // Allow a long timeout for adding new attributes- it only needs to happen the once
// filterable_task
// .wait_for_completion(client, None, Some(search_operation_timeout() * 100))
// .await?;
// displayable_task
// .wait_for_completion(client, None, Some(search_operation_timeout() * 100))
// .await?;
// }
info!("Adding to index.");
add_to_index(client, index, projects).await?;
Ok(())
}
pub async fn add_projects_batch_client(
indices: &[Vec<Index>],
projects: Vec<UploadSearchProject>,
additional_fields: Vec<String>,
config: &MeilisearchConfig,
) -> Result<()> {
let client = config.make_batch_client()?;
let index_references = indices
.iter()
.map(|x| x.iter().collect())
.collect::<Vec<Vec<&Index>>>();
let mut tasks = FuturesOrdered::new();
let mut id = 0;
client.across_all(index_references, |index_list, client| {
let span = info_span!("add_projects_batch", client.idx = id);
id += 1;
for index in index_list {
let owned_client = client.clone();
let projects_ref = &projects;
let additional_fields_ref = &additional_fields;
tasks.push_back(
async move {
update_and_add_to_index(
&owned_client,
index,
projects_ref,
additional_fields_ref,
)
.await
}
.instrument(span.clone()),
);
}
});
while let Some(result) = tasks.next().await {
result?;
}
Ok(())
}
fn default_settings() -> Settings {
Settings::new()
.with_distinct_attribute(Some("project_id"))
.with_displayed_attributes(DEFAULT_DISPLAYED_ATTRIBUTES)
.with_searchable_attributes(DEFAULT_SEARCHABLE_ATTRIBUTES)
.with_sortable_attributes(DEFAULT_SORTABLE_ATTRIBUTES)
.with_filterable_attributes(&*MEILI_FILTERABLE_ATTRIBUTES)
.with_pagination(PaginationSetting {
max_total_hits: 2147483647,
})
}
pub struct MeilisearchFieldSpec {
pub path: &'static str,
pub filterable: bool,
}
impl SearchField {
pub const fn meilisearch_spec(self) -> MeilisearchFieldSpec {
match self {
SearchField::Categories => MeilisearchFieldSpec {
path: "categories",
filterable: true,
},
SearchField::Name => MeilisearchFieldSpec {
path: "name",
filterable: true,
},
SearchField::Author => MeilisearchFieldSpec {
path: "author",
filterable: true,
},
SearchField::License => MeilisearchFieldSpec {
path: "license",
filterable: true,
},
SearchField::ProjectTypes => MeilisearchFieldSpec {
path: "project_types",
filterable: true,
},
SearchField::ProjectId => MeilisearchFieldSpec {
path: "project_id",
filterable: true,
},
SearchField::OpenSource => MeilisearchFieldSpec {
path: "open_source",
filterable: true,
},
SearchField::Environment => MeilisearchFieldSpec {
path: "environment",
filterable: true,
},
SearchField::GameVersions => MeilisearchFieldSpec {
path: "game_versions",
filterable: true,
},
SearchField::ClientSide => MeilisearchFieldSpec {
path: "client_side",
filterable: true,
},
SearchField::ServerSide => MeilisearchFieldSpec {
path: "server_side",
filterable: true,
},
SearchField::MinecraftServerRegion => MeilisearchFieldSpec {
path: "minecraft_server.region",
filterable: true,
},
SearchField::MinecraftServerLanguages => MeilisearchFieldSpec {
path: "minecraft_server.languages",
filterable: true,
},
SearchField::MinecraftJavaServerContentKind => {
MeilisearchFieldSpec {
path: "minecraft_java_server.content.kind",
filterable: true,
}
}
SearchField::MinecraftJavaServerContentSupportedGameVersions => {
MeilisearchFieldSpec {
path: "minecraft_java_server.content.supported_game_versions",
filterable: true,
}
}
SearchField::MinecraftJavaServerPingData => MeilisearchFieldSpec {
path: "minecraft_java_server.ping.data",
filterable: true,
},
}
}
}
static MEILI_FILTERABLE_ATTRIBUTES: LazyLock<Vec<&'static str>> =
LazyLock::new(|| {
use strum::IntoEnumIterator;
SearchField::iter()
.filter_map(|field| {
let spec = field.meilisearch_spec();
spec.filterable.then_some(spec.path)
})
.collect()
});
const DEFAULT_DISPLAYED_ATTRIBUTES: &[&str] = &[
"project_id",
"version_id",
"project_types",
"slug",
"author",
"name",
"summary",
"categories",
"display_categories",
"downloads",
"follows",
"icon_url",
"date_created",
"date_modified",
"latest_version",
"license",
"gallery",
"featured_gallery",
"color",
// Note: loader fields are not here, but are added on as they are needed (so they can be dynamically added depending on which exist).
// TODO: remove these- as they should be automatically populated. This is a band-aid fix.
"environment",
"game_versions",
"mrpack_loaders",
// V2 legacy fields for logical consistency
"client_side",
"server_side",
// Non-searchable fields for filling out the Project model.
"license_url",
"monetization_status",
"team_id",
"thread_id",
"versions",
"date_published",
"date_queued",
"status",
"requested_status",
"games",
"organization_id",
"links",
"gallery_items",
"loaders", // search uses loaders as categories- this is purely for the Project model.
"project_loader_fields",
"minecraft_mod",
"minecraft_server",
"minecraft_java_server",
"minecraft_bedrock_server",
];
const DEFAULT_SEARCHABLE_ATTRIBUTES: &[&str] =
&["name", "summary", "author", "slug"];
const DEFAULT_SORTABLE_ATTRIBUTES: &[&str] = &[
"downloads",
"follows",
"date_created",
"date_modified",
"version_published_timestamp",
"minecraft_java_server.verified_plays_2w",
"minecraft_java_server.ping.data.players_online",
];

View File

@@ -1,489 +0,0 @@
use crate::database::PgPool;
use crate::database::redis::RedisPool;
use crate::env::ENV;
use crate::models::ids::VersionId;
use crate::routes::ApiError;
use crate::search::backend::{
SearchIndex, SearchIndexName, combined_search_filters, parse_search_index,
parse_search_request,
};
use crate::search::{
ResultSearchProject, SearchBackend, SearchRequest, SearchResults,
TasksCancelFilter,
};
use crate::util::error::Context;
use async_trait::async_trait;
use eyre::Result;
use futures::TryStreamExt;
use futures::stream::FuturesOrdered;
use itertools::Itertools;
use meilisearch_sdk::client::Client;
use meilisearch_sdk::tasks::{Task, TasksCancelQuery};
use serde::Serialize;
use serde_json::Value;
use std::collections::HashMap;
use std::fmt::Write;
use std::time::Duration;
use tracing::{Instrument, info_span};
pub mod indexing;
#[derive(Debug, Clone)]
pub struct MeilisearchReadClient {
pub client: Client,
}
impl std::ops::Deref for MeilisearchReadClient {
type Target = Client;
fn deref(&self) -> &Self::Target {
&self.client
}
}
pub struct BatchClient {
pub clients: Vec<Client>,
}
impl BatchClient {
pub fn new(clients: Vec<Client>) -> Self {
Self { clients }
}
pub async fn with_all_clients<'a, T, G, Fut>(
&'a self,
task_name: &str,
generator: G,
) -> Result<Vec<T>>
where
G: Fn(&'a Client) -> Fut,
Fut: Future<Output = Result<T>> + 'a,
{
let mut tasks = FuturesOrdered::new();
for (idx, client) in self.clients.iter().enumerate() {
tasks.push_back(generator(client).instrument(info_span!(
"client_task",
task.name = task_name,
client.idx = idx,
)));
}
let results = tasks.try_collect::<Vec<T>>().await?;
Ok(results)
}
pub fn across_all<T, F, R>(&self, data: Vec<T>, mut predicate: F) -> Vec<R>
where
F: FnMut(T, &Client) -> R,
{
assert_eq!(
data.len(),
self.clients.len(),
"mismatch between data len and meilisearch client count"
);
self.clients
.iter()
.zip(data)
.map(|(client, item)| predicate(item, client))
.collect()
}
}
#[derive(Debug, Clone)]
pub struct MeilisearchConfig {
pub addresses: Vec<String>,
pub read_lb_address: String,
pub key: String,
pub meta_namespace: String,
}
impl MeilisearchConfig {
pub fn new(meta_namespace: Option<String>) -> Self {
Self {
addresses: ENV.MEILISEARCH_WRITE_ADDRS.0.clone(),
key: ENV.MEILISEARCH_KEY.clone(),
meta_namespace: meta_namespace.unwrap_or_default(),
read_lb_address: ENV.MEILISEARCH_READ_ADDR.clone(),
}
}
pub fn make_loadbalanced_read_client(
&self,
) -> Result<MeilisearchReadClient, meilisearch_sdk::errors::Error> {
Ok(MeilisearchReadClient {
client: Client::new(&self.read_lb_address, Some(&self.key))?,
})
}
pub fn make_batch_client(
&self,
) -> Result<BatchClient, meilisearch_sdk::errors::Error> {
Ok(BatchClient::new(
self.addresses
.iter()
.map(|address| {
Client::new(address.as_str(), Some(self.key.as_str()))
})
.collect::<Result<Vec<_>, _>>()?,
))
}
pub fn get_index_name(&self, index: &str, next: bool) -> String {
let alt = if next { "_alt" } else { "" };
format!("{}_{}_{}", self.meta_namespace, index, alt)
}
}
pub struct Meilisearch {
pub config: MeilisearchConfig,
}
impl Meilisearch {
pub fn new(config: MeilisearchConfig) -> Self {
Self { config }
}
fn get_sort_index(
&self,
index: &str,
new_filters: Option<&str>,
) -> Result<(String, &'static [&'static str]), ApiError> {
let sort = parse_search_index(index, new_filters)?;
let index_name = match sort.index_name {
SearchIndexName::Projects => {
self.config.get_index_name("projects", false)
}
SearchIndexName::ProjectsFiltered => {
self.config.get_index_name("projects_filtered", false)
}
};
Ok(match sort.index {
SearchIndex::Relevance => (
index_name,
&["downloads:desc", "version_published_timestamp:desc"],
),
SearchIndex::Downloads => (
index_name,
&["downloads:desc", "version_published_timestamp:desc"],
),
SearchIndex::Follows => (
index_name,
&["follows:desc", "version_published_timestamp:desc"],
),
SearchIndex::Updated => (
index_name,
&["date_modified:desc", "version_published_timestamp:desc"],
),
SearchIndex::Newest => (
index_name,
&["date_created:desc", "version_published_timestamp:desc"],
),
SearchIndex::MinecraftJavaServerVerifiedPlays2w => (
index_name,
&[
"minecraft_java_server.verified_plays_2w:desc",
"minecraft_java_server.ping.data.players_online:desc",
"version_published_timestamp:desc",
],
),
SearchIndex::MinecraftJavaServerPlayersOnline => (
index_name,
&[
"minecraft_java_server.ping.data.players_online:desc",
"version_published_timestamp:desc",
],
),
})
}
}
#[async_trait]
impl SearchBackend for Meilisearch {
async fn search_for_project_raw(
&self,
info: &SearchRequest,
) -> Result<SearchResults, ApiError> {
let parsed = parse_search_request(info)?;
let (index_name, sort_name) =
self.get_sort_index(parsed.index, info.new_filters.as_deref())?;
let client = self
.config
.make_loadbalanced_read_client()
.wrap_internal_err("failed to make load-balanced read client")?;
let meilisearch_index = client
.get_index(index_name)
.await
.wrap_internal_err("failed to get index")?;
let mut filter_string = String::new();
let results = {
let mut query = meilisearch_index.search();
query
.with_page(parsed.page)
.with_hits_per_page(parsed.hits_per_page)
.with_query(parsed.query)
.with_sort(sort_name);
if let Some(new_filters) = info.new_filters.as_deref() {
query.with_filter(new_filters);
} else {
let facets = if let Some(facets) = &info.facets {
let facets =
serde_json::from_str::<Vec<Vec<Value>>>(facets)
.wrap_request_err("failed to parse facets")?;
Some(facets)
} else {
None
};
let filters =
combined_search_filters(info).unwrap_or_else(|| "".into());
if let Some(facets) = facets {
let facets: Vec<Vec<Vec<String>>> =
facets
.into_iter()
.map(|facets| {
facets
.into_iter()
.map(|facet| {
if facet.is_array() {
serde_json::from_value::<Vec<String>>(facet)
.unwrap_or_default()
} else {
vec![
serde_json::from_value::<String>(facet)
.unwrap_or_default(),
]
}
})
.collect_vec()
})
.collect_vec();
filter_string.push('(');
for (index, facet_outer_list) in facets.iter().enumerate() {
filter_string.push('(');
for (facet_outer_index, facet_inner_list) in
facet_outer_list.iter().enumerate()
{
filter_string.push('(');
for (facet_inner_index, facet) in
facet_inner_list.iter().enumerate()
{
filter_string
.push_str(&facet.replace(':', " = "));
if facet_inner_index
!= (facet_inner_list.len() - 1)
{
filter_string.push_str(" AND ")
}
}
filter_string.push(')');
if facet_outer_index != (facet_outer_list.len() - 1)
{
filter_string.push_str(" OR ")
}
}
filter_string.push(')');
if index != (facets.len() - 1) {
filter_string.push_str(" AND ")
}
}
filter_string.push(')');
if !filters.is_empty() {
write!(filter_string, " AND ({filters})")
.expect("write should not fail");
}
} else {
filter_string.push_str(&filters);
}
if !filter_string.is_empty() {
query.with_filter(&filter_string);
}
}
if info.show_metadata {
query.with_show_ranking_score(true);
query.with_show_ranking_score_details(true);
query.execute().await?
} else {
query.execute::<ResultSearchProject>().await?
}
};
if info.show_metadata {
let hits = results
.hits
.into_iter()
.map(|hit| {
let metadata = serde_json::to_value(&hit)
.ok()
.and_then(|value| value.as_object().cloned())
.map(|mut value| {
value.remove("_formatted");
value.remove("_matchesPosition");
value.remove("_federation");
let result = value.remove("result");
let metadata = Value::Object(value);
(result, metadata)
});
let (result, metadata) =
metadata.unwrap_or((None, Value::Null));
let mut result = result
.and_then(|value| {
serde_json::from_value::<ResultSearchProject>(value)
.ok()
})
.unwrap_or(hit.result);
if !metadata.is_null() {
result.search_metadata = Some(metadata);
}
result
})
.collect();
Ok(SearchResults {
hits,
page: results.page.unwrap_or_default(),
hits_per_page: results.hits_per_page.unwrap_or_default(),
total_hits: results.total_hits.unwrap_or_default(),
})
} else {
Ok(SearchResults {
hits: results.hits.into_iter().map(|r| r.result).collect(),
page: results.page.unwrap_or_default(),
hits_per_page: results.hits_per_page.unwrap_or_default(),
total_hits: results.total_hits.unwrap_or_default(),
})
}
}
async fn index_projects(
&self,
ro_pool: PgPool,
redis: RedisPool,
) -> eyre::Result<()> {
indexing::index_projects(ro_pool, redis, &self.config).await?;
Ok(())
}
async fn remove_documents(&self, ids: &[VersionId]) -> eyre::Result<()> {
indexing::remove_documents(ids, &self.config).await?;
Ok(())
}
async fn tasks(&self) -> eyre::Result<Value> {
let client = self
.config
.make_batch_client()
.wrap_internal_err("failed to make batch client")?;
let tasks = client
.with_all_clients("get_tasks", async |client| {
let tasks = client.get_tasks().await?;
Ok(tasks.results)
})
.await
.wrap_internal_err("failed to get tasks")?;
#[derive(Serialize)]
struct MeiliTask<Time> {
uid: u32,
status: &'static str,
duration: Option<Duration>,
enqueued_at: Option<Time>,
}
#[derive(Serialize)]
struct TaskList<Time> {
by_instance: HashMap<String, Vec<MeiliTask<Time>>>,
}
let response = tasks
.into_iter()
.enumerate()
.map(|(idx, instance_tasks)| {
let tasks = instance_tasks
.into_iter()
.filter_map(|task| {
Some(match task {
Task::Enqueued { content } => MeiliTask {
uid: content.uid,
status: "enqueued",
duration: None,
enqueued_at: Some(content.enqueued_at),
},
Task::Processing { content } => MeiliTask {
uid: content.uid,
status: "processing",
duration: None,
enqueued_at: Some(content.enqueued_at),
},
Task::Failed { content } => MeiliTask {
uid: content.task.uid,
status: "failed",
duration: Some(content.task.duration),
enqueued_at: Some(content.task.enqueued_at),
},
Task::Succeeded { .. } => return None,
})
})
.collect();
(idx.to_string(), tasks)
})
.collect::<HashMap<String, Vec<MeiliTask<_>>>>();
let response = serde_json::to_value(TaskList {
by_instance: response,
})
.wrap_internal_err("failed to serialize tasks response")?;
Ok(response)
}
async fn tasks_cancel(
&self,
filter: &TasksCancelFilter,
) -> eyre::Result<()> {
let client = self
.config
.make_batch_client()
.wrap_internal_err("failed to make batch client")?;
let all_results = client
.with_all_clients("cancel_tasks", async |client| {
let mut q = TasksCancelQuery::new(client);
match filter {
TasksCancelFilter::All => {}
TasksCancelFilter::Indexes { indexes } => {
q.with_index_uids(indexes.iter().map(|s| s.as_str()));
}
TasksCancelFilter::AllEnqueued => {
q.with_statuses(["enqueued"]);
}
};
let result = client.cancel_tasks_with(&q).await;
Ok(result)
})
.await
.wrap_internal_err("failed to cancel tasks")?;
for r in all_results {
r.wrap_internal_err("failed to cancel tasks")?;
}
Ok(())
}
}

View File

@@ -1,12 +1,8 @@
mod common;
pub mod elasticsearch;
pub mod meilisearch;
pub mod typesense;
pub use common::{
ParsedSearchRequest, SearchIndex, SearchIndexName, SearchSort,
combined_search_filters, parse_search_index, parse_search_request,
};
pub use elasticsearch::Elasticsearch;
pub use meilisearch::{Meilisearch, MeilisearchConfig};
pub use typesense::{Typesense, TypesenseConfig};

View File

@@ -52,8 +52,6 @@ pub struct SearchRequest {
#[serde(default)]
pub show_metadata: bool,
#[serde(default)]
pub elasticsearch_config: backend::elasticsearch::RequestConfig,
#[serde(default)]
pub typesense_config: backend::typesense::RequestConfig,
pub new_filters: Option<String>,
@@ -71,8 +69,6 @@ impl From<SearchQuery> for SearchRequest {
index: query.index,
limit: query.limit,
show_metadata: false,
elasticsearch_config:
backend::elasticsearch::RequestConfig::default(),
typesense_config: backend::typesense::RequestConfig::default(),
new_filters: query.new_filters,
facets: query.facets,
@@ -178,8 +174,6 @@ pub enum TasksCancelFilter {
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum SearchBackendKind {
Meilisearch,
Elasticsearch,
Typesense,
}
@@ -212,8 +206,6 @@ impl FromStr for SearchBackendKind {
fn from_str(s: &str) -> Result<Self, Self::Err> {
Ok(match s {
"meilisearch" => SearchBackendKind::Meilisearch,
"elasticsearch" => SearchBackendKind::Elasticsearch,
"typesense" => SearchBackendKind::Typesense,
_ => return Err(InvalidSearchBackendKind),
})
@@ -351,13 +343,6 @@ impl From<UploadSearchProject> for ResultSearchProject {
pub fn backend(meta_namespace: Option<String>) -> Box<dyn SearchBackend> {
match ENV.SEARCH_BACKEND {
SearchBackendKind::Meilisearch => {
let config = backend::MeilisearchConfig::new(meta_namespace);
Box::new(backend::Meilisearch::new(config))
}
SearchBackendKind::Elasticsearch => {
Box::new(backend::Elasticsearch::new(meta_namespace).unwrap())
}
SearchBackendKind::Typesense => {
let config = backend::TypesenseConfig::new(meta_namespace);
Box::new(backend::Typesense::new(config))