Increase/make configurable search timeouts (#5261)
* Increase default operation timeout, make it configurable via SEARCH_OPERATION_TIMEOUT * Don't update index settings when unneeded * Add to env local
This commit is contained in:
committed by
GitHub
parent
e3395a7366
commit
345ada27c0
@@ -17,6 +17,8 @@ DATABASE_MAX_CONNECTIONS=16
|
|||||||
|
|
||||||
MEILISEARCH_READ_ADDR=http://localhost:7700
|
MEILISEARCH_READ_ADDR=http://localhost:7700
|
||||||
MEILISEARCH_WRITE_ADDRS=http://localhost:7700
|
MEILISEARCH_WRITE_ADDRS=http://localhost:7700
|
||||||
|
# 5 minutes in milliseconds
|
||||||
|
SEARCH_OPERATION_TIMEOUT=300000
|
||||||
|
|
||||||
# # For a sharded Meilisearch setup (sharded-meilisearch docker compose profile)
|
# # For a sharded Meilisearch setup (sharded-meilisearch docker compose profile)
|
||||||
# MEILISEARCH_READ_ADDR=http://localhost:7710
|
# MEILISEARCH_READ_ADDR=http://localhost:7710
|
||||||
|
|||||||
@@ -39,14 +39,22 @@ pub enum IndexingError {
|
|||||||
//
|
//
|
||||||
// Set this to 50k for better observability
|
// Set this to 50k for better observability
|
||||||
const MEILISEARCH_CHUNK_SIZE: usize = 50000; // 10_000_000
|
const MEILISEARCH_CHUNK_SIZE: usize = 50000; // 10_000_000
|
||||||
const TIMEOUT: std::time::Duration = std::time::Duration::from_secs(120);
|
|
||||||
|
fn search_operation_timeout() -> std::time::Duration {
|
||||||
|
let default_ms = 5 * 60 * 1000; // 5 minutes
|
||||||
|
let ms = dotenvy::var("SEARCH_OPERATION_TIMEOUT")
|
||||||
|
.ok()
|
||||||
|
.and_then(|v| v.parse::<u64>().ok())
|
||||||
|
.unwrap_or(default_ms);
|
||||||
|
std::time::Duration::from_millis(ms)
|
||||||
|
}
|
||||||
|
|
||||||
pub async fn remove_documents(
|
pub async fn remove_documents(
|
||||||
ids: &[crate::models::ids::VersionId],
|
ids: &[crate::models::ids::VersionId],
|
||||||
config: &SearchConfig,
|
config: &SearchConfig,
|
||||||
) -> Result<(), IndexingError> {
|
) -> Result<(), IndexingError> {
|
||||||
let mut indexes = get_indexes_for_indexing(config, false).await?;
|
let mut indexes = get_indexes_for_indexing(config, false, false).await?;
|
||||||
let indexes_next = get_indexes_for_indexing(config, true).await?;
|
let indexes_next = get_indexes_for_indexing(config, true, false).await?;
|
||||||
|
|
||||||
for list in &mut indexes {
|
for list in &mut indexes {
|
||||||
for alt_list in &indexes_next {
|
for alt_list in &indexes_next {
|
||||||
@@ -94,11 +102,11 @@ pub async fn index_projects(
|
|||||||
|
|
||||||
info!("Ensuring current indexes exists");
|
info!("Ensuring current indexes exists");
|
||||||
// First, ensure current index exists (so no error happens- current index should be worst-case empty, not missing)
|
// First, ensure current index exists (so no error happens- current index should be worst-case empty, not missing)
|
||||||
get_indexes_for_indexing(config, false).await?;
|
get_indexes_for_indexing(config, false, false).await?;
|
||||||
|
|
||||||
info!("Deleting surplus indexes");
|
info!("Deleting surplus indexes");
|
||||||
// Then, delete the next index if it still exists
|
// Then, delete the next index if it still exists
|
||||||
let indices = get_indexes_for_indexing(config, true).await?;
|
let indices = get_indexes_for_indexing(config, true, false).await?;
|
||||||
for client_indices in indices {
|
for client_indices in indices {
|
||||||
for index in client_indices {
|
for index in client_indices {
|
||||||
index.delete().await?;
|
index.delete().await?;
|
||||||
@@ -107,7 +115,7 @@ pub async fn index_projects(
|
|||||||
|
|
||||||
info!("Recreating next index");
|
info!("Recreating next index");
|
||||||
// Recreate the next index for indexing
|
// Recreate the next index for indexing
|
||||||
let indices = get_indexes_for_indexing(config, true).await?;
|
let indices = get_indexes_for_indexing(config, true, true).await?;
|
||||||
|
|
||||||
let all_loader_fields =
|
let all_loader_fields =
|
||||||
crate::database::models::loader_fields::LoaderField::get_fields_all(
|
crate::database::models::loader_fields::LoaderField::get_fields_all(
|
||||||
@@ -208,6 +216,7 @@ pub async fn swap_index(
|
|||||||
pub async fn get_indexes_for_indexing(
|
pub async fn get_indexes_for_indexing(
|
||||||
config: &SearchConfig,
|
config: &SearchConfig,
|
||||||
next: bool, // Get the 'next' one
|
next: bool, // Get the 'next' one
|
||||||
|
update_settings: bool,
|
||||||
) -> Result<Vec<Vec<Index>>, IndexingError> {
|
) -> Result<Vec<Vec<Index>>, IndexingError> {
|
||||||
let client = config.make_batch_client()?;
|
let client = config.make_batch_client()?;
|
||||||
let project_name = config.get_index_name("projects", next);
|
let project_name = config.get_index_name("projects", next);
|
||||||
@@ -230,6 +239,7 @@ pub async fn get_indexes_for_indexing(
|
|||||||
"exactness",
|
"exactness",
|
||||||
"sort",
|
"sort",
|
||||||
]),
|
]),
|
||||||
|
update_settings,
|
||||||
)
|
)
|
||||||
.await?;
|
.await?;
|
||||||
let projects_filtered_index = create_or_update_index(
|
let projects_filtered_index = create_or_update_index(
|
||||||
@@ -243,6 +253,7 @@ pub async fn get_indexes_for_indexing(
|
|||||||
"attribute",
|
"attribute",
|
||||||
"exactness",
|
"exactness",
|
||||||
]),
|
]),
|
||||||
|
update_settings,
|
||||||
)
|
)
|
||||||
.await?;
|
.await?;
|
||||||
|
|
||||||
@@ -258,6 +269,7 @@ async fn create_or_update_index(
|
|||||||
client: &Client,
|
client: &Client,
|
||||||
name: &str,
|
name: &str,
|
||||||
custom_rules: Option<&'static [&'static str]>,
|
custom_rules: Option<&'static [&'static str]>,
|
||||||
|
update_settings: bool,
|
||||||
) -> Result<Index, meilisearch_sdk::errors::Error> {
|
) -> Result<Index, meilisearch_sdk::errors::Error> {
|
||||||
info!("Updating/creating index");
|
info!("Updating/creating index");
|
||||||
|
|
||||||
@@ -271,16 +283,26 @@ async fn create_or_update_index(
|
|||||||
settings = settings.with_ranking_rules(custom_rules);
|
settings = settings.with_ranking_rules(custom_rules);
|
||||||
}
|
}
|
||||||
|
|
||||||
info!("Performing index settings set.");
|
if update_settings {
|
||||||
index
|
info!("Updating index settings");
|
||||||
.set_settings(&settings)
|
index
|
||||||
.await
|
.set_settings(&settings)
|
||||||
.inspect_err(|e| error!("Error setting index settings: {e:?}"))?
|
.await
|
||||||
.wait_for_completion(client, None, Some(TIMEOUT))
|
.inspect_err(|e| {
|
||||||
.await
|
error!("Error setting index settings: {e:?}")
|
||||||
.inspect_err(|e| {
|
})?
|
||||||
error!("Error setting index settings while waiting: {e:?}")
|
.wait_for_completion(
|
||||||
})?;
|
client,
|
||||||
|
None,
|
||||||
|
Some(search_operation_timeout()),
|
||||||
|
)
|
||||||
|
.await
|
||||||
|
.inspect_err(|e| {
|
||||||
|
error!(
|
||||||
|
"Error setting index settings while waiting: {e:?}"
|
||||||
|
)
|
||||||
|
})?;
|
||||||
|
}
|
||||||
info!("Done performing index settings set.");
|
info!("Done performing index settings set.");
|
||||||
|
|
||||||
Ok(index)
|
Ok(index)
|
||||||
@@ -291,7 +313,11 @@ async fn create_or_update_index(
|
|||||||
// Only create index and set settings if the index doesn't already exist
|
// Only create index and set settings if the index doesn't already exist
|
||||||
let task = client.create_index(name, Some("version_id")).await?;
|
let task = client.create_index(name, Some("version_id")).await?;
|
||||||
let task = task
|
let task = task
|
||||||
.wait_for_completion(client, None, Some(TIMEOUT))
|
.wait_for_completion(
|
||||||
|
client,
|
||||||
|
None,
|
||||||
|
Some(search_operation_timeout()),
|
||||||
|
)
|
||||||
.await
|
.await
|
||||||
.inspect_err(|e| {
|
.inspect_err(|e| {
|
||||||
error!("Error creating index while waiting: {e:?}")
|
error!("Error creating index while waiting: {e:?}")
|
||||||
@@ -306,15 +332,25 @@ async fn create_or_update_index(
|
|||||||
settings = settings.with_ranking_rules(custom_rules);
|
settings = settings.with_ranking_rules(custom_rules);
|
||||||
}
|
}
|
||||||
|
|
||||||
index
|
if update_settings {
|
||||||
.set_settings(&settings)
|
index
|
||||||
.await
|
.set_settings(&settings)
|
||||||
.inspect_err(|e| error!("Error setting index settings: {e:?}"))?
|
.await
|
||||||
.wait_for_completion(client, None, Some(TIMEOUT))
|
.inspect_err(|e| {
|
||||||
.await
|
error!("Error setting index settings: {e:?}")
|
||||||
.inspect_err(|e| {
|
})?
|
||||||
error!("Error setting index settings while waiting: {e:?}")
|
.wait_for_completion(
|
||||||
})?;
|
client,
|
||||||
|
None,
|
||||||
|
Some(search_operation_timeout()),
|
||||||
|
)
|
||||||
|
.await
|
||||||
|
.inspect_err(|e| {
|
||||||
|
error!(
|
||||||
|
"Error setting index settings while waiting: {e:?}"
|
||||||
|
)
|
||||||
|
})?;
|
||||||
|
}
|
||||||
|
|
||||||
Ok(index)
|
Ok(index)
|
||||||
}
|
}
|
||||||
@@ -436,10 +472,10 @@ async fn update_and_add_to_index(
|
|||||||
|
|
||||||
// // Allow a long timeout for adding new attributes- it only needs to happen the once
|
// // Allow a long timeout for adding new attributes- it only needs to happen the once
|
||||||
// filterable_task
|
// filterable_task
|
||||||
// .wait_for_completion(client, None, Some(TIMEOUT * 100))
|
// .wait_for_completion(client, None, Some(search_operation_timeout() * 100))
|
||||||
// .await?;
|
// .await?;
|
||||||
// displayable_task
|
// displayable_task
|
||||||
// .wait_for_completion(client, None, Some(TIMEOUT * 100))
|
// .wait_for_completion(client, None, Some(search_operation_timeout() * 100))
|
||||||
// .await?;
|
// .await?;
|
||||||
// }
|
// }
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user