Harden minecraft-server-play analytics (#5484)
* Harden minecraft-server-play analytics * Verify based on mc token * Fail for non-server projects * Nitpicks and factor out HTTP client * Allow passing old minecraft_uuid field for clients * Remove server play analytics test since it relies on auth against Minecraft API which I don't want to mock :( * Switch to using hasJoined for uuid validation * Fix formatting * Fix sessionserver status code * Ensure profile name and queried username matches * replace some wrap_request_errs with internal errs * add HTTP client into web::Data * short timeout on client-side session join query * further fixes * sqlx prepare * fix clippy --------- Co-authored-by: Creeperkatze <178587183+Creeperkatze@users.noreply.github.com> Co-authored-by: aecsocket <aecsocket@tutanota.com>
This commit is contained in:
@@ -218,6 +218,16 @@ pub async fn init_client_with_database(
|
||||
.execute()
|
||||
.await?;
|
||||
|
||||
client
|
||||
.query(&format!(
|
||||
"
|
||||
ALTER TABLE {database}.{MINECRAFT_SERVER_PLAYS} {cluster_line}
|
||||
ADD COLUMN IF NOT EXISTS ip IPv6 DEFAULT toIPv6('::')
|
||||
"
|
||||
))
|
||||
.execute()
|
||||
.await?;
|
||||
|
||||
client
|
||||
.query(&format!(
|
||||
"
|
||||
|
||||
@@ -187,6 +187,7 @@ impl ProjectBuilder {
|
||||
pub async fn insert(
|
||||
self,
|
||||
transaction: &mut PgTransaction<'_>,
|
||||
http: &reqwest::Client,
|
||||
) -> Result<DBProjectId, DatabaseError> {
|
||||
let project_struct = DBProject {
|
||||
id: self.project_id,
|
||||
@@ -234,7 +235,7 @@ impl ProjectBuilder {
|
||||
|
||||
for mut version in self.initial_versions {
|
||||
version.project_id = self.project_id;
|
||||
version.insert(&mut *transaction).await?;
|
||||
version.insert(&mut *transaction, http).await?;
|
||||
}
|
||||
|
||||
LinkUrl::insert_many_projects(
|
||||
|
||||
@@ -135,6 +135,7 @@ impl VersionFileBuilder {
|
||||
self,
|
||||
version_id: DBVersionId,
|
||||
transaction: &mut PgTransaction<'_>,
|
||||
http: &reqwest::Client,
|
||||
) -> Result<DBFileId, DatabaseError> {
|
||||
let file_id = generate_file_id(&mut *transaction).await?;
|
||||
|
||||
@@ -173,6 +174,7 @@ impl VersionFileBuilder {
|
||||
DelphiRunParameters {
|
||||
file_id: file_id.into(),
|
||||
},
|
||||
http,
|
||||
)
|
||||
.await
|
||||
{
|
||||
@@ -193,6 +195,7 @@ impl VersionBuilder {
|
||||
pub async fn insert(
|
||||
self,
|
||||
transaction: &mut PgTransaction<'_>,
|
||||
http: &reqwest::Client,
|
||||
) -> Result<DBVersionId, DatabaseError> {
|
||||
let version = DBVersion {
|
||||
id: self.version_id,
|
||||
@@ -233,7 +236,7 @@ impl VersionBuilder {
|
||||
} = self;
|
||||
|
||||
for file in files {
|
||||
file.insert(version_id, transaction).await?;
|
||||
file.insert(version_id, transaction, http).await?;
|
||||
}
|
||||
|
||||
DependencyBuilder::insert_many(
|
||||
|
||||
@@ -22,6 +22,7 @@ use crate::queue::moderation::AutomatedModerationQueue;
|
||||
use crate::routes::internal::delphi::rescan::rescan_projects_in_queue;
|
||||
use crate::util::anrok;
|
||||
use crate::util::archon::ArchonClient;
|
||||
use crate::util::http::HttpClient;
|
||||
use crate::util::ratelimit::{AsyncRateLimiter, GCRAParameters};
|
||||
use sync::friends::handle_pubsub;
|
||||
|
||||
@@ -69,6 +70,7 @@ pub struct LabrinthConfig {
|
||||
pub email_queue: web::Data<EmailQueue>,
|
||||
pub archon_client: web::Data<ArchonClient>,
|
||||
pub gotenberg_client: GotenbergClient,
|
||||
pub http_client: web::Data<HttpClient>,
|
||||
}
|
||||
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
@@ -103,10 +105,14 @@ pub fn app_setup(
|
||||
|
||||
let scheduler = scheduler::Scheduler::new();
|
||||
|
||||
let http_client = web::Data::new(HttpClient::new());
|
||||
{
|
||||
let pool_ref = pool.clone();
|
||||
let http_ref = http_client.clone();
|
||||
actix_rt::spawn(async move {
|
||||
if let Err(err) = rescan_projects_in_queue(&pool_ref).await {
|
||||
if let Err(err) =
|
||||
rescan_projects_in_queue(&pool_ref, &http_ref).await
|
||||
{
|
||||
warn!("Delphi rescan failed: {err:#}");
|
||||
}
|
||||
});
|
||||
@@ -303,6 +309,7 @@ pub fn app_setup(
|
||||
stripe_client,
|
||||
anrok_client,
|
||||
gotenberg_client,
|
||||
http_client,
|
||||
archon_client: web::Data::new(
|
||||
ArchonClient::from_env()
|
||||
.expect("ARCHON_URL and PYRO_API_KEY must be set"),
|
||||
@@ -333,6 +340,7 @@ pub fn app_config(
|
||||
.app_data(web::Data::new(labrinth_config.file_host.clone()))
|
||||
.app_data(web::Data::new(labrinth_config.search_config.clone()))
|
||||
.app_data(web::Data::new(labrinth_config.gotenberg_client.clone()))
|
||||
.app_data(labrinth_config.http_client.clone())
|
||||
.app_data(labrinth_config.session_queue.clone())
|
||||
.app_data(labrinth_config.payouts_queue.clone())
|
||||
.app_data(labrinth_config.email_queue.clone())
|
||||
|
||||
@@ -86,4 +86,5 @@ pub struct MinecraftServerPlay {
|
||||
pub project_id: u64,
|
||||
#[serde(with = "clickhouse::serde::uuid")]
|
||||
pub minecraft_uuid: Uuid,
|
||||
pub ip: Ipv6Addr,
|
||||
}
|
||||
|
||||
@@ -14,12 +14,15 @@ pub mod cache;
|
||||
|
||||
const DOWNLOADS_NAMESPACE: &str = "downloads";
|
||||
const VIEWS_NAMESPACE: &str = "views";
|
||||
const MINECRAFT_SERVER_PLAYS_NAMESPACE: &str = "minecraft_server_plays";
|
||||
const MINECRAFT_SERVER_PLAYS_EXPIRY: u64 = 86_400; // 24 hours
|
||||
const MINECRAFT_SERVER_PLAYS_LIMIT: u32 = 5;
|
||||
|
||||
pub struct AnalyticsQueue {
|
||||
views_queue: DashMap<(u64, u64), Vec<PageView>>,
|
||||
downloads_queue: DashMap<(u64, u64), Download>,
|
||||
playtime_queue: DashSet<Playtime>,
|
||||
minecraft_server_plays_queue: DashSet<MinecraftServerPlay>,
|
||||
minecraft_server_plays_queue: DashMap<(u128, u64), MinecraftServerPlay>,
|
||||
affiliate_code_clicks_queue: DashMap<(u64, u64), Vec<AffiliateCodeClick>>,
|
||||
}
|
||||
|
||||
@@ -36,7 +39,7 @@ impl AnalyticsQueue {
|
||||
views_queue: DashMap::with_capacity(1000),
|
||||
downloads_queue: DashMap::with_capacity(1000),
|
||||
playtime_queue: DashSet::with_capacity(1000),
|
||||
minecraft_server_plays_queue: DashSet::with_capacity(1000),
|
||||
minecraft_server_plays_queue: DashMap::with_capacity(1000),
|
||||
affiliate_code_clicks_queue: DashMap::with_capacity(1000),
|
||||
}
|
||||
}
|
||||
@@ -60,7 +63,8 @@ impl AnalyticsQueue {
|
||||
}
|
||||
|
||||
pub fn add_minecraft_server_play(&self, play: MinecraftServerPlay) {
|
||||
self.minecraft_server_plays_queue.insert(play);
|
||||
self.minecraft_server_plays_queue
|
||||
.insert((play.minecraft_uuid.as_u128(), play.project_id), play);
|
||||
}
|
||||
|
||||
pub fn add_affiliate_code_click(&self, click: AffiliateCodeClick) {
|
||||
@@ -118,11 +122,67 @@ impl AnalyticsQueue {
|
||||
}
|
||||
|
||||
if !minecraft_server_plays_queue.is_empty() {
|
||||
let mut plays_keys = Vec::new();
|
||||
let raw_plays = DashMap::new();
|
||||
|
||||
for (index, (key, play)) in
|
||||
minecraft_server_plays_queue.into_iter().enumerate()
|
||||
{
|
||||
plays_keys.push(key);
|
||||
raw_plays.insert(index, play);
|
||||
}
|
||||
|
||||
let mut redis =
|
||||
redis.pool.get().await.map_err(DatabaseError::RedisPool)?;
|
||||
|
||||
let results = cmd("MGET")
|
||||
.arg(
|
||||
plays_keys
|
||||
.iter()
|
||||
.map(|x| {
|
||||
format!(
|
||||
"{}:{}-{}",
|
||||
MINECRAFT_SERVER_PLAYS_NAMESPACE, x.0, x.1
|
||||
)
|
||||
})
|
||||
.collect::<Vec<_>>(),
|
||||
)
|
||||
.query_async::<Vec<Option<u32>>>(&mut redis)
|
||||
.await
|
||||
.map_err(DatabaseError::CacheError)?;
|
||||
|
||||
let mut pipe = redis::pipe();
|
||||
for (idx, count) in results.into_iter().enumerate() {
|
||||
let key = &plays_keys[idx];
|
||||
|
||||
let new_count = if let Some(count) = count {
|
||||
if count >= MINECRAFT_SERVER_PLAYS_LIMIT {
|
||||
raw_plays.remove(&idx);
|
||||
continue;
|
||||
}
|
||||
count + 1
|
||||
} else {
|
||||
1
|
||||
};
|
||||
|
||||
pipe.atomic().set_ex(
|
||||
format!(
|
||||
"{}:{}-{}",
|
||||
MINECRAFT_SERVER_PLAYS_NAMESPACE, key.0, key.1
|
||||
),
|
||||
new_count,
|
||||
MINECRAFT_SERVER_PLAYS_EXPIRY,
|
||||
);
|
||||
}
|
||||
pipe.query_async::<()>(&mut *redis)
|
||||
.await
|
||||
.map_err(DatabaseError::CacheError)?;
|
||||
|
||||
let mut plays = client
|
||||
.insert::<MinecraftServerPlay>(MINECRAFT_SERVER_PLAYS)
|
||||
.await?;
|
||||
|
||||
for play in minecraft_server_plays_queue {
|
||||
for (_, play) in raw_plays {
|
||||
plays.write(&play).await?;
|
||||
}
|
||||
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
use crate::auth::get_user_from_headers;
|
||||
use crate::database::PgPool;
|
||||
use crate::database::models::DBProject;
|
||||
use crate::database::redis::RedisPool;
|
||||
use crate::env::ENV;
|
||||
use crate::models::analytics::{MinecraftServerPlay, PageView, Playtime};
|
||||
@@ -9,8 +10,11 @@ use crate::queue::analytics::AnalyticsQueue;
|
||||
use crate::queue::session::AuthQueue;
|
||||
use crate::routes::ApiError;
|
||||
use crate::util::date::get_current_tenths_of_ms;
|
||||
use crate::util::error::Context;
|
||||
use crate::util::http::HttpClient;
|
||||
use actix_web::{HttpRequest, HttpResponse};
|
||||
use actix_web::{post, web};
|
||||
use eyre::eyre;
|
||||
use serde::Deserialize;
|
||||
use std::collections::HashMap;
|
||||
use std::net::Ipv4Addr;
|
||||
@@ -233,10 +237,18 @@ async fn playtime_ingest(
|
||||
Ok(HttpResponse::NoContent().finish())
|
||||
}
|
||||
|
||||
#[derive(Debug, Deserialize)]
|
||||
struct MinecraftProfile {
|
||||
id: Uuid,
|
||||
name: String,
|
||||
}
|
||||
|
||||
#[derive(Deserialize)]
|
||||
pub struct MinecraftJavaServerPlayInput {
|
||||
project_id: ProjectId,
|
||||
minecraft_uuid: Uuid,
|
||||
username: Option<String>,
|
||||
server_id: Option<String>,
|
||||
minecraft_uuid: Option<Uuid>,
|
||||
}
|
||||
|
||||
pub const MINECRAFT_SERVER_PLAYS: &str = "minecraft_server_plays";
|
||||
@@ -249,7 +261,8 @@ async fn minecraft_server_play_ingest(
|
||||
play_input: web::Json<MinecraftJavaServerPlayInput>,
|
||||
pool: web::Data<PgPool>,
|
||||
redis: web::Data<RedisPool>,
|
||||
) -> Result<HttpResponse, ApiError> {
|
||||
http: web::Data<HttpClient>,
|
||||
) -> Result<(), ApiError> {
|
||||
let user = get_user_from_headers(
|
||||
&req,
|
||||
&**pool,
|
||||
@@ -262,14 +275,86 @@ async fn minecraft_server_play_ingest(
|
||||
.ok();
|
||||
|
||||
let project_id = play_input.project_id;
|
||||
|
||||
let project = DBProject::get(&project_id.to_string(), &**pool, &redis)
|
||||
.await?
|
||||
.ok_or(ApiError::NotFound)?;
|
||||
|
||||
if project.components.minecraft_server.is_none() {
|
||||
return Err(ApiError::Request(eyre!(
|
||||
"not a `minecraft_server` project"
|
||||
)));
|
||||
}
|
||||
|
||||
let minecraft_uuid = if let (Some(username), Some(server_id)) =
|
||||
(&play_input.username, &play_input.server_id)
|
||||
{
|
||||
let has_joined = http
|
||||
.get("https://sessionserver.mojang.com/session/minecraft/hasJoined")
|
||||
.query(&[
|
||||
("username", username.as_str()),
|
||||
("serverId", server_id.as_str()),
|
||||
])
|
||||
.send()
|
||||
.await
|
||||
.wrap_internal_err("failed to contact Mojang session server")?;
|
||||
|
||||
if has_joined.status() == reqwest::StatusCode::NO_CONTENT
|
||||
|| !has_joined.status().is_success()
|
||||
{
|
||||
return Err(ApiError::Request(eyre!(
|
||||
"Minecraft session verification failed"
|
||||
)));
|
||||
}
|
||||
|
||||
let profile = has_joined
|
||||
.json::<MinecraftProfile>()
|
||||
.await
|
||||
.wrap_internal_err("invalid Mojang session response")?;
|
||||
|
||||
if profile.name != *username {
|
||||
return Err(ApiError::Request(eyre!(
|
||||
"returned Mojang profile name does not match username"
|
||||
)));
|
||||
}
|
||||
|
||||
profile.id
|
||||
} else {
|
||||
play_input
|
||||
.minecraft_uuid
|
||||
.wrap_request_err("missing `minecraft_uuid`")?
|
||||
};
|
||||
|
||||
let conn_info = req.connection_info().peer_addr().map(|x| x.to_string());
|
||||
let headers = req
|
||||
.headers()
|
||||
.into_iter()
|
||||
.map(|(key, val)| {
|
||||
(
|
||||
key.to_string().to_lowercase(),
|
||||
val.to_str().unwrap_or_default().to_string(),
|
||||
)
|
||||
})
|
||||
.collect::<HashMap<String, String>>();
|
||||
|
||||
let ip = crate::util::ip::convert_to_ip_v6(
|
||||
if let Some(header) = headers.get("cf-connecting-ip") {
|
||||
header
|
||||
} else {
|
||||
conn_info.as_deref().unwrap_or_default()
|
||||
},
|
||||
)
|
||||
.unwrap_or_else(|_| Ipv4Addr::new(127, 0, 0, 1).to_ipv6_mapped());
|
||||
|
||||
let row = MinecraftServerPlay {
|
||||
recorded: get_current_tenths_of_ms(),
|
||||
user_id: user.map(|u| u.id.0).unwrap_or(0),
|
||||
project_id: project_id.0,
|
||||
minecraft_uuid: play_input.minecraft_uuid,
|
||||
minecraft_uuid,
|
||||
ip,
|
||||
};
|
||||
|
||||
analytics_queue.add_minecraft_server_play(row);
|
||||
|
||||
Ok(HttpResponse::NoContent().finish())
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -1,11 +1,10 @@
|
||||
use std::{collections::HashMap, fmt::Write, sync::LazyLock, time::Instant};
|
||||
use std::{collections::HashMap, fmt::Write, time::Instant};
|
||||
|
||||
use crate::database::PgPool;
|
||||
use crate::env::ENV;
|
||||
use crate::{database::PgPool, util::http::HttpClient};
|
||||
use actix_web::{HttpRequest, HttpResponse, get, post, web};
|
||||
use chrono::{DateTime, Utc};
|
||||
use eyre::eyre;
|
||||
use reqwest::header::{HeaderMap, HeaderValue, USER_AGENT};
|
||||
use serde::Deserialize;
|
||||
use tokio::sync::Mutex;
|
||||
use tracing::info;
|
||||
@@ -46,21 +45,6 @@ pub fn config(cfg: &mut web::ServiceConfig) {
|
||||
);
|
||||
}
|
||||
|
||||
static DELPHI_CLIENT: LazyLock<reqwest::Client> = LazyLock::new(|| {
|
||||
reqwest::Client::builder()
|
||||
.default_headers({
|
||||
HeaderMap::from_iter([(
|
||||
USER_AGENT,
|
||||
HeaderValue::from_static(concat!(
|
||||
"Labrinth/",
|
||||
env!("COMPILATION_DATE")
|
||||
)),
|
||||
)])
|
||||
})
|
||||
.build()
|
||||
.unwrap()
|
||||
});
|
||||
|
||||
/// Type of [`DelphiReportIssueDetails::key`].
|
||||
///
|
||||
/// Delphi may provide `null` for the key, but we require a key for storing
|
||||
@@ -367,6 +351,7 @@ async fn ingest_report_deserialized(
|
||||
pub async fn run(
|
||||
exec: impl crate::database::Executor<'_, Database = sqlx::Postgres>,
|
||||
run_parameters: DelphiRunParameters,
|
||||
http: &reqwest::Client,
|
||||
) -> Result<HttpResponse, ApiError> {
|
||||
let file_data = sqlx::query!(
|
||||
r#"
|
||||
@@ -389,8 +374,7 @@ pub async fn run(
|
||||
run_parameters.file_id.0
|
||||
);
|
||||
|
||||
DELPHI_CLIENT
|
||||
.post(&ENV.DELPHI_URL)
|
||||
http.post(&ENV.DELPHI_URL)
|
||||
.json(&serde_json::json!({
|
||||
"url": file_data.url,
|
||||
"project_id": ProjectId(file_data.project_id.0 as u64),
|
||||
@@ -489,6 +473,7 @@ async fn _run(
|
||||
redis: web::Data<RedisPool>,
|
||||
session_queue: web::Data<AuthQueue>,
|
||||
run_parameters: web::Query<DelphiRunParameters>,
|
||||
http: web::Data<HttpClient>,
|
||||
) -> Result<HttpResponse, ApiError> {
|
||||
check_is_moderator_from_headers(
|
||||
&req,
|
||||
@@ -499,7 +484,7 @@ async fn _run(
|
||||
)
|
||||
.await?;
|
||||
|
||||
run(&**pool, run_parameters.into_inner()).await
|
||||
run(&**pool, run_parameters.into_inner(), &http).await
|
||||
}
|
||||
|
||||
#[get("version")]
|
||||
@@ -531,6 +516,7 @@ async fn issue_type_schema(
|
||||
pool: web::Data<PgPool>,
|
||||
redis: web::Data<RedisPool>,
|
||||
session_queue: web::Data<AuthQueue>,
|
||||
http: web::Data<HttpClient>,
|
||||
) -> Result<HttpResponse, ApiError> {
|
||||
check_is_moderator_from_headers(
|
||||
&req,
|
||||
@@ -556,8 +542,7 @@ async fn issue_type_schema(
|
||||
cache_entry => Ok(HttpResponse::Ok().json(
|
||||
&cache_entry
|
||||
.insert((
|
||||
DELPHI_CLIENT
|
||||
.get(format!("{}/schema", ENV.DELPHI_URL))
|
||||
http.get(format!("{}/schema", ENV.DELPHI_URL))
|
||||
.send()
|
||||
.await
|
||||
.and_then(|res| res.error_for_status())
|
||||
|
||||
@@ -2,11 +2,14 @@ use eyre::{Result, WrapErr, eyre};
|
||||
use futures::future::try_join_all;
|
||||
use tracing::info;
|
||||
|
||||
use super::{DELPHI_CLIENT, DelphiRunParameters};
|
||||
use super::DelphiRunParameters;
|
||||
use crate::{database::PgPool, env::ENV, models::ids::FileId};
|
||||
|
||||
pub async fn rescan_projects_in_queue(pool: &PgPool) -> Result<()> {
|
||||
let delphi_version = fetch_delphi_version().await?;
|
||||
pub async fn rescan_projects_in_queue(
|
||||
pool: &PgPool,
|
||||
http: &reqwest::Client,
|
||||
) -> Result<()> {
|
||||
let delphi_version = fetch_delphi_version(http).await?;
|
||||
let old_delphi_version = fetch_stored_delphi_version(pool).await?;
|
||||
|
||||
if old_delphi_version == Some(delphi_version) {
|
||||
@@ -44,7 +47,7 @@ pub async fn rescan_projects_in_queue(pool: &PgPool) -> Result<()> {
|
||||
.map(|file_id| FileId(file_id.cast_unsigned()));
|
||||
|
||||
try_join_all(file_ids.map(|file_id| async move {
|
||||
super::run(pool, DelphiRunParameters { file_id })
|
||||
super::run(pool, DelphiRunParameters { file_id }, http)
|
||||
.await
|
||||
.wrap_err_with(|| {
|
||||
eyre!("failed to submit Delphi rescan for `{file_id:?}`")
|
||||
@@ -60,8 +63,8 @@ pub async fn rescan_projects_in_queue(pool: &PgPool) -> Result<()> {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn fetch_delphi_version() -> Result<i32> {
|
||||
let response = DELPHI_CLIENT
|
||||
async fn fetch_delphi_version(http: &reqwest::Client) -> Result<i32> {
|
||||
let response = http
|
||||
.get(format!("{}/version", ENV.DELPHI_URL))
|
||||
.send()
|
||||
.await
|
||||
|
||||
@@ -12,6 +12,7 @@ use crate::queue::session::AuthQueue;
|
||||
use crate::routes::v3::project_creation::default_project_type;
|
||||
use crate::routes::v3::project_creation::{CreateError, NewGalleryItem};
|
||||
use crate::routes::{v2_reroute, v3};
|
||||
use crate::util::http::HttpClient;
|
||||
use actix_multipart::Multipart;
|
||||
use actix_web::web::Data;
|
||||
use actix_web::{HttpRequest, HttpResponse, post};
|
||||
@@ -141,6 +142,7 @@ pub async fn project_create(
|
||||
redis: Data<RedisPool>,
|
||||
file_host: Data<Arc<dyn FileHost + Send + Sync>>,
|
||||
session_queue: Data<AuthQueue>,
|
||||
http: Data<HttpClient>,
|
||||
) -> Result<HttpResponse, CreateError> {
|
||||
// Convert V2 multipart payload to V3 multipart payload
|
||||
let payload = v2_reroute::alter_actix_multipart(
|
||||
@@ -252,6 +254,7 @@ pub async fn project_create(
|
||||
redis.clone(),
|
||||
file_host,
|
||||
session_queue,
|
||||
http,
|
||||
)
|
||||
.await?;
|
||||
|
||||
|
||||
@@ -13,6 +13,7 @@ use crate::queue::session::AuthQueue;
|
||||
use crate::routes::v3::project_creation::CreateError;
|
||||
use crate::routes::v3::version_creation;
|
||||
use crate::routes::{v2_reroute, v3};
|
||||
use crate::util::http::HttpClient;
|
||||
use actix_multipart::Multipart;
|
||||
use actix_web::http::header::ContentDisposition;
|
||||
use actix_web::web::Data;
|
||||
@@ -83,6 +84,7 @@ pub async fn version_create(
|
||||
file_host: Data<Arc<dyn FileHost + Send + Sync>>,
|
||||
session_queue: Data<AuthQueue>,
|
||||
moderation_queue: Data<AutomatedModerationQueue>,
|
||||
http: Data<HttpClient>,
|
||||
) -> Result<HttpResponse, CreateError> {
|
||||
let payload = v2_reroute::alter_actix_multipart(
|
||||
payload,
|
||||
@@ -237,6 +239,7 @@ pub async fn version_create(
|
||||
file_host,
|
||||
session_queue,
|
||||
moderation_queue,
|
||||
http,
|
||||
)
|
||||
.await?;
|
||||
|
||||
@@ -286,6 +289,7 @@ pub async fn upload_file_to_version(
|
||||
redis: Data<RedisPool>,
|
||||
file_host: Data<Arc<dyn FileHost + Send + Sync>>,
|
||||
session_queue: web::Data<AuthQueue>,
|
||||
http: web::Data<HttpClient>,
|
||||
) -> Result<HttpResponse, CreateError> {
|
||||
// Returns NoContent, so no need to convert to V2
|
||||
let response = v3::version_creation::upload_file_to_version(
|
||||
@@ -296,6 +300,7 @@ pub async fn upload_file_to_version(
|
||||
redis.clone(),
|
||||
file_host,
|
||||
session_queue,
|
||||
http,
|
||||
)
|
||||
.await?;
|
||||
Ok(response)
|
||||
|
||||
@@ -24,6 +24,7 @@ use crate::models::v3::user_limits::UserLimits;
|
||||
use crate::queue::session::AuthQueue;
|
||||
use crate::search::indexing::IndexingError;
|
||||
use crate::util::guards::admin_key_guard;
|
||||
use crate::util::http::HttpClient;
|
||||
use crate::util::img::upload_image_optimized;
|
||||
use crate::util::routes::read_from_field;
|
||||
use crate::util::validate::validation_errors_to_string;
|
||||
@@ -300,6 +301,7 @@ pub async fn project_create(
|
||||
redis: Data<RedisPool>,
|
||||
file_host: Data<Arc<dyn FileHost + Send + Sync>>,
|
||||
session_queue: Data<AuthQueue>,
|
||||
http: Data<HttpClient>,
|
||||
) -> Result<HttpResponse, CreateError> {
|
||||
project_create_internal(
|
||||
req,
|
||||
@@ -308,6 +310,7 @@ pub async fn project_create(
|
||||
redis,
|
||||
file_host,
|
||||
session_queue,
|
||||
http,
|
||||
)
|
||||
.await
|
||||
}
|
||||
@@ -319,6 +322,7 @@ pub async fn project_create_internal(
|
||||
redis: Data<RedisPool>,
|
||||
file_host: Data<Arc<dyn FileHost + Send + Sync>>,
|
||||
session_queue: Data<AuthQueue>,
|
||||
http: Data<HttpClient>,
|
||||
) -> Result<HttpResponse, CreateError> {
|
||||
let mut transaction = client.begin().await?;
|
||||
let mut uploaded_files = Vec::new();
|
||||
@@ -335,6 +339,7 @@ pub async fn project_create_internal(
|
||||
&client,
|
||||
&redis,
|
||||
&session_queue,
|
||||
&http,
|
||||
project_id,
|
||||
)
|
||||
.await;
|
||||
@@ -366,6 +371,7 @@ pub async fn project_create_with_id(
|
||||
redis: Data<RedisPool>,
|
||||
file_host: Data<Arc<dyn FileHost + Send + Sync>>,
|
||||
session_queue: Data<AuthQueue>,
|
||||
http: Data<HttpClient>,
|
||||
path: web::Path<(ProjectId,)>,
|
||||
) -> Result<HttpResponse, CreateError> {
|
||||
let mut transaction = client.begin().await?;
|
||||
@@ -382,6 +388,7 @@ pub async fn project_create_with_id(
|
||||
&client,
|
||||
&redis,
|
||||
&session_queue,
|
||||
&http,
|
||||
project_id,
|
||||
)
|
||||
.await;
|
||||
@@ -443,6 +450,7 @@ async fn project_create_inner(
|
||||
pool: &PgPool,
|
||||
redis: &RedisPool,
|
||||
session_queue: &AuthQueue,
|
||||
http: &reqwest::Client,
|
||||
project_id: ProjectId,
|
||||
) -> Result<HttpResponse, CreateError> {
|
||||
// The currently logged in user
|
||||
@@ -907,7 +915,9 @@ async fn project_create_inner(
|
||||
|
||||
let now = Utc::now();
|
||||
|
||||
let id = project_builder_actual.insert(&mut *transaction).await?;
|
||||
let id = project_builder_actual
|
||||
.insert(&mut *transaction, http)
|
||||
.await?;
|
||||
DBUser::clear_project_cache(&[current_user.id.into()], redis).await?;
|
||||
|
||||
for image_id in project_create_data.uploaded_images {
|
||||
|
||||
@@ -29,7 +29,9 @@ use crate::{
|
||||
},
|
||||
queue::session::AuthQueue,
|
||||
routes::ApiError,
|
||||
util::{error::Context, validate::validation_errors_to_string},
|
||||
util::{
|
||||
error::Context, http::HttpClient, validate::validation_errors_to_string,
|
||||
},
|
||||
};
|
||||
|
||||
pub fn config(cfg: &mut utoipa_actix_web::service_config::ServiceConfig) {
|
||||
@@ -116,6 +118,7 @@ pub async fn create(
|
||||
db: web::Data<PgPool>,
|
||||
redis: web::Data<RedisPool>,
|
||||
session_queue: web::Data<AuthQueue>,
|
||||
http: web::Data<HttpClient>,
|
||||
web::Json(create): web::Json<ProjectCreate>,
|
||||
) -> Result<web::Json<ProjectId>, CreateError> {
|
||||
// check that the user can make a project
|
||||
@@ -302,13 +305,13 @@ pub async fn create(
|
||||
};
|
||||
|
||||
project_builder
|
||||
.insert(&mut txn)
|
||||
.insert(&mut txn, &http)
|
||||
.await
|
||||
.wrap_internal_err("failed to insert project")?;
|
||||
|
||||
if let Some(version_builder) = version_builder {
|
||||
version_builder
|
||||
.insert(&mut txn)
|
||||
.insert(&mut txn, &http)
|
||||
.await
|
||||
.wrap_internal_err("failed to insert initial version")?;
|
||||
}
|
||||
|
||||
@@ -27,6 +27,7 @@ use crate::models::projects::{DependencyType, ProjectStatus, skip_nulls};
|
||||
use crate::models::teams::ProjectPermissions;
|
||||
use crate::queue::moderation::AutomatedModerationQueue;
|
||||
use crate::queue::session::AuthQueue;
|
||||
use crate::util::http::HttpClient;
|
||||
use crate::util::routes::read_from_field;
|
||||
use crate::util::validate::validation_errors_to_string;
|
||||
use crate::validate::{ValidationResult, validate_file};
|
||||
@@ -112,6 +113,7 @@ pub async fn version_create(
|
||||
file_host: Data<Arc<dyn FileHost + Send + Sync>>,
|
||||
session_queue: Data<AuthQueue>,
|
||||
moderation_queue: web::Data<AutomatedModerationQueue>,
|
||||
http: web::Data<HttpClient>,
|
||||
) -> Result<HttpResponse, CreateError> {
|
||||
let mut transaction = client.begin().await?;
|
||||
let mut uploaded_files = Vec::new();
|
||||
@@ -126,6 +128,7 @@ pub async fn version_create(
|
||||
&client,
|
||||
&session_queue,
|
||||
&moderation_queue,
|
||||
&http,
|
||||
)
|
||||
.await;
|
||||
|
||||
@@ -159,6 +162,7 @@ async fn version_create_inner(
|
||||
pool: &PgPool,
|
||||
session_queue: &AuthQueue,
|
||||
moderation_queue: &AutomatedModerationQueue,
|
||||
http: &reqwest::Client,
|
||||
) -> Result<HttpResponse, CreateError> {
|
||||
let mut initial_version_data = None;
|
||||
let mut version_builder = None;
|
||||
@@ -480,7 +484,7 @@ async fn version_create_inner(
|
||||
};
|
||||
|
||||
let project_id = builder.project_id;
|
||||
builder.insert(transaction).await?;
|
||||
builder.insert(transaction, http).await?;
|
||||
|
||||
for image_id in version_data.uploaded_images {
|
||||
if let Some(db_image) =
|
||||
@@ -542,6 +546,7 @@ pub async fn upload_file_to_version(
|
||||
redis: Data<RedisPool>,
|
||||
file_host: Data<Arc<dyn FileHost + Send + Sync>>,
|
||||
session_queue: web::Data<AuthQueue>,
|
||||
http: web::Data<HttpClient>,
|
||||
) -> Result<HttpResponse, CreateError> {
|
||||
let mut transaction = client.begin().await?;
|
||||
let mut uploaded_files = Vec::new();
|
||||
@@ -558,6 +563,7 @@ pub async fn upload_file_to_version(
|
||||
&mut uploaded_files,
|
||||
version_id,
|
||||
&session_queue,
|
||||
&http,
|
||||
)
|
||||
.await;
|
||||
|
||||
@@ -591,6 +597,7 @@ async fn upload_file_to_version_inner(
|
||||
uploaded_files: &mut Vec<UploadedFile>,
|
||||
version_id: models::DBVersionId,
|
||||
session_queue: &AuthQueue,
|
||||
http: &reqwest::Client,
|
||||
) -> Result<HttpResponse, CreateError> {
|
||||
let mut initial_file_data: Option<InitialFileData> = None;
|
||||
let mut file_builders: Vec<VersionFileBuilder> = Vec::new();
|
||||
@@ -774,7 +781,7 @@ async fn upload_file_to_version_inner(
|
||||
));
|
||||
} else {
|
||||
for file in file_builders {
|
||||
file.insert(version_id, &mut *transaction).await?;
|
||||
file.insert(version_id, &mut *transaction, http).await?;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
32
apps/labrinth/src/util/http.rs
Normal file
32
apps/labrinth/src/util/http.rs
Normal file
@@ -0,0 +1,32 @@
|
||||
use std::time::Duration;
|
||||
|
||||
use derive_more::Deref;
|
||||
use reqwest::header::{HeaderMap, HeaderValue, USER_AGENT};
|
||||
|
||||
/// Generic HTTP client used for anywhere you need to send an HTTP request, and
|
||||
/// do not care what headers or other parameters are used.
|
||||
#[derive(Debug, Clone, Deref)]
|
||||
pub struct HttpClient(pub reqwest::Client);
|
||||
|
||||
impl HttpClient {
|
||||
pub fn new() -> Self {
|
||||
let client = reqwest::Client::builder()
|
||||
.default_headers(HeaderMap::from_iter([(
|
||||
USER_AGENT,
|
||||
HeaderValue::from_static(concat!(
|
||||
"Labrinth/",
|
||||
env!("COMPILATION_DATE")
|
||||
)),
|
||||
)]))
|
||||
.timeout(Duration::from_secs(30))
|
||||
.build()
|
||||
.unwrap();
|
||||
Self(client)
|
||||
}
|
||||
}
|
||||
|
||||
impl Default for HttpClient {
|
||||
fn default() -> Self {
|
||||
Self::new()
|
||||
}
|
||||
}
|
||||
@@ -10,6 +10,7 @@ pub mod error;
|
||||
pub mod ext;
|
||||
pub mod gotenberg;
|
||||
pub mod guards;
|
||||
pub mod http;
|
||||
pub mod img;
|
||||
pub mod ip;
|
||||
pub mod ratelimit;
|
||||
|
||||
@@ -1,11 +1,8 @@
|
||||
use actix_http::StatusCode;
|
||||
use actix_web::test;
|
||||
use ariadne::ids::base62_impl::parse_base62;
|
||||
use chrono::{DateTime, Duration, Utc};
|
||||
use common::permissions::PermissionsTest;
|
||||
use common::permissions::PermissionsTestContext;
|
||||
use common::{
|
||||
api_common::Api,
|
||||
api_v3::ApiV3,
|
||||
database::*,
|
||||
environment::{TestEnvironment, with_test_environment},
|
||||
@@ -248,26 +245,3 @@ pub async fn permissions_analytics_revenue() {
|
||||
)
|
||||
.await;
|
||||
}
|
||||
|
||||
#[actix_rt::test]
|
||||
pub async fn analytics_minecraft_server_play_ingest() {
|
||||
with_test_environment(
|
||||
None,
|
||||
|test_env: TestEnvironment<ApiV3>| async move {
|
||||
let api = &test_env.api;
|
||||
let project_id = test_env.dummy.project_alpha.project_id.clone();
|
||||
|
||||
let req = test::TestRequest::post()
|
||||
.uri("/analytics/minecraft-server-play")
|
||||
.append_header(("Authorization", USER_USER_PAT.unwrap()))
|
||||
.set_json(serde_json::json!({
|
||||
"project_id": project_id,
|
||||
"minecraft_uuid": "12345678-1234-5678-1234-567812345678"
|
||||
}))
|
||||
.to_request();
|
||||
let resp = api.call(req).await;
|
||||
assert_status!(&resp, StatusCode::NO_CONTENT);
|
||||
},
|
||||
)
|
||||
.await;
|
||||
}
|
||||
|
||||
@@ -23,6 +23,7 @@ use serde_json::json;
|
||||
use tracing::{info, warn};
|
||||
|
||||
use std::collections::{HashMap, HashSet};
|
||||
use std::time::Duration;
|
||||
|
||||
use crate::data::Settings;
|
||||
use crate::server_address::ServerAddress;
|
||||
@@ -739,25 +740,54 @@ async fn run_credentials(
|
||||
if let Some(linked_data) = &profile.linked_data {
|
||||
let project_id = &linked_data.project_id;
|
||||
if !project_id.trim().is_empty() {
|
||||
let result = fetch::post_json(
|
||||
concat!(
|
||||
env!("MODRINTH_API_BASE_URL"),
|
||||
"analytics/minecraft-server-play"
|
||||
),
|
||||
json!({
|
||||
"project_id": &linked_data.project_id,
|
||||
"minecraft_uuid": credentials.offline_profile.id,
|
||||
}),
|
||||
&state.api_semaphore,
|
||||
&state.pool,
|
||||
)
|
||||
.await;
|
||||
let server_id = uuid::Uuid::new_v4().to_string();
|
||||
|
||||
match result {
|
||||
Ok(()) => {
|
||||
info!("Tracked server play for '{project_id}' in analytics")
|
||||
let join_result = fetch::REQWEST_CLIENT
|
||||
.post("https://sessionserver.mojang.com/session/minecraft/join")
|
||||
.json(&json!({
|
||||
"accessToken": &credentials.access_token,
|
||||
"selectedProfile": credentials.offline_profile.id.simple().to_string(),
|
||||
"serverId": &server_id,
|
||||
}))
|
||||
.timeout(Duration::from_secs(5))
|
||||
.send()
|
||||
.await;
|
||||
|
||||
match join_result {
|
||||
Ok(resp) if resp.status().is_success() => {
|
||||
let result = fetch::post_json(
|
||||
concat!(
|
||||
env!("MODRINTH_API_BASE_URL"),
|
||||
"analytics/minecraft-server-play"
|
||||
),
|
||||
json!({
|
||||
"project_id": &linked_data.project_id,
|
||||
"username": &credentials.offline_profile.name,
|
||||
"server_id": &server_id,
|
||||
}),
|
||||
&state.api_semaphore,
|
||||
&state.pool,
|
||||
)
|
||||
.await;
|
||||
|
||||
match result {
|
||||
Ok(()) => {
|
||||
info!(
|
||||
"Tracked server play for '{project_id}' in analytics"
|
||||
)
|
||||
}
|
||||
Err(err) => {
|
||||
warn!("Failed to report server play: {err:?}")
|
||||
}
|
||||
}
|
||||
}
|
||||
Ok(resp) => warn!(
|
||||
"Failed to join Mojang session server: HTTP {}",
|
||||
resp.status()
|
||||
),
|
||||
Err(err) => {
|
||||
warn!("Failed to join Mojang session server: {err:?}")
|
||||
}
|
||||
Err(err) => warn!("Failed to report server play: {err:?}"),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user