* Harden minecraft-server-play analytics * Verify based on mc token * Fail for non-server projects * Nitpicks and factor out HTTP client * Allow passing old minecraft_uuid field for clients * Remove server play analytics test since it relies on auth against Minecraft API which I don't want to mock :( * Switch to using hasJoined for uuid validation * Fix formatting * Fix sessionserver status code * Ensure profile name and queried username matches * replace some wrap_request_errs with internal errs * add HTTP client into web::Data * short timeout on client-side session join query * further fixes * sqlx prepare * fix clippy --------- Co-authored-by: Creeperkatze <178587183+Creeperkatze@users.noreply.github.com> Co-authored-by: aecsocket <aecsocket@tutanota.com>
366 lines
12 KiB
Rust
366 lines
12 KiB
Rust
use crate::database::PgPool;
|
|
use crate::database::models::DatabaseError;
|
|
use crate::database::redis::RedisPool;
|
|
use crate::models::analytics::{
|
|
AffiliateCodeClick, Download, MinecraftServerPlay, PageView, Playtime,
|
|
};
|
|
use crate::routes::ApiError;
|
|
use crate::routes::analytics::MINECRAFT_SERVER_PLAYS;
|
|
use dashmap::{DashMap, DashSet};
|
|
use redis::cmd;
|
|
use std::collections::HashMap;
|
|
|
|
pub mod cache;
|
|
|
|
const DOWNLOADS_NAMESPACE: &str = "downloads";
|
|
const VIEWS_NAMESPACE: &str = "views";
|
|
const MINECRAFT_SERVER_PLAYS_NAMESPACE: &str = "minecraft_server_plays";
|
|
const MINECRAFT_SERVER_PLAYS_EXPIRY: u64 = 86_400; // 24 hours
|
|
const MINECRAFT_SERVER_PLAYS_LIMIT: u32 = 5;
|
|
|
|
pub struct AnalyticsQueue {
|
|
views_queue: DashMap<(u64, u64), Vec<PageView>>,
|
|
downloads_queue: DashMap<(u64, u64), Download>,
|
|
playtime_queue: DashSet<Playtime>,
|
|
minecraft_server_plays_queue: DashMap<(u128, u64), MinecraftServerPlay>,
|
|
affiliate_code_clicks_queue: DashMap<(u64, u64), Vec<AffiliateCodeClick>>,
|
|
}
|
|
|
|
impl Default for AnalyticsQueue {
|
|
fn default() -> Self {
|
|
Self::new()
|
|
}
|
|
}
|
|
|
|
// Batches analytics data points + transactions every few minutes
|
|
impl AnalyticsQueue {
|
|
pub fn new() -> Self {
|
|
AnalyticsQueue {
|
|
views_queue: DashMap::with_capacity(1000),
|
|
downloads_queue: DashMap::with_capacity(1000),
|
|
playtime_queue: DashSet::with_capacity(1000),
|
|
minecraft_server_plays_queue: DashMap::with_capacity(1000),
|
|
affiliate_code_clicks_queue: DashMap::with_capacity(1000),
|
|
}
|
|
}
|
|
|
|
pub fn add_view(&self, page_view: PageView) {
|
|
let ip_stripped = crate::util::ip::strip_ip(page_view.ip);
|
|
|
|
self.views_queue
|
|
.entry((ip_stripped, page_view.project_id))
|
|
.or_default()
|
|
.push(page_view);
|
|
}
|
|
pub fn add_download(&self, download: Download) {
|
|
let ip_stripped = crate::util::ip::strip_ip(download.ip);
|
|
self.downloads_queue
|
|
.insert((ip_stripped, download.project_id), download);
|
|
}
|
|
|
|
pub fn add_playtime(&self, playtime: Playtime) {
|
|
self.playtime_queue.insert(playtime);
|
|
}
|
|
|
|
pub fn add_minecraft_server_play(&self, play: MinecraftServerPlay) {
|
|
self.minecraft_server_plays_queue
|
|
.insert((play.minecraft_uuid.as_u128(), play.project_id), play);
|
|
}
|
|
|
|
pub fn add_affiliate_code_click(&self, click: AffiliateCodeClick) {
|
|
self.affiliate_code_clicks_queue
|
|
.entry((click.user_id, click.affiliate_code_id))
|
|
.or_default()
|
|
.push(click);
|
|
}
|
|
|
|
pub async fn index(
|
|
&self,
|
|
client: clickhouse::Client,
|
|
redis: &RedisPool,
|
|
pool: &PgPool,
|
|
) -> Result<(), ApiError> {
|
|
let views_queue = self.views_queue.clone();
|
|
self.views_queue.clear();
|
|
|
|
let downloads_queue = self.downloads_queue.clone();
|
|
self.downloads_queue.clear();
|
|
|
|
let playtime_queue = self.playtime_queue.clone();
|
|
self.playtime_queue.clear();
|
|
|
|
let minecraft_server_plays_queue =
|
|
self.minecraft_server_plays_queue.clone();
|
|
self.minecraft_server_plays_queue.clear();
|
|
|
|
let affiliate_code_clicks_queue =
|
|
self.affiliate_code_clicks_queue.clone();
|
|
self.affiliate_code_clicks_queue.clear();
|
|
|
|
if !affiliate_code_clicks_queue.is_empty() {
|
|
let mut insert_clicks = client
|
|
.insert::<AffiliateCodeClick>("affiliate_code_clicks")
|
|
.await?;
|
|
|
|
for (_, click_vec) in affiliate_code_clicks_queue {
|
|
for click in click_vec {
|
|
insert_clicks.write(&click).await?;
|
|
}
|
|
}
|
|
|
|
insert_clicks.end().await?;
|
|
}
|
|
|
|
if !playtime_queue.is_empty() {
|
|
let mut playtimes = client.insert::<Playtime>("playtime").await?;
|
|
|
|
for playtime in playtime_queue {
|
|
playtimes.write(&playtime).await?;
|
|
}
|
|
|
|
playtimes.end().await?;
|
|
}
|
|
|
|
if !minecraft_server_plays_queue.is_empty() {
|
|
let mut plays_keys = Vec::new();
|
|
let raw_plays = DashMap::new();
|
|
|
|
for (index, (key, play)) in
|
|
minecraft_server_plays_queue.into_iter().enumerate()
|
|
{
|
|
plays_keys.push(key);
|
|
raw_plays.insert(index, play);
|
|
}
|
|
|
|
let mut redis =
|
|
redis.pool.get().await.map_err(DatabaseError::RedisPool)?;
|
|
|
|
let results = cmd("MGET")
|
|
.arg(
|
|
plays_keys
|
|
.iter()
|
|
.map(|x| {
|
|
format!(
|
|
"{}:{}-{}",
|
|
MINECRAFT_SERVER_PLAYS_NAMESPACE, x.0, x.1
|
|
)
|
|
})
|
|
.collect::<Vec<_>>(),
|
|
)
|
|
.query_async::<Vec<Option<u32>>>(&mut redis)
|
|
.await
|
|
.map_err(DatabaseError::CacheError)?;
|
|
|
|
let mut pipe = redis::pipe();
|
|
for (idx, count) in results.into_iter().enumerate() {
|
|
let key = &plays_keys[idx];
|
|
|
|
let new_count = if let Some(count) = count {
|
|
if count >= MINECRAFT_SERVER_PLAYS_LIMIT {
|
|
raw_plays.remove(&idx);
|
|
continue;
|
|
}
|
|
count + 1
|
|
} else {
|
|
1
|
|
};
|
|
|
|
pipe.atomic().set_ex(
|
|
format!(
|
|
"{}:{}-{}",
|
|
MINECRAFT_SERVER_PLAYS_NAMESPACE, key.0, key.1
|
|
),
|
|
new_count,
|
|
MINECRAFT_SERVER_PLAYS_EXPIRY,
|
|
);
|
|
}
|
|
pipe.query_async::<()>(&mut *redis)
|
|
.await
|
|
.map_err(DatabaseError::CacheError)?;
|
|
|
|
let mut plays = client
|
|
.insert::<MinecraftServerPlay>(MINECRAFT_SERVER_PLAYS)
|
|
.await?;
|
|
|
|
for (_, play) in raw_plays {
|
|
plays.write(&play).await?;
|
|
}
|
|
|
|
plays.end().await?;
|
|
}
|
|
|
|
if !views_queue.is_empty() {
|
|
let mut views_keys = Vec::new();
|
|
let mut raw_views = Vec::new();
|
|
|
|
for (key, views) in views_queue {
|
|
views_keys.push(key);
|
|
raw_views.push((views, true));
|
|
}
|
|
|
|
let mut redis =
|
|
redis.pool.get().await.map_err(DatabaseError::RedisPool)?;
|
|
|
|
let results = cmd("MGET")
|
|
.arg(
|
|
views_keys
|
|
.iter()
|
|
.map(|x| format!("{}:{}-{}", VIEWS_NAMESPACE, x.0, x.1))
|
|
.collect::<Vec<_>>(),
|
|
)
|
|
.query_async::<Vec<Option<u32>>>(&mut redis)
|
|
.await
|
|
.map_err(DatabaseError::CacheError)?;
|
|
|
|
let mut pipe = redis::pipe();
|
|
for (idx, count) in results.into_iter().enumerate() {
|
|
let key = &views_keys[idx];
|
|
|
|
let new_count =
|
|
if let Some((views, monetized)) = raw_views.get_mut(idx) {
|
|
if let Some(count) = count {
|
|
if count > 3 {
|
|
*monetized = false;
|
|
continue;
|
|
}
|
|
|
|
if (count + views.len() as u32) > 3 {
|
|
*monetized = false;
|
|
}
|
|
|
|
count + (views.len() as u32)
|
|
} else {
|
|
views.len() as u32
|
|
}
|
|
} else {
|
|
1
|
|
};
|
|
|
|
pipe.atomic().set_ex(
|
|
format!("{}:{}-{}", VIEWS_NAMESPACE, key.0, key.1),
|
|
new_count,
|
|
6 * 60 * 60,
|
|
);
|
|
}
|
|
pipe.query_async::<()>(&mut *redis)
|
|
.await
|
|
.map_err(DatabaseError::CacheError)?;
|
|
|
|
let mut views = client.insert::<PageView>("views").await?;
|
|
|
|
for (all_views, monetized) in raw_views {
|
|
for (idx, mut view) in all_views.into_iter().enumerate() {
|
|
if idx != 0 || !monetized {
|
|
view.monetized = false;
|
|
}
|
|
|
|
views.write(&view).await?;
|
|
}
|
|
}
|
|
|
|
views.end().await?;
|
|
}
|
|
|
|
if !downloads_queue.is_empty() {
|
|
let mut downloads_keys = Vec::new();
|
|
let raw_downloads = DashMap::new();
|
|
|
|
for (index, (key, download)) in
|
|
downloads_queue.into_iter().enumerate()
|
|
{
|
|
downloads_keys.push(key);
|
|
raw_downloads.insert(index, download);
|
|
}
|
|
|
|
let mut redis =
|
|
redis.pool.get().await.map_err(DatabaseError::RedisPool)?;
|
|
|
|
let results = cmd("MGET")
|
|
.arg(
|
|
downloads_keys
|
|
.iter()
|
|
.map(|x| {
|
|
format!("{}:{}-{}", DOWNLOADS_NAMESPACE, x.0, x.1)
|
|
})
|
|
.collect::<Vec<_>>(),
|
|
)
|
|
.query_async::<Vec<Option<u32>>>(&mut redis)
|
|
.await
|
|
.map_err(DatabaseError::CacheError)?;
|
|
|
|
let mut pipe = redis::pipe();
|
|
for (idx, count) in results.into_iter().enumerate() {
|
|
let key = &downloads_keys[idx];
|
|
|
|
let new_count = if let Some(count) = count {
|
|
if count > 5 {
|
|
raw_downloads.remove(&idx);
|
|
continue;
|
|
}
|
|
|
|
count + 1
|
|
} else {
|
|
1
|
|
};
|
|
|
|
pipe.atomic().set_ex(
|
|
format!("{}:{}-{}", DOWNLOADS_NAMESPACE, key.0, key.1),
|
|
new_count,
|
|
6 * 60 * 60,
|
|
);
|
|
}
|
|
pipe.query_async::<()>(&mut *redis)
|
|
.await
|
|
.map_err(DatabaseError::CacheError)?;
|
|
|
|
let mut transaction = pool.begin().await?;
|
|
let mut downloads = client.insert::<Download>("downloads").await?;
|
|
|
|
let mut version_downloads: HashMap<i64, i32> = HashMap::new();
|
|
let mut project_downloads: HashMap<i64, i32> = HashMap::new();
|
|
|
|
for (_, download) in raw_downloads {
|
|
*version_downloads
|
|
.entry(download.version_id as i64)
|
|
.or_default() += 1;
|
|
*project_downloads
|
|
.entry(download.project_id as i64)
|
|
.or_default() += 1;
|
|
|
|
downloads.write(&download).await?;
|
|
}
|
|
|
|
sqlx::query(
|
|
"
|
|
UPDATE versions v
|
|
SET downloads = v.downloads + x.amount
|
|
FROM unnest($1::BIGINT[], $2::int[]) AS x(id, amount)
|
|
WHERE v.id = x.id
|
|
",
|
|
)
|
|
.bind(version_downloads.keys().copied().collect::<Vec<_>>())
|
|
.bind(version_downloads.values().copied().collect::<Vec<_>>())
|
|
.execute(&mut transaction)
|
|
.await?;
|
|
|
|
sqlx::query(
|
|
"
|
|
UPDATE mods m
|
|
SET downloads = m.downloads + x.amount
|
|
FROM unnest($1::BIGINT[], $2::int[]) AS x(id, amount)
|
|
WHERE m.id = x.id
|
|
",
|
|
)
|
|
.bind(project_downloads.keys().copied().collect::<Vec<_>>())
|
|
.bind(project_downloads.values().copied().collect::<Vec<_>>())
|
|
.execute(&mut transaction)
|
|
.await?;
|
|
|
|
transaction.commit().await?;
|
|
downloads.end().await?;
|
|
}
|
|
|
|
Ok(())
|
|
}
|
|
}
|