diff --git a/Cargo.lock b/Cargo.lock index 685253c..6429483 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -315,6 +315,12 @@ version = "0.15.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9ae037714f313c1353189ead58ef9eec30a8e8dc101b2622d461418fd59e28a9" +[[package]] +name = "atomic" +version = "0.5.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c59bdb34bc650a32731b31bd8f0829cc15d24a708ee31559e0bb34f2bc320cba" + [[package]] name = "atomic-waker" version = "1.1.2" @@ -4888,6 +4894,7 @@ version = "1.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a183cf7feeba97b4dd1c0d46788634f6221d87fa961b305bed08c851829efcc0" dependencies = [ + "atomic", "getrandom 0.2.14", "serde", ] @@ -5361,6 +5368,7 @@ dependencies = [ "serde_json", "serde_qs 0.13.0", "socketioxide", + "tempfile", "tokio", "tower", "tower-http", diff --git a/Cargo.toml b/Cargo.toml index 385a086..3392a74 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -41,9 +41,10 @@ socketioxide = { version = "0.12.0", features = [ "extensions", "tracing", ] } +tempfile = "3.10.1" tokio = { version = "1.37.0", features = ["full"] } tower = "0.4.13" -tower-http = "0.5.2" +tower-http = { version = "0.5.2", features = ["cors"] } tracing = { version = "0.1.40", features = ["log"] } tracing-subscriber = { version = "0.3.18", features = [ "tracing", @@ -53,7 +54,7 @@ tracing-subscriber = { version = "0.3.18", features = [ "json", "regex", ] } -uuid = "1.8.0" +uuid = { version = "1.8.0", features = ["v1", "v4", "serde"] } xlsxwriter = "0.6.0" zerocopy = "0.7.32" diff --git a/public/.DS_Store b/public/.DS_Store new file mode 100644 index 0000000..30723e8 Binary files /dev/null and b/public/.DS_Store differ diff --git a/public/exports/.DS_Store b/public/exports/.DS_Store new file mode 100644 index 0000000..5008ddf Binary files /dev/null and b/public/exports/.DS_Store differ diff --git a/src/calc/mod.rs b/src/calc/mod.rs index d363162..8b13789 100644 --- a/src/calc/mod.rs +++ b/src/calc/mod.rs @@ -1,121 +1 @@ -use crate::{ - models::{activities::Activity, users::User}, - routers::users::time::UserActivityTime, -}; -use bson::{doc, from_document}; -use futures::stream::TryStreamExt; -use mongodb::{Collection, Database}; -use polars::{df, frame::DataFrame, prelude::NamedFrom, series::Series}; -use std::sync::Arc; -use tokio::sync::Mutex; -async fn export(db: Arc>) -> Result { - let db = db.lock().await; - let mut df = df!( - "_id" => &["".to_string()], - "id" => &["0".to_string()], - "name" => &["Example".to_string()], - "class" => &["".to_string()], - "on_campus" => &[0.0], - "off_campus" => &[0.0], - "social_practice" => &[0.0], - "total" => &[0.0] - ) - .unwrap(); - - let users_collection: Collection = db.collection("users"); - let activities_collection: Collection = db.collection("activities"); - - let mut users = users_collection.find(doc! {}, None).await.unwrap(); - - while let Some(doc) = users.try_next().await.unwrap() { - let pipeline = vec![ - doc! { - "$match": { - "$or": [ - { "members._id": doc._id.clone() }, - { "members._id": doc._id.to_hex() } - ] - } - }, - doc! { - "$unwind": "$members" - }, - doc! { - "$match": { - "$or": [ - { "members._id": doc._id.clone() }, - { "members._id": doc._id.to_hex() } - ] - } - }, - doc! { - "$group": { - "_id": "$members.mode", - "totalDuration": { "$sum": "$members.duration" } - } - }, - doc! { - "$group": { - "_id": null, - "on_campus": { - "$sum": { - "$cond": [{ "$eq": ["$_id", "on-campus"] }, "$totalDuration", 0.0] - } - }, - "off_campus": { - "$sum": { - "$cond": [{ "$eq": ["$_id", "off-campus"] }, "$totalDuration", 0.0] - } - }, - "social_practice": { - "$sum": { - "$cond": [{ "$eq": ["$_id", "social-practice"] }, "$totalDuration", 0.0] - } - }, - "total": { "$sum": "$totalDuration" } - } - }, - doc! { - "$project": { - "_id": 0, - "on_campus": 1, - "off_campus": 1, - "social_practice": 1, - "total": 1 - } - }, - ]; - let cursor = activities_collection.aggregate(pipeline, None).await; - if let Err(_) = cursor { - return Err("Failed to get cursor".to_string()); - } - let mut cursor = cursor.unwrap(); - let result = cursor.try_next().await; - if let Err(_) = result { - return Err("Failed to get result".to_string()); - } - let result = result.unwrap(); - if let None = result { - return Err("Failed to get result".to_string()); - } - let result = result.unwrap(); - let result: UserActivityTime = from_document(result).unwrap(); - let extend = DataFrame::new(vec![ - Series::new("_id", vec![doc._id.clone().to_hex()]), - Series::new("id", vec![doc.id.clone()]), - Series::new("name", vec![doc.name.clone()]), - Series::new("class", vec!["".to_string()]), - Series::new("on_campus", vec![result.on_campus]), - Series::new("off_campus", vec![result.off_campus]), - Series::new("social_practice", vec![result.social_practice]), - Series::new("total", vec![result.total]), - ]); - if let Err(_) = extend { - return Err("Failed to create DataFrame".to_string()); - } - let extend = extend.unwrap(); - df.extend(&extend).unwrap(); - } - Ok(df) -} diff --git a/src/main.rs b/src/main.rs index 5a1c671..e5d6353 100644 --- a/src/main.rs +++ b/src/main.rs @@ -6,7 +6,9 @@ mod models; mod routers; mod tests; mod utils; +use crate::models::exports::ExportState; use axum::{ + http::Method, routing::{delete, get, post, put}, Extension, Router, }; @@ -16,8 +18,9 @@ use socketioxide::{ extract::{AckSender, Bin, Data, SocketRef}, SocketIo, }; -use std::sync::Arc; +use std::{collections::HashMap, sync::Arc}; use tokio::sync::Mutex; +use tower_http::cors::{Any, CorsLayer}; fn on_connect(socket: SocketRef, Data(data): Data) { socket.emit("auth", data).ok(); @@ -45,6 +48,8 @@ async fn main() { .await .expect("Failed to create client"); + let shared_export_state = Arc::new(Mutex::new(HashMap::new()) as ExportState); + let shared_client = Arc::new(Mutex::new(client)); let (_, io) = SocketIo::new_layer(); @@ -102,7 +107,18 @@ async fn main() { "/user/:user_id/time", get(routers::users::time::calculate_user_activity_time), ) - .layer(Extension(shared_client.clone())); + .route("/export", post(routers::exports::export_activity_times)) + .route( + "/export/:task_id", + get(routers::exports::query_export_status), + ) + .layer(Extension(shared_client.clone())) + .layer(Extension(shared_export_state.clone())) + .layer( + CorsLayer::new() + .allow_methods([Method::GET, Method::POST, Method::PUT, Method::DELETE]) + .allow_origin(Any), + ); // Run the server let listener = tokio::net::TcpListener::bind("0.0.0.0:3000").await.unwrap(); diff --git a/src/models/exports.rs b/src/models/exports.rs new file mode 100644 index 0000000..d87f0a9 --- /dev/null +++ b/src/models/exports.rs @@ -0,0 +1,41 @@ +use bson::{doc, oid::ObjectId}; +use serde::{Deserialize, Serialize}; +use std::collections::HashMap; +use tokio::sync::Mutex; +use uuid::Uuid; + +#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq, Hash)] +#[serde(rename_all = "kebab-case")] +pub enum ExportFormat { + CSV, + JSON, + Excel, +} + +#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq, Hash)] +pub struct ExportActivityTimesOptions { + pub start: u64, // Unix timestamp + pub end: u64, // Unix timestamp + pub format: ExportFormat, +} + +#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq, Hash)] +#[serde(rename_all = "kebab-case")] +pub enum TaskStatus { + Pending, + Processing, + Done, + Error, +} + +#[derive(Serialize, Deserialize, Debug, Clone, PartialEq)] +pub struct Task { + pub time: u64, // Unix timestamp + pub actioner: ObjectId, + pub options: ExportActivityTimesOptions, + pub status: TaskStatus, + pub result: Option, + pub percent: Option, +} + +pub type ExportState = Mutex>; diff --git a/src/models/mod.rs b/src/models/mod.rs index 5659170..e1cd241 100644 --- a/src/models/mod.rs +++ b/src/models/mod.rs @@ -1,4 +1,5 @@ pub mod activities; +pub mod exports; pub mod groups; pub mod notifications; pub mod response; diff --git a/src/routers/exports/mod.rs b/src/routers/exports/mod.rs index c191847..9a4d21a 100644 --- a/src/routers/exports/mod.rs +++ b/src/routers/exports/mod.rs @@ -1,43 +1,167 @@ -use crate::{ - models::{groups::GroupPermission, response::create_error}, - utils::jwt::UserData, -}; use axum::{ - extract::Extension, + extract::{Extension, Path}, http::StatusCode, response::{IntoResponse, Json}, }; -use bson::doc; +use bson::oid::ObjectId; use mongodb::Database; -use serde::{Deserialize, Serialize}; -use std::sync::Arc; +use std::{fs, str::FromStr, sync::Arc, time::SystemTime}; +use tempfile::NamedTempFile; use tokio::sync::Mutex; +use uuid::Uuid; -#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq, Hash)] -#[serde(rename_all = "kebab-case")] -pub enum ExportFormat { - CSV, - JSON, - Excel, -} - -#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq, Hash)] -pub struct ExportActivityTimesOptions { - pub start: u64, // Unix timestamp - pub end: u64, // Unix timestamp - pub format: ExportFormat, -} +use crate::{ + models::{ + exports::{ExportActivityTimesOptions, ExportState, Task, TaskStatus}, + groups::GroupPermission, + response::create_error, + }, + utils::{ + exports::{csv_to_excel, export_csv}, + jwt::UserData, + }, +}; pub async fn export_activity_times( Extension(db): Extension>>, + Extension(exporters): Extension>, user: UserData, Json(options): Json, ) -> impl IntoResponse { if !user.perms.contains(&GroupPermission::Admin) && !user.perms.contains(&GroupPermission::Inspector) { - return create_error(StatusCode::FORBIDDEN, "Permission denied".to_string()); + return create_error(StatusCode::FORBIDDEN, "Permission denied".to_string()) + .into_response(); + } + + let task_id = Uuid::new_v4(); + println!( + "Received task to export activity times, job ID: {}", + task_id + ); + + if let Err(e) = ObjectId::from_str(&user.id) { + return create_error(StatusCode::BAD_REQUEST, format!("Invalid user ID: {}", e)) + .into_response(); + } + let user_id = ObjectId::from_str(&user.id).unwrap(); + + println!("Starting to export excel by user {}", user_id); + + let mut tasks = exporters.lock().await; + let task = create_task(user_id, &options); + tasks.insert(task_id, task); + // Release the lock + drop(tasks); + + let _ = spawn_task(task_id, Arc::clone(&exporters), db).await; + + (axum::http::StatusCode::OK, Json(task_id.to_string())).into_response() +} + +pub async fn query_export_status( + Extension(exporters): Extension>, + Path(task_id): Path, +) -> impl IntoResponse { + let task_id = Uuid::parse_str(&task_id); + if let Err(_) = task_id { + return create_error(StatusCode::BAD_REQUEST, "Invalid task ID".to_string()) + .into_response(); + } + let task_id = task_id.unwrap(); + let tasks = exporters.lock().await; + let task = tasks.get(&task_id); + if let None = task { + return create_error(StatusCode::NOT_FOUND, "Task not found".to_string()).into_response(); + } + let task = task.unwrap(); + (axum::http::StatusCode::OK, Json(task)).into_response() +} + +fn create_task(user_id: ObjectId, options: &ExportActivityTimesOptions) -> Task { + Task { + time: SystemTime::now() + .duration_since(SystemTime::UNIX_EPOCH) + .unwrap() + .as_millis() as u64, + status: TaskStatus::Pending, + options: options.clone(), + actioner: user_id, + result: None, + percent: Some(0.0), } +} + +async fn spawn_task(task_id: Uuid, exporters: Arc, db: Arc>) { + println!("Start to spawn task {}", task_id); + let _ = tokio::spawn(async move { + process_task(task_id, Arc::clone(&exporters), Arc::clone(&db)).await; + }) + .await; +} - (StatusCode::OK, Json("".to_string())) +async fn process_task(task_id: Uuid, exporters: Arc, db: Arc>) { + println!("Start to process task {}", task_id); + let mut tasks = exporters.lock().await; + let task = tasks.get_mut(&task_id).unwrap(); + task.status = TaskStatus::Processing; + println!("Task {} is processing", task_id); + let result = export_csv::export_to_dataframe(db).await; + if let Err(_) = result { + task.status = TaskStatus::Error; + task.result = None; + return; + } + let result = result.unwrap(); + let temp_csv = NamedTempFile::new(); + if let Err(_) = temp_csv { + task.status = TaskStatus::Error; + task.result = None; + return; + } + let temp_csv = temp_csv.unwrap(); + let temp_csv_name = temp_csv.path().to_str(); + if let None = temp_csv_name { + task.status = TaskStatus::Error; + task.result = None; + return; + } + let temp_csv_name = String::from(temp_csv_name.unwrap()); + let temp_csv = temp_csv.as_file(); + println!("Start to save to csv"); + let result = export_csv::save_to_csv(result, temp_csv).await; + if let Err(_) = result { + task.status = TaskStatus::Error; + task.result = None; + return; + } + println!("Start to convert to excel {}", temp_csv_name); + let temp_excel = NamedTempFile::new(); + if let Err(_) = temp_excel { + task.status = TaskStatus::Error; + task.result = None; + return; + } + let temp_excel = temp_excel.unwrap(); + let temp_excel_name = temp_excel.path().to_str(); + if let None = temp_excel_name { + task.status = TaskStatus::Error; + task.result = None; + return; + } + let temp_excel_name = String::from(temp_excel_name.unwrap()); + let result = csv_to_excel::to_excel(temp_csv_name.clone(), temp_excel_name.clone()); + if let Err(_) = result { + task.status = TaskStatus::Error; + task.result = None; + return; + } + task.status = TaskStatus::Done; + task.result = Some(temp_excel_name.clone()); + fs::copy( + temp_excel_name.clone().as_str(), + format!("public/exports/{}.xlsx", task_id), + ) + .unwrap(); } diff --git a/src/utils/format/convert.py b/src/utils/exports/convert.py similarity index 100% rename from src/utils/format/convert.py rename to src/utils/exports/convert.py diff --git a/src/utils/format/csv_to_excel.rs b/src/utils/exports/csv_to_excel.rs similarity index 63% rename from src/utils/format/csv_to_excel.rs rename to src/utils/exports/csv_to_excel.rs index 0c4f2be..713fd3e 100644 --- a/src/utils/format/csv_to_excel.rs +++ b/src/utils/exports/csv_to_excel.rs @@ -1,8 +1,13 @@ use pyo3::types::{PyAnyMethods, PyModule}; use pyo3::{Py, PyAny, PyResult, Python}; -pub fn to_excel() -> PyResult<()> { +pub fn to_excel(input: String, output: String) -> PyResult<()> { + println!("Start to convert to excel {} to {}", input, output); Python::with_gil(|py| { + println!( + "Start to convert to excel {} to {} with python.", + input, output + ); let bound = PyModule::from_code_bound( py, r#" @@ -19,7 +24,8 @@ def to_excel(input_path: str, output_path: str): )); } let bound: Py = bound.unwrap().getattr("to_excel")?.into(); - let _ = bound.call1(py, ("output.csv", "output.xlsx")); + let _ = bound.call1(py, (input.as_str(), output.as_str())); + println!("Converted to excel {} to {}", input, output); Ok(()) }) } diff --git a/src/utils/exports/export_csv.rs b/src/utils/exports/export_csv.rs new file mode 100644 index 0000000..f18fcd8 --- /dev/null +++ b/src/utils/exports/export_csv.rs @@ -0,0 +1,152 @@ +use crate::{ + models::{activities::Activity, users::User}, + routers::users::time::UserActivityTime, +}; +use bson::{doc, from_document}; +use futures::stream::TryStreamExt; +use mongodb::{Collection, Database}; +use polars::{ + df, + frame::DataFrame, + io::{csv::CsvWriter, SerWriter}, + prelude::NamedFrom, + series::Series, +}; +use std::{fs::File, sync::Arc}; +use tokio::sync::Mutex; + +pub async fn export_to_dataframe(db: Arc>) -> Result { + let db = db.lock().await; + let mut df = df!( + "_id" => &["".to_string()], + "id" => &["0".to_string()], + "name" => &["Example".to_string()], + "class" => &["".to_string()], + "on_campus" => &[0.0], + "off_campus" => &[0.0], + "social_practice" => &[0.0], + "total" => &[0.0] + ) + .unwrap(); + + println!("Start to export data"); + + let users_collection: Collection = db.collection("users"); + let activities_collection: Collection = db.collection("activities"); + + let mut users = users_collection.find(doc! {}, None).await.unwrap(); + + let mut count = 0; + + while let Some(doc) = users.try_next().await.unwrap() { + count += 1; + if count % 100 == 99 { + return Ok(df); + } + println!("Processing {}'s data", doc.name); + let pipeline = vec![ + doc! { + "$match": { + "$or": [ + { "members._id": doc._id.clone() }, + { "members._id": doc._id.to_hex() } + ] + } + }, + doc! { + "$unwind": "$members" + }, + doc! { + "$match": { + "$or": [ + { "members._id": doc._id.clone() }, + { "members._id": doc._id.to_hex() } + ] + } + }, + doc! { + "$group": { + "_id": "$members.mode", + "totalDuration": { "$sum": "$members.duration" } + } + }, + doc! { + "$group": { + "_id": null, + "on_campus": { + "$sum": { + "$cond": [{ "$eq": ["$_id", "on-campus"] }, "$totalDuration", 0.0] + } + }, + "off_campus": { + "$sum": { + "$cond": [{ "$eq": ["$_id", "off-campus"] }, "$totalDuration", 0.0] + } + }, + "social_practice": { + "$sum": { + "$cond": [{ "$eq": ["$_id", "social-practice"] }, "$totalDuration", 0.0] + } + }, + "total": { "$sum": "$totalDuration" } + } + }, + doc! { + "$project": { + "_id": 0, + "on_campus": 1, + "off_campus": 1, + "social_practice": 1, + "total": 1 + } + }, + ]; + let cursor = activities_collection.aggregate(pipeline, None).await; + println!("Got cursor"); + if let Err(_) = cursor { + return Err("Failed to get cursor".to_string()); + } + let mut cursor = cursor.unwrap(); + println!("Unwrapped cursor"); + let result = cursor.try_next().await; + if let Err(_) = result { + return Err("Failed to get result".to_string()); + } + println!("Unwrapped cursor"); + let result = result.unwrap(); + if let None = result { + continue; + } + println!("Unwrapped cursor"); + let result = result.unwrap(); + println!("Got result {:?}", result); + let result: UserActivityTime = from_document(result).unwrap(); + println!("Got result"); + let extend = DataFrame::new(vec![ + Series::new("_id", vec![doc._id.clone().to_hex()]), + Series::new("id", vec![doc.id.clone()]), + Series::new("name", vec![doc.name.clone()]), + Series::new("class", vec!["".to_string()]), + Series::new("on_campus", vec![result.on_campus]), + Series::new("off_campus", vec![result.off_campus]), + Series::new("social_practice", vec![result.social_practice]), + Series::new("total", vec![result.total]), + ]); + if let Err(_) = extend { + return Err("Failed to create DataFrame".to_string()); + } + println!("Extended {}'s data", doc.name); + let extend = extend.unwrap(); + df.extend(&extend).unwrap(); + } + Ok(df) +} + +pub async fn save_to_csv(mut df: DataFrame, mut target: &File) -> Result<(), String> { + let writer = CsvWriter::new(&mut target).finish(&mut df); + println!("Finished writing"); + if let Err(_) = writer { + return Err("Failed to write DataFrame".to_string()); + } + Ok(()) +} diff --git a/src/utils/format/mod.rs b/src/utils/exports/mod.rs similarity index 52% rename from src/utils/format/mod.rs rename to src/utils/exports/mod.rs index f86f15a..1ca89a4 100644 --- a/src/utils/format/mod.rs +++ b/src/utils/exports/mod.rs @@ -1 +1,2 @@ pub mod csv_to_excel; +pub mod export_csv; diff --git a/src/utils/mod.rs b/src/utils/mod.rs index 665e92e..5c79794 100644 --- a/src/utils/mod.rs +++ b/src/utils/mod.rs @@ -1,6 +1,6 @@ pub mod aes; pub mod config; -pub mod format; +pub mod exports; pub mod groups; pub mod jwt; pub mod rsa;