Skip to content

Commit

Permalink
feat: the first demo for export
Browse files Browse the repository at this point in the history
  • Loading branch information
7086cmd committed May 3, 2024
1 parent 256dea6 commit a2a235a
Show file tree
Hide file tree
Showing 14 changed files with 381 additions and 151 deletions.
8 changes: 8 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 3 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,10 @@ socketioxide = { version = "0.12.0", features = [
"extensions",
"tracing",
] }
tempfile = "3.10.1"
tokio = { version = "1.37.0", features = ["full"] }
tower = "0.4.13"
tower-http = "0.5.2"
tower-http = { version = "0.5.2", features = ["cors"] }
tracing = { version = "0.1.40", features = ["log"] }
tracing-subscriber = { version = "0.3.18", features = [
"tracing",
Expand All @@ -53,7 +54,7 @@ tracing-subscriber = { version = "0.3.18", features = [
"json",
"regex",
] }
uuid = "1.8.0"
uuid = { version = "1.8.0", features = ["v1", "v4", "serde"] }
xlsxwriter = "0.6.0"
zerocopy = "0.7.32"

Expand Down
Binary file added public/.DS_Store
Binary file not shown.
Binary file added public/exports/.DS_Store
Binary file not shown.
120 changes: 0 additions & 120 deletions src/calc/mod.rs
Original file line number Diff line number Diff line change
@@ -1,121 +1 @@
use crate::{
models::{activities::Activity, users::User},
routers::users::time::UserActivityTime,
};
use bson::{doc, from_document};
use futures::stream::TryStreamExt;
use mongodb::{Collection, Database};
use polars::{df, frame::DataFrame, prelude::NamedFrom, series::Series};
use std::sync::Arc;
use tokio::sync::Mutex;

/// Export every user's aggregated activity durations as a polars `DataFrame`.
///
/// For each user in the `users` collection, runs an aggregation over the
/// `activities` collection that sums `members.duration` per `members.mode`
/// (matching the member either as a raw `ObjectId` or as its hex string),
/// then appends one row per user with the on-campus / off-campus /
/// social-practice / total durations.
///
/// # Errors
/// Returns a human-readable `String` describing the first database or
/// DataFrame operation that failed (previously these paths panicked via
/// `unwrap`, despite the `Result` return type).
async fn export(db: Arc<Mutex<Database>>) -> Result<DataFrame, String> {
    let db = db.lock().await;

    // Empty frame that only fixes the schema; real rows are appended below.
    // (The previous version seeded this with a dummy "Example" row that
    // leaked into the exported data.)
    let mut df = df!(
        "_id" => &Vec::<String>::new(),
        "id" => &Vec::<String>::new(),
        "name" => &Vec::<String>::new(),
        "class" => &Vec::<String>::new(),
        "on_campus" => &Vec::<f64>::new(),
        "off_campus" => &Vec::<f64>::new(),
        "social_practice" => &Vec::<f64>::new(),
        "total" => &Vec::<f64>::new()
    )
    .map_err(|_| "Failed to create DataFrame".to_string())?;

    let users_collection: Collection<User> = db.collection("users");
    let activities_collection: Collection<Activity> = db.collection("activities");

    let mut users = users_collection
        .find(doc! {}, None)
        .await
        .map_err(|_| "Failed to query users".to_string())?;

    while let Some(user) = users
        .try_next()
        .await
        .map_err(|_| "Failed to read user".to_string())?
    {
        // Member ids may have been stored either as a raw ObjectId or as its
        // hex string, so both representations are matched throughout.
        let pipeline = vec![
            doc! {
                "$match": {
                    "$or": [
                        { "members._id": user._id.clone() },
                        { "members._id": user._id.to_hex() }
                    ]
                }
            },
            doc! {
                "$unwind": "$members"
            },
            // After $unwind, keep only this user's member entries.
            doc! {
                "$match": {
                    "$or": [
                        { "members._id": user._id.clone() },
                        { "members._id": user._id.to_hex() }
                    ]
                }
            },
            // Sum durations per activity mode.
            doc! {
                "$group": {
                    "_id": "$members.mode",
                    "totalDuration": { "$sum": "$members.duration" }
                }
            },
            // Pivot the per-mode sums into a single summary document.
            doc! {
                "$group": {
                    "_id": null,
                    "on_campus": {
                        "$sum": {
                            "$cond": [{ "$eq": ["$_id", "on-campus"] }, "$totalDuration", 0.0]
                        }
                    },
                    "off_campus": {
                        "$sum": {
                            "$cond": [{ "$eq": ["$_id", "off-campus"] }, "$totalDuration", 0.0]
                        }
                    },
                    "social_practice": {
                        "$sum": {
                            "$cond": [{ "$eq": ["$_id", "social-practice"] }, "$totalDuration", 0.0]
                        }
                    },
                    "total": { "$sum": "$totalDuration" }
                }
            },
            doc! {
                "$project": {
                    "_id": 0,
                    "on_campus": 1,
                    "off_campus": 1,
                    "social_practice": 1,
                    "total": 1
                }
            },
        ];

        let mut cursor = activities_collection
            .aggregate(pipeline, None)
            .await
            .map_err(|_| "Failed to get cursor".to_string())?;

        // NOTE(review): a user with no matching activities produces no
        // summary document and aborts the whole export here; emitting an
        // all-zero row instead may be the intended behavior — confirm.
        let summary = cursor
            .try_next()
            .await
            .map_err(|_| "Failed to get result".to_string())?
            .ok_or_else(|| "Failed to get result".to_string())?;
        let times: UserActivityTime =
            from_document(summary).map_err(|_| "Failed to parse result".to_string())?;

        let row = DataFrame::new(vec![
            Series::new("_id", vec![user._id.to_hex()]),
            Series::new("id", vec![user.id.clone()]),
            Series::new("name", vec![user.name.clone()]),
            // Class is not tracked yet; exported as an empty placeholder.
            Series::new("class", vec!["".to_string()]),
            Series::new("on_campus", vec![times.on_campus]),
            Series::new("off_campus", vec![times.off_campus]),
            Series::new("social_practice", vec![times.social_practice]),
            Series::new("total", vec![times.total]),
        ])
        .map_err(|_| "Failed to create DataFrame".to_string())?;

        df.extend(&row)
            .map_err(|_| "Failed to extend DataFrame".to_string())?;
    }

    Ok(df)
}
20 changes: 18 additions & 2 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,9 @@ mod models;
mod routers;
mod tests;
mod utils;
use crate::models::exports::ExportState;
use axum::{
http::Method,
routing::{delete, get, post, put},
Extension, Router,
};
Expand All @@ -16,8 +18,9 @@ use socketioxide::{
extract::{AckSender, Bin, Data, SocketRef},
SocketIo,
};
use std::sync::Arc;
use std::{collections::HashMap, sync::Arc};
use tokio::sync::Mutex;
use tower_http::cors::{Any, CorsLayer};

fn on_connect(socket: SocketRef, Data(data): Data<Value>) {
socket.emit("auth", data).ok();
Expand Down Expand Up @@ -45,6 +48,8 @@ async fn main() {
.await
.expect("Failed to create client");

let shared_export_state = Arc::new(Mutex::new(HashMap::new()) as ExportState);

let shared_client = Arc::new(Mutex::new(client));

let (_, io) = SocketIo::new_layer();
Expand Down Expand Up @@ -102,7 +107,18 @@ async fn main() {
"/user/:user_id/time",
get(routers::users::time::calculate_user_activity_time),
)
.layer(Extension(shared_client.clone()));
.route("/export", post(routers::exports::export_activity_times))
.route(
"/export/:task_id",
get(routers::exports::query_export_status),
)
.layer(Extension(shared_client.clone()))
.layer(Extension(shared_export_state.clone()))
.layer(
CorsLayer::new()
.allow_methods([Method::GET, Method::POST, Method::PUT, Method::DELETE])
.allow_origin(Any),
);

// Run the server
let listener = tokio::net::TcpListener::bind("0.0.0.0:3000").await.unwrap();
Expand Down
41 changes: 41 additions & 0 deletions src/models/exports.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
use bson::{doc, oid::ObjectId};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use tokio::sync::Mutex;
use uuid::Uuid;

/// Output format requested for an activity-time export.
///
/// NOTE(review): serde's `rename_all = "kebab-case"` first snake_cases the
/// variant name, which splits runs of capitals per letter — `CSV` serializes
/// as "c-s-v" and `JSON` as "j-s-o-n", not "csv"/"json". Confirm this matches
/// what the client actually sends, or add explicit `#[serde(rename = …)]`.
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq, Hash)]
#[serde(rename_all = "kebab-case")]
pub enum ExportFormat {
CSV,
JSON,
Excel, // serialized as "excel"
}

/// Client-supplied parameters for an activity-time export request:
/// the time window to cover and the desired output format.
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq, Hash)]
pub struct ExportActivityTimesOptions {
pub start: u64, // Unix timestamp — window start (presumably seconds; confirm unit with caller)
pub end: u64, // Unix timestamp — window end
pub format: ExportFormat,
}

/// Lifecycle state of an export task.
/// Serialized in kebab-case, e.g. `Pending` -> "pending".
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq, Hash)]
#[serde(rename_all = "kebab-case")]
pub enum TaskStatus {
Pending, // created but not yet started
Processing, // export currently running
Done, // finished; `Task::result` should be populated
Error, // failed; no usable result
}

/// One export job tracked in the shared `ExportState` map.
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq)]
pub struct Task {
pub time: u64, // Unix timestamp — when the task was created
pub actioner: ObjectId, // user who requested the export
pub options: ExportActivityTimesOptions, // what to export and in which format
pub status: TaskStatus, // current lifecycle state
pub result: Option<String>, // presumably a path/URL to the finished file once Done — confirm
pub percent: Option<f64>, // progress indicator; scale (0–1 vs 0–100) not established here — confirm
}

pub type ExportState = Mutex<HashMap<Uuid, Task>>;
1 change: 1 addition & 0 deletions src/models/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
pub mod activities;
pub mod exports;
pub mod groups;
pub mod notifications;
pub mod response;
Expand Down
Loading

0 comments on commit a2a235a

Please sign in to comment.