From 80379f2896e39614464cf6178bf8d75ffa41d9f4 Mon Sep 17 00:00:00 2001 From: gudaoxuri Date: Tue, 7 May 2024 22:41:12 +0800 Subject: [PATCH] Improve task status processing. --- backend/basic/src/process/task_processor.rs | 58 ++++++++++++++------- backend/basic/tests/test_task_processor.rs | 49 +++++++++++++++++ 2 files changed, 89 insertions(+), 18 deletions(-) create mode 100644 backend/basic/tests/test_task_processor.rs diff --git a/backend/basic/src/process/task_processor.rs b/backend/basic/src/process/task_processor.rs index 446194aeb..00ded7a1e 100644 --- a/backend/basic/src/process/task_processor.rs +++ b/backend/basic/src/process/task_processor.rs @@ -40,34 +40,56 @@ impl TaskProcessor { /// 初始化异步任务状态 pub async fn init_status(cache_key: &str, task_id: Option, cache_client: &TardisCacheClient) -> TardisResult { let task_id = task_id.unwrap_or(Local::now().timestamp_nanos_opt().expect("maybe in 23rd century") as u64); - // u32::MAX * u32::MAX + u32::MAX - 1 - if task_id > 18446744069414584319 { - return Err(TardisError::bad_request("task id is too large", "400-task-id-too-large")); - } - // 使用bitmap存储以减少内存占用 - cache_client.setbit(&format!("{cache_key}:1"), (task_id / u32::MAX as u64) as usize, false).await?; - cache_client.setbit(&format!("{cache_key}:2"), (task_id % u32::MAX as u64) as usize, false).await?; + Self::set_status(cache_key, task_id, false, cache_client).await?; Ok(task_id) } - /// Check the status of the asynchronous task (whether it is completed) - /// - /// 检查异步任务状态(是否完成) - pub async fn check_status(cache_key: &str, task_id: u64, cache_client: &TardisCacheClient) -> TardisResult { - let result1 = cache_client.getbit(&format!("{cache_key}:1"), (task_id / u32::MAX as u64) as usize).await?; - let result2 = cache_client.getbit(&format!("{cache_key}:2"), (task_id % u32::MAX as u64) as usize).await?; - Ok(result1 && result2) - } - /// Set the status of the asynchronous task (whether it is completed) /// /// 设置异步任务状态(是否完成) pub async fn set_status(cache_key: &str, task_id: u64, status: bool, cache_client: &TardisCacheClient) -> TardisResult<()> { - cache_client.setbit(&format!("{cache_key}:1"), (task_id / u32::MAX as u64) as usize, status).await?; - cache_client.setbit(&format!("{cache_key}:2"), (task_id % u32::MAX as u64) as usize, status).await?; + if task_id <= u32::MAX as u64 { + cache_client.setbit(&format!("{cache_key}:1"), task_id as usize, status).await?; + } else if task_id > 18446744069414584319 { + // u32::MAX * u32::MAX + u32::MAX - 1 + cache_client.setbit(&format!("{cache_key}:2"), (u64::MAX - task_id) as usize, status).await?; + } else { + let _: String = cache_client + .script( + r#" + redis.call('SETBIT', KEYS[1]..':3', ARGV[1], ARGV[3]) + redis.call('SETBIT', KEYS[1]..':4', ARGV[2], ARGV[3]) + return 'OK' + "#, + ) + .key(cache_key) + .arg(&[task_id / u32::MAX as u64, task_id % u32::MAX as u64, if status { 1 } else { 0 }]) + .invoke() + .await?; + } Ok(()) } + /// Check the status of the asynchronous task (whether it is completed) + /// + /// 检查异步任务状态(是否完成) + pub async fn check_status(cache_key: &str, task_id: u64, cache_client: &TardisCacheClient) -> TardisResult { + if task_id <= u32::MAX as u64 { + Ok(cache_client.getbit(&format!("{cache_key}:1"), task_id as usize).await?) + } else if task_id > 18446744069414584319 { + // u32::MAX * u32::MAX + u32::MAX - 1 + Ok(cache_client.getbit(&format!("{cache_key}:2"), (u64::MAX - task_id) as usize).await?) + } else { + let (r1, r2): (bool, bool) = cache_client + .script(r#"return {redis.call('GETBIT', KEYS[1]..':3', ARGV[1]),redis.call('GETBIT', KEYS[1]..':4', ARGV[2])}"#) + .key(cache_key) + .arg(&[task_id / u32::MAX as u64, task_id % u32::MAX as u64]) + .invoke() + .await?; + Ok(r1 && r2) + } + } + /// Set the status of the asynchronous task (whether it is completed) and send an event /// /// 设置异步任务状态(是否完成)并发送事件 diff --git a/backend/basic/tests/test_task_processor.rs b/backend/basic/tests/test_task_processor.rs new file mode 100644 index 000000000..443dc2f6a --- /dev/null +++ b/backend/basic/tests/test_task_processor.rs @@ -0,0 +1,49 @@ +use std::env; + +use bios_basic::process::task_processor::TaskProcessor; +use bios_basic::test::init_test_container; +use tardis::basic::result::TardisResult; +use tardis::{testcontainers, tokio, TardisFuns}; + +#[tokio::test] +async fn test_task_processor() -> TardisResult<()> { + env::set_var("RUST_LOG", "debug,test_iam_serv=trace,sqlx::query=off,sqlparser=off"); + let docker = testcontainers::clients::Cli::default(); + let _x = init_test_container::init(&docker, None).await?; + + let cache_client = TardisFuns::inst("".to_string(), None).cache(); + + TaskProcessor::set_status("test1", 1, true, &cache_client).await?; + assert!(TaskProcessor::check_status("test1", 1, &cache_client).await?); + assert!(!TaskProcessor::check_status("test1", u32::MAX as u64, &cache_client).await?); + assert!(!TaskProcessor::check_status("test1", u32::MAX as u64 + 1, &cache_client).await?); + assert!(!TaskProcessor::check_status("test1", u64::MAX, &cache_client).await?); + + TaskProcessor::set_status("test1", u32::MAX as u64, true, &cache_client).await?; + assert!(TaskProcessor::check_status("test1", 1, &cache_client).await?); + assert!(TaskProcessor::check_status("test1", u32::MAX as u64, &cache_client).await?); + assert!(!TaskProcessor::check_status("test1", u32::MAX as u64 + 1, &cache_client).await?); + assert!(!TaskProcessor::check_status("test1", u64::MAX, &cache_client).await?); + + TaskProcessor::set_status("test1", u32::MAX as u64 + 1, true, &cache_client).await?; + assert!(TaskProcessor::check_status("test1", 1, &cache_client).await?); + assert!(TaskProcessor::check_status("test1", u32::MAX as u64, &cache_client).await?); + assert!(TaskProcessor::check_status("test1", u32::MAX as u64 + 1, &cache_client).await?); + assert!(!TaskProcessor::check_status("test1", u64::MAX, &cache_client).await?); + + TaskProcessor::set_status("test1", u64::MAX, true, &cache_client).await?; + assert!(TaskProcessor::check_status("test1", 1, &cache_client).await?); + assert!(TaskProcessor::check_status("test1", u32::MAX as u64, &cache_client).await?); + assert!(TaskProcessor::check_status("test1", u32::MAX as u64 + 1, &cache_client).await?); + assert!(TaskProcessor::check_status("test1", u64::MAX, &cache_client).await?); + + TaskProcessor::set_status("test1", 1, false, &cache_client).await?; + assert!(!TaskProcessor::check_status("test1", 1, &cache_client).await?); + assert!(TaskProcessor::check_status("test1", u32::MAX as u64, &cache_client).await?); + assert!(TaskProcessor::check_status("test1", u32::MAX as u64 + 1, &cache_client).await?); + assert!(TaskProcessor::check_status("test1", u64::MAX, &cache_client).await?); + + assert!(!TaskProcessor::check_status("test2", u32::MAX as u64 + 1, &cache_client).await?); + + Ok(()) +}