diff --git a/apps/labrinth/src/clickhouse/mod.rs b/apps/labrinth/src/clickhouse/mod.rs index a892653ed..dc19d3c2d 100644 --- a/apps/labrinth/src/clickhouse/mod.rs +++ b/apps/labrinth/src/clickhouse/mod.rs @@ -218,6 +218,16 @@ pub async fn init_client_with_database( .execute() .await?; + client + .query(&format!( + " + ALTER TABLE {database}.{MINECRAFT_SERVER_PLAYS} {cluster_line} + ADD COLUMN IF NOT EXISTS ip IPv6 DEFAULT toIPv6('::') + " + )) + .execute() + .await?; + client .query(&format!( " diff --git a/apps/labrinth/src/database/models/project_item.rs b/apps/labrinth/src/database/models/project_item.rs index b6522208d..0c50729e6 100644 --- a/apps/labrinth/src/database/models/project_item.rs +++ b/apps/labrinth/src/database/models/project_item.rs @@ -187,6 +187,7 @@ impl ProjectBuilder { pub async fn insert( self, transaction: &mut PgTransaction<'_>, + http: &reqwest::Client, ) -> Result { let project_struct = DBProject { id: self.project_id, @@ -234,7 +235,7 @@ impl ProjectBuilder { for mut version in self.initial_versions { version.project_id = self.project_id; - version.insert(&mut *transaction).await?; + version.insert(&mut *transaction, http).await?; } LinkUrl::insert_many_projects( diff --git a/apps/labrinth/src/database/models/version_item.rs b/apps/labrinth/src/database/models/version_item.rs index c5a2fa42f..6ffaf90c7 100644 --- a/apps/labrinth/src/database/models/version_item.rs +++ b/apps/labrinth/src/database/models/version_item.rs @@ -135,6 +135,7 @@ impl VersionFileBuilder { self, version_id: DBVersionId, transaction: &mut PgTransaction<'_>, + http: &reqwest::Client, ) -> Result { let file_id = generate_file_id(&mut *transaction).await?; @@ -173,6 +174,7 @@ impl VersionFileBuilder { DelphiRunParameters { file_id: file_id.into(), }, + http, ) .await { @@ -193,6 +195,7 @@ impl VersionBuilder { pub async fn insert( self, transaction: &mut PgTransaction<'_>, + http: &reqwest::Client, ) -> Result { let version = DBVersion { id: self.version_id, @@ -233,7 +236,7 @@ impl VersionBuilder { } = self; for file in files { - file.insert(version_id, transaction).await?; + file.insert(version_id, transaction, http).await?; } DependencyBuilder::insert_many( diff --git a/apps/labrinth/src/lib.rs b/apps/labrinth/src/lib.rs index 46ac81290..f1b4a4033 100644 --- a/apps/labrinth/src/lib.rs +++ b/apps/labrinth/src/lib.rs @@ -22,6 +22,7 @@ use crate::queue::moderation::AutomatedModerationQueue; use crate::routes::internal::delphi::rescan::rescan_projects_in_queue; use crate::util::anrok; use crate::util::archon::ArchonClient; +use crate::util::http::HttpClient; use crate::util::ratelimit::{AsyncRateLimiter, GCRAParameters}; use sync::friends::handle_pubsub; @@ -69,6 +70,7 @@ pub struct LabrinthConfig { pub email_queue: web::Data, pub archon_client: web::Data, pub gotenberg_client: GotenbergClient, + pub http_client: web::Data, } #[allow(clippy::too_many_arguments)] @@ -103,10 +105,14 @@ pub fn app_setup( let scheduler = scheduler::Scheduler::new(); + let http_client = web::Data::new(HttpClient::new()); { let pool_ref = pool.clone(); + let http_ref = http_client.clone(); actix_rt::spawn(async move { - if let Err(err) = rescan_projects_in_queue(&pool_ref).await { + if let Err(err) = + rescan_projects_in_queue(&pool_ref, &http_ref).await + { warn!("Delphi rescan failed: {err:#}"); } }); @@ -303,6 +309,7 @@ pub fn app_setup( stripe_client, anrok_client, gotenberg_client, + http_client, archon_client: web::Data::new( ArchonClient::from_env() .expect("ARCHON_URL and PYRO_API_KEY must be set"), @@ -333,6 +340,7 @@ pub fn app_config( .app_data(web::Data::new(labrinth_config.file_host.clone())) .app_data(web::Data::new(labrinth_config.search_config.clone())) .app_data(web::Data::new(labrinth_config.gotenberg_client.clone())) + .app_data(labrinth_config.http_client.clone()) .app_data(labrinth_config.session_queue.clone()) .app_data(labrinth_config.payouts_queue.clone()) .app_data(labrinth_config.email_queue.clone()) diff --git a/apps/labrinth/src/models/v3/analytics.rs b/apps/labrinth/src/models/v3/analytics.rs index 4392a27ae..ea3c22e37 100644 --- a/apps/labrinth/src/models/v3/analytics.rs +++ b/apps/labrinth/src/models/v3/analytics.rs @@ -86,4 +86,5 @@ pub struct MinecraftServerPlay { pub project_id: u64, #[serde(with = "clickhouse::serde::uuid")] pub minecraft_uuid: Uuid, + pub ip: Ipv6Addr, } diff --git a/apps/labrinth/src/queue/analytics/mod.rs b/apps/labrinth/src/queue/analytics/mod.rs index c57f3695e..1f04d81a8 100644 --- a/apps/labrinth/src/queue/analytics/mod.rs +++ b/apps/labrinth/src/queue/analytics/mod.rs @@ -14,12 +14,15 @@ pub mod cache; const DOWNLOADS_NAMESPACE: &str = "downloads"; const VIEWS_NAMESPACE: &str = "views"; +const MINECRAFT_SERVER_PLAYS_NAMESPACE: &str = "minecraft_server_plays"; +const MINECRAFT_SERVER_PLAYS_EXPIRY: u64 = 86_400; // 24 hours +const MINECRAFT_SERVER_PLAYS_LIMIT: u32 = 5; pub struct AnalyticsQueue { views_queue: DashMap<(u64, u64), Vec>, downloads_queue: DashMap<(u64, u64), Download>, playtime_queue: DashSet, - minecraft_server_plays_queue: DashSet, + minecraft_server_plays_queue: DashMap<(u128, u64), MinecraftServerPlay>, affiliate_code_clicks_queue: DashMap<(u64, u64), Vec>, } @@ -36,7 +39,7 @@ impl AnalyticsQueue { views_queue: DashMap::with_capacity(1000), downloads_queue: DashMap::with_capacity(1000), playtime_queue: DashSet::with_capacity(1000), - minecraft_server_plays_queue: DashSet::with_capacity(1000), + minecraft_server_plays_queue: DashMap::with_capacity(1000), affiliate_code_clicks_queue: DashMap::with_capacity(1000), } } @@ -60,7 +63,8 @@ impl AnalyticsQueue { } pub fn add_minecraft_server_play(&self, play: MinecraftServerPlay) { - self.minecraft_server_plays_queue.insert(play); + self.minecraft_server_plays_queue + .insert((play.minecraft_uuid.as_u128(), play.project_id), play); } pub fn add_affiliate_code_click(&self, click: AffiliateCodeClick) { @@ -118,11 +122,67 @@ impl AnalyticsQueue { } if !minecraft_server_plays_queue.is_empty() { + let mut plays_keys = Vec::new(); + let raw_plays = DashMap::new(); + + for (index, (key, play)) in + minecraft_server_plays_queue.into_iter().enumerate() + { + plays_keys.push(key); + raw_plays.insert(index, play); + } + + let mut redis = + redis.pool.get().await.map_err(DatabaseError::RedisPool)?; + + let results = cmd("MGET") + .arg( + plays_keys + .iter() + .map(|x| { + format!( + "{}:{}-{}", + MINECRAFT_SERVER_PLAYS_NAMESPACE, x.0, x.1 + ) + }) + .collect::>(), + ) + .query_async::>>(&mut redis) + .await + .map_err(DatabaseError::CacheError)?; + + let mut pipe = redis::pipe(); + for (idx, count) in results.into_iter().enumerate() { + let key = &plays_keys[idx]; + + let new_count = if let Some(count) = count { + if count >= MINECRAFT_SERVER_PLAYS_LIMIT { + raw_plays.remove(&idx); + continue; + } + count + 1 + } else { + 1 + }; + + pipe.atomic().set_ex( + format!( + "{}:{}-{}", + MINECRAFT_SERVER_PLAYS_NAMESPACE, key.0, key.1 + ), + new_count, + MINECRAFT_SERVER_PLAYS_EXPIRY, + ); + } + pipe.query_async::<()>(&mut *redis) + .await + .map_err(DatabaseError::CacheError)?; + let mut plays = client .insert::(MINECRAFT_SERVER_PLAYS) .await?; - for play in minecraft_server_plays_queue { + for (_, play) in raw_plays { plays.write(&play).await?; } diff --git a/apps/labrinth/src/routes/analytics.rs b/apps/labrinth/src/routes/analytics.rs index bef6c8a6e..15dab8295 100644 --- a/apps/labrinth/src/routes/analytics.rs +++ b/apps/labrinth/src/routes/analytics.rs @@ -1,5 +1,6 @@ use crate::auth::get_user_from_headers; use crate::database::PgPool; +use crate::database::models::DBProject; use crate::database::redis::RedisPool; use crate::env::ENV; use crate::models::analytics::{MinecraftServerPlay, PageView, Playtime}; @@ -9,8 +10,11 @@ use crate::queue::analytics::AnalyticsQueue; use crate::queue::session::AuthQueue; use crate::routes::ApiError; use crate::util::date::get_current_tenths_of_ms; +use crate::util::error::Context; +use crate::util::http::HttpClient; use actix_web::{HttpRequest, HttpResponse}; use actix_web::{post, web}; +use eyre::eyre; use serde::Deserialize; use std::collections::HashMap; use std::net::Ipv4Addr; @@ -233,10 +237,18 @@ async fn playtime_ingest( Ok(HttpResponse::NoContent().finish()) } +#[derive(Debug, Deserialize)] +struct MinecraftProfile { + id: Uuid, + name: String, +} + #[derive(Deserialize)] pub struct MinecraftJavaServerPlayInput { project_id: ProjectId, - minecraft_uuid: Uuid, + username: Option, + server_id: Option, + minecraft_uuid: Option, } pub const MINECRAFT_SERVER_PLAYS: &str = "minecraft_server_plays"; @@ -249,7 +261,8 @@ async fn minecraft_server_play_ingest( play_input: web::Json, pool: web::Data, redis: web::Data, -) -> Result { + http: web::Data, +) -> Result<(), ApiError> { let user = get_user_from_headers( &req, &**pool, @@ -262,14 +275,86 @@ async fn minecraft_server_play_ingest( .ok(); let project_id = play_input.project_id; + + let project = DBProject::get(&project_id.to_string(), &**pool, &redis) + .await? + .ok_or(ApiError::NotFound)?; + + if project.components.minecraft_server.is_none() { + return Err(ApiError::Request(eyre!( + "not a `minecraft_server` project" + ))); + } + + let minecraft_uuid = if let (Some(username), Some(server_id)) = + (&play_input.username, &play_input.server_id) + { + let has_joined = http + .get("https://sessionserver.mojang.com/session/minecraft/hasJoined") + .query(&[ + ("username", username.as_str()), + ("serverId", server_id.as_str()), + ]) + .send() + .await + .wrap_internal_err("failed to contact Mojang session server")?; + + if has_joined.status() == reqwest::StatusCode::NO_CONTENT + || !has_joined.status().is_success() + { + return Err(ApiError::Request(eyre!( + "Minecraft session verification failed" + ))); + } + + let profile = has_joined + .json::() + .await + .wrap_internal_err("invalid Mojang session response")?; + + if profile.name != *username { + return Err(ApiError::Request(eyre!( + "returned Mojang profile name does not match username" + ))); + } + + profile.id + } else { + play_input + .minecraft_uuid + .wrap_request_err("missing `minecraft_uuid`")? + }; + + let conn_info = req.connection_info().peer_addr().map(|x| x.to_string()); + let headers = req + .headers() + .into_iter() + .map(|(key, val)| { + ( + key.to_string().to_lowercase(), + val.to_str().unwrap_or_default().to_string(), + ) + }) + .collect::>(); + + let ip = crate::util::ip::convert_to_ip_v6( + if let Some(header) = headers.get("cf-connecting-ip") { + header + } else { + conn_info.as_deref().unwrap_or_default() + }, + ) + .unwrap_or_else(|_| Ipv4Addr::new(127, 0, 0, 1).to_ipv6_mapped()); + let row = MinecraftServerPlay { recorded: get_current_tenths_of_ms(), user_id: user.map(|u| u.id.0).unwrap_or(0), project_id: project_id.0, - minecraft_uuid: play_input.minecraft_uuid, + minecraft_uuid, + ip, }; analytics_queue.add_minecraft_server_play(row); - Ok(HttpResponse::NoContent().finish()) + Ok(()) } diff --git a/apps/labrinth/src/routes/internal/delphi/mod.rs b/apps/labrinth/src/routes/internal/delphi/mod.rs index dc5772bc7..2a18a203b 100644 --- a/apps/labrinth/src/routes/internal/delphi/mod.rs +++ b/apps/labrinth/src/routes/internal/delphi/mod.rs @@ -1,11 +1,10 @@ -use std::{collections::HashMap, fmt::Write, sync::LazyLock, time::Instant}; +use std::{collections::HashMap, fmt::Write, time::Instant}; -use crate::database::PgPool; use crate::env::ENV; +use crate::{database::PgPool, util::http::HttpClient}; use actix_web::{HttpRequest, HttpResponse, get, post, web}; use chrono::{DateTime, Utc}; use eyre::eyre; -use reqwest::header::{HeaderMap, HeaderValue, USER_AGENT}; use serde::Deserialize; use tokio::sync::Mutex; use tracing::info; @@ -46,21 +45,6 @@ pub fn config(cfg: &mut web::ServiceConfig) { ); } -static DELPHI_CLIENT: LazyLock = LazyLock::new(|| { - reqwest::Client::builder() - .default_headers({ - HeaderMap::from_iter([( - USER_AGENT, - HeaderValue::from_static(concat!( - "Labrinth/", - env!("COMPILATION_DATE") - )), - )]) - }) - .build() - .unwrap() -}); - /// Type of [`DelphiReportIssueDetails::key`]. /// /// Delphi may provide `null` for the key, but we require a key for storing @@ -367,6 +351,7 @@ async fn ingest_report_deserialized( pub async fn run( exec: impl crate::database::Executor<'_, Database = sqlx::Postgres>, run_parameters: DelphiRunParameters, + http: &reqwest::Client, ) -> Result { let file_data = sqlx::query!( r#" @@ -389,8 +374,7 @@ pub async fn run( run_parameters.file_id.0 ); - DELPHI_CLIENT - .post(&ENV.DELPHI_URL) + http.post(&ENV.DELPHI_URL) .json(&serde_json::json!({ "url": file_data.url, "project_id": ProjectId(file_data.project_id.0 as u64), @@ -489,6 +473,7 @@ async fn _run( redis: web::Data, session_queue: web::Data, run_parameters: web::Query, + http: web::Data, ) -> Result { check_is_moderator_from_headers( &req, @@ -499,7 +484,7 @@ async fn _run( ) .await?; - run(&**pool, run_parameters.into_inner()).await + run(&**pool, run_parameters.into_inner(), &http).await } #[get("version")] @@ -531,6 +516,7 @@ async fn issue_type_schema( pool: web::Data, redis: web::Data, session_queue: web::Data, + http: web::Data, ) -> Result { check_is_moderator_from_headers( &req, @@ -556,8 +542,7 @@ async fn issue_type_schema( cache_entry => Ok(HttpResponse::Ok().json( &cache_entry .insert(( - DELPHI_CLIENT - .get(format!("{}/schema", ENV.DELPHI_URL)) + http.get(format!("{}/schema", ENV.DELPHI_URL)) .send() .await .and_then(|res| res.error_for_status()) diff --git a/apps/labrinth/src/routes/internal/delphi/rescan.rs b/apps/labrinth/src/routes/internal/delphi/rescan.rs index 197369c52..c7d7fa297 100644 --- a/apps/labrinth/src/routes/internal/delphi/rescan.rs +++ b/apps/labrinth/src/routes/internal/delphi/rescan.rs @@ -2,11 +2,14 @@ use eyre::{Result, WrapErr, eyre}; use futures::future::try_join_all; use tracing::info; -use super::{DELPHI_CLIENT, DelphiRunParameters}; +use super::DelphiRunParameters; use crate::{database::PgPool, env::ENV, models::ids::FileId}; -pub async fn rescan_projects_in_queue(pool: &PgPool) -> Result<()> { - let delphi_version = fetch_delphi_version().await?; +pub async fn rescan_projects_in_queue( + pool: &PgPool, + http: &reqwest::Client, +) -> Result<()> { + let delphi_version = fetch_delphi_version(http).await?; let old_delphi_version = fetch_stored_delphi_version(pool).await?; if old_delphi_version == Some(delphi_version) { @@ -44,7 +47,7 @@ pub async fn rescan_projects_in_queue(pool: &PgPool) -> Result<()> { .map(|file_id| FileId(file_id.cast_unsigned())); try_join_all(file_ids.map(|file_id| async move { - super::run(pool, DelphiRunParameters { file_id }) + super::run(pool, DelphiRunParameters { file_id }, http) .await .wrap_err_with(|| { eyre!("failed to submit Delphi rescan for `{file_id:?}`") @@ -60,8 +63,8 @@ pub async fn rescan_projects_in_queue(pool: &PgPool) -> Result<()> { Ok(()) } -async fn fetch_delphi_version() -> Result { - let response = DELPHI_CLIENT +async fn fetch_delphi_version(http: &reqwest::Client) -> Result { + let response = http .get(format!("{}/version", ENV.DELPHI_URL)) .send() .await diff --git a/apps/labrinth/src/routes/v2/project_creation.rs b/apps/labrinth/src/routes/v2/project_creation.rs index 543bb3487..d2dae8be0 100644 --- a/apps/labrinth/src/routes/v2/project_creation.rs +++ b/apps/labrinth/src/routes/v2/project_creation.rs @@ -12,6 +12,7 @@ use crate::queue::session::AuthQueue; use crate::routes::v3::project_creation::default_project_type; use crate::routes::v3::project_creation::{CreateError, NewGalleryItem}; use crate::routes::{v2_reroute, v3}; +use crate::util::http::HttpClient; use actix_multipart::Multipart; use actix_web::web::Data; use actix_web::{HttpRequest, HttpResponse, post}; @@ -141,6 +142,7 @@ pub async fn project_create( redis: Data, file_host: Data>, session_queue: Data, + http: Data, ) -> Result { // Convert V2 multipart payload to V3 multipart payload let payload = v2_reroute::alter_actix_multipart( @@ -252,6 +254,7 @@ pub async fn project_create( redis.clone(), file_host, session_queue, + http, ) .await?; diff --git a/apps/labrinth/src/routes/v2/version_creation.rs b/apps/labrinth/src/routes/v2/version_creation.rs index 4fab4ccc6..3bdde43cc 100644 --- a/apps/labrinth/src/routes/v2/version_creation.rs +++ b/apps/labrinth/src/routes/v2/version_creation.rs @@ -13,6 +13,7 @@ use crate::queue::session::AuthQueue; use crate::routes::v3::project_creation::CreateError; use crate::routes::v3::version_creation; use crate::routes::{v2_reroute, v3}; +use crate::util::http::HttpClient; use actix_multipart::Multipart; use actix_web::http::header::ContentDisposition; use actix_web::web::Data; @@ -83,6 +84,7 @@ pub async fn version_create( file_host: Data>, session_queue: Data, moderation_queue: Data, + http: Data, ) -> Result { let payload = v2_reroute::alter_actix_multipart( payload, @@ -237,6 +239,7 @@ pub async fn version_create( file_host, session_queue, moderation_queue, + http, ) .await?; @@ -286,6 +289,7 @@ pub async fn upload_file_to_version( redis: Data, file_host: Data>, session_queue: web::Data, + http: web::Data, ) -> Result { // Returns NoContent, so no need to convert to V2 let response = v3::version_creation::upload_file_to_version( @@ -296,6 +300,7 @@ pub async fn upload_file_to_version( redis.clone(), file_host, session_queue, + http, ) .await?; Ok(response) diff --git a/apps/labrinth/src/routes/v3/project_creation.rs b/apps/labrinth/src/routes/v3/project_creation.rs index 769a4b5d7..24c600154 100644 --- a/apps/labrinth/src/routes/v3/project_creation.rs +++ b/apps/labrinth/src/routes/v3/project_creation.rs @@ -24,6 +24,7 @@ use crate::models::v3::user_limits::UserLimits; use crate::queue::session::AuthQueue; use crate::search::indexing::IndexingError; use crate::util::guards::admin_key_guard; +use crate::util::http::HttpClient; use crate::util::img::upload_image_optimized; use crate::util::routes::read_from_field; use crate::util::validate::validation_errors_to_string; @@ -300,6 +301,7 @@ pub async fn project_create( redis: Data, file_host: Data>, session_queue: Data, + http: Data, ) -> Result { project_create_internal( req, @@ -308,6 +310,7 @@ pub async fn project_create( redis, file_host, session_queue, + http, ) .await } @@ -319,6 +322,7 @@ pub async fn project_create_internal( redis: Data, file_host: Data>, session_queue: Data, + http: Data, ) -> Result { let mut transaction = client.begin().await?; let mut uploaded_files = Vec::new(); @@ -335,6 +339,7 @@ pub async fn project_create_internal( &client, &redis, &session_queue, + &http, project_id, ) .await; @@ -366,6 +371,7 @@ pub async fn project_create_with_id( redis: Data, file_host: Data>, session_queue: Data, + http: Data, path: web::Path<(ProjectId,)>, ) -> Result { let mut transaction = client.begin().await?; @@ -382,6 +388,7 @@ pub async fn project_create_with_id( &client, &redis, &session_queue, + &http, project_id, ) .await; @@ -443,6 +450,7 @@ async fn project_create_inner( pool: &PgPool, redis: &RedisPool, session_queue: &AuthQueue, + http: &reqwest::Client, project_id: ProjectId, ) -> Result { // The currently logged in user @@ -907,7 +915,9 @@ async fn project_create_inner( let now = Utc::now(); - let id = project_builder_actual.insert(&mut *transaction).await?; + let id = project_builder_actual + .insert(&mut *transaction, http) + .await?; DBUser::clear_project_cache(&[current_user.id.into()], redis).await?; for image_id in project_create_data.uploaded_images { diff --git a/apps/labrinth/src/routes/v3/project_creation/new.rs b/apps/labrinth/src/routes/v3/project_creation/new.rs index c248b7864..cbfd33971 100644 --- a/apps/labrinth/src/routes/v3/project_creation/new.rs +++ b/apps/labrinth/src/routes/v3/project_creation/new.rs @@ -29,7 +29,9 @@ use crate::{ }, queue::session::AuthQueue, routes::ApiError, - util::{error::Context, validate::validation_errors_to_string}, + util::{ + error::Context, http::HttpClient, validate::validation_errors_to_string, + }, }; pub fn config(cfg: &mut utoipa_actix_web::service_config::ServiceConfig) { @@ -116,6 +118,7 @@ pub async fn create( db: web::Data, redis: web::Data, session_queue: web::Data, + http: web::Data, web::Json(create): web::Json, ) -> Result, CreateError> { // check that the user can make a project @@ -302,13 +305,13 @@ pub async fn create( }; project_builder - .insert(&mut txn) + .insert(&mut txn, &http) .await .wrap_internal_err("failed to insert project")?; if let Some(version_builder) = version_builder { version_builder - .insert(&mut txn) + .insert(&mut txn, &http) .await .wrap_internal_err("failed to insert initial version")?; } diff --git a/apps/labrinth/src/routes/v3/version_creation.rs b/apps/labrinth/src/routes/v3/version_creation.rs index a946ea756..58d007ff6 100644 --- a/apps/labrinth/src/routes/v3/version_creation.rs +++ b/apps/labrinth/src/routes/v3/version_creation.rs @@ -27,6 +27,7 @@ use crate::models::projects::{DependencyType, ProjectStatus, skip_nulls}; use crate::models::teams::ProjectPermissions; use crate::queue::moderation::AutomatedModerationQueue; use crate::queue::session::AuthQueue; +use crate::util::http::HttpClient; use crate::util::routes::read_from_field; use crate::util::validate::validation_errors_to_string; use crate::validate::{ValidationResult, validate_file}; @@ -112,6 +113,7 @@ pub async fn version_create( file_host: Data>, session_queue: Data, moderation_queue: web::Data, + http: web::Data, ) -> Result { let mut transaction = client.begin().await?; let mut uploaded_files = Vec::new(); @@ -126,6 +128,7 @@ pub async fn version_create( &client, &session_queue, &moderation_queue, + &http, ) .await; @@ -159,6 +162,7 @@ async fn version_create_inner( pool: &PgPool, session_queue: &AuthQueue, moderation_queue: &AutomatedModerationQueue, + http: &reqwest::Client, ) -> Result { let mut initial_version_data = None; let mut version_builder = None; @@ -480,7 +484,7 @@ async fn version_create_inner( }; let project_id = builder.project_id; - builder.insert(transaction).await?; + builder.insert(transaction, http).await?; for image_id in version_data.uploaded_images { if let Some(db_image) = @@ -542,6 +546,7 @@ pub async fn upload_file_to_version( redis: Data, file_host: Data>, session_queue: web::Data, + http: web::Data, ) -> Result { let mut transaction = client.begin().await?; let mut uploaded_files = Vec::new(); @@ -558,6 +563,7 @@ pub async fn upload_file_to_version( &mut uploaded_files, version_id, &session_queue, + &http, ) .await; @@ -591,6 +597,7 @@ async fn upload_file_to_version_inner( uploaded_files: &mut Vec, version_id: models::DBVersionId, session_queue: &AuthQueue, + http: &reqwest::Client, ) -> Result { let mut initial_file_data: Option = None; let mut file_builders: Vec = Vec::new(); @@ -774,7 +781,7 @@ async fn upload_file_to_version_inner( )); } else { for file in file_builders { - file.insert(version_id, &mut *transaction).await?; + file.insert(version_id, &mut *transaction, http).await?; } } diff --git a/apps/labrinth/src/util/http.rs b/apps/labrinth/src/util/http.rs new file mode 100644 index 000000000..cfa942ef7 --- /dev/null +++ b/apps/labrinth/src/util/http.rs @@ -0,0 +1,32 @@ +use std::time::Duration; + +use derive_more::Deref; +use reqwest::header::{HeaderMap, HeaderValue, USER_AGENT}; + +/// Generic HTTP client used for anywhere you need to send an HTTP request, and +/// do not care what headers or other parameters are used. +#[derive(Debug, Clone, Deref)] +pub struct HttpClient(pub reqwest::Client); + +impl HttpClient { + pub fn new() -> Self { + let client = reqwest::Client::builder() + .default_headers(HeaderMap::from_iter([( + USER_AGENT, + HeaderValue::from_static(concat!( + "Labrinth/", + env!("COMPILATION_DATE") + )), + )])) + .timeout(Duration::from_secs(30)) + .build() + .unwrap(); + Self(client) + } +} + +impl Default for HttpClient { + fn default() -> Self { + Self::new() + } +} diff --git a/apps/labrinth/src/util/mod.rs b/apps/labrinth/src/util/mod.rs index b712e2678..83fce2240 100644 --- a/apps/labrinth/src/util/mod.rs +++ b/apps/labrinth/src/util/mod.rs @@ -10,6 +10,7 @@ pub mod error; pub mod ext; pub mod gotenberg; pub mod guards; +pub mod http; pub mod img; pub mod ip; pub mod ratelimit; diff --git a/apps/labrinth/tests/analytics.rs b/apps/labrinth/tests/analytics.rs index f20f26471..217f0e7b1 100644 --- a/apps/labrinth/tests/analytics.rs +++ b/apps/labrinth/tests/analytics.rs @@ -1,11 +1,8 @@ -use actix_http::StatusCode; -use actix_web::test; use ariadne::ids::base62_impl::parse_base62; use chrono::{DateTime, Duration, Utc}; use common::permissions::PermissionsTest; use common::permissions::PermissionsTestContext; use common::{ - api_common::Api, api_v3::ApiV3, database::*, environment::{TestEnvironment, with_test_environment}, @@ -248,26 +245,3 @@ pub async fn permissions_analytics_revenue() { ) .await; } - -#[actix_rt::test] -pub async fn analytics_minecraft_server_play_ingest() { - with_test_environment( - None, - |test_env: TestEnvironment| async move { - let api = &test_env.api; - let project_id = test_env.dummy.project_alpha.project_id.clone(); - - let req = test::TestRequest::post() - .uri("/analytics/minecraft-server-play") - .append_header(("Authorization", USER_USER_PAT.unwrap())) - .set_json(serde_json::json!({ - "project_id": project_id, - "minecraft_uuid": "12345678-1234-5678-1234-567812345678" - })) - .to_request(); - let resp = api.call(req).await; - assert_status!(&resp, StatusCode::NO_CONTENT); - }, - ) - .await; -} diff --git a/packages/app-lib/src/api/profile/mod.rs b/packages/app-lib/src/api/profile/mod.rs index 6f122fa96..f4e6d1f3d 100644 --- a/packages/app-lib/src/api/profile/mod.rs +++ b/packages/app-lib/src/api/profile/mod.rs @@ -23,6 +23,7 @@ use serde_json::json; use tracing::{info, warn}; use std::collections::{HashMap, HashSet}; +use std::time::Duration; use crate::data::Settings; use crate::server_address::ServerAddress; @@ -739,25 +740,54 @@ async fn run_credentials( if let Some(linked_data) = &profile.linked_data { let project_id = &linked_data.project_id; if !project_id.trim().is_empty() { - let result = fetch::post_json( - concat!( - env!("MODRINTH_API_BASE_URL"), - "analytics/minecraft-server-play" - ), - json!({ - "project_id": &linked_data.project_id, - "minecraft_uuid": credentials.offline_profile.id, - }), - &state.api_semaphore, - &state.pool, - ) - .await; + let server_id = uuid::Uuid::new_v4().to_string(); - match result { - Ok(()) => { - info!("Tracked server play for '{project_id}' in analytics") + let join_result = fetch::REQWEST_CLIENT + .post("https://sessionserver.mojang.com/session/minecraft/join") + .json(&json!({ + "accessToken": &credentials.access_token, + "selectedProfile": credentials.offline_profile.id.simple().to_string(), + "serverId": &server_id, + })) + .timeout(Duration::from_secs(5)) + .send() + .await; + + match join_result { + Ok(resp) if resp.status().is_success() => { + let result = fetch::post_json( + concat!( + env!("MODRINTH_API_BASE_URL"), + "analytics/minecraft-server-play" + ), + json!({ + "project_id": &linked_data.project_id, + "username": &credentials.offline_profile.name, + "server_id": &server_id, + }), + &state.api_semaphore, + &state.pool, + ) + .await; + + match result { + Ok(()) => { + info!( + "Tracked server play for '{project_id}' in analytics" + ) + } + Err(err) => { + warn!("Failed to report server play: {err:?}") + } + } + } + Ok(resp) => warn!( + "Failed to join Mojang session server: HTTP {}", + resp.status() + ), + Err(err) => { + warn!("Failed to join Mojang session server: {err:?}") } - Err(err) => warn!("Failed to report server play: {err:?}"), } } }