Skip to content

Commit

Permalink
Improve task status processing.
Browse files Browse the repository at this point in the history
  • Loading branch information
gudaoxuri committed May 7, 2024
1 parent 8eb7b4d commit 80379f2
Show file tree
Hide file tree
Showing 2 changed files with 89 additions and 18 deletions.
58 changes: 40 additions & 18 deletions backend/basic/src/process/task_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,34 +40,56 @@ impl TaskProcessor {
/// 初始化异步任务状态
pub async fn init_status(cache_key: &str, task_id: Option<u64>, cache_client: &TardisCacheClient) -> TardisResult<u64> {
let task_id = task_id.unwrap_or(Local::now().timestamp_nanos_opt().expect("maybe in 23rd century") as u64);
// u32::MAX * u32::MAX + u32::MAX - 1
if task_id > 18446744069414584319 {
return Err(TardisError::bad_request("task id is too large", "400-task-id-too-large"));
}
// 使用bitmap存储以减少内存占用
cache_client.setbit(&format!("{cache_key}:1"), (task_id / u32::MAX as u64) as usize, false).await?;
cache_client.setbit(&format!("{cache_key}:2"), (task_id % u32::MAX as u64) as usize, false).await?;
Self::set_status(cache_key, task_id, false, cache_client).await?;
Ok(task_id)
}

/// Check the status of the asynchronous task (whether it is completed)
///
/// 检查异步任务状态(是否完成)
pub async fn check_status(cache_key: &str, task_id: u64, cache_client: &TardisCacheClient) -> TardisResult<bool> {
let result1 = cache_client.getbit(&format!("{cache_key}:1"), (task_id / u32::MAX as u64) as usize).await?;
let result2 = cache_client.getbit(&format!("{cache_key}:2"), (task_id % u32::MAX as u64) as usize).await?;
Ok(result1 && result2)
}

/// Set the status of the asynchronous task (whether it is completed)
///
/// 设置异步任务状态(是否完成)
pub async fn set_status(cache_key: &str, task_id: u64, status: bool, cache_client: &TardisCacheClient) -> TardisResult<()> {
cache_client.setbit(&format!("{cache_key}:1"), (task_id / u32::MAX as u64) as usize, status).await?;
cache_client.setbit(&format!("{cache_key}:2"), (task_id % u32::MAX as u64) as usize, status).await?;
if task_id <= u32::MAX as u64 {
cache_client.setbit(&format!("{cache_key}:1"), task_id as usize, status).await?;
} else if task_id > 18446744069414584319 {
// u32::MAX * u32::MAX + u32::MAX - 1
cache_client.setbit(&format!("{cache_key}:2"), (u64::MAX - task_id) as usize, status).await?;
} else {
let _: String = cache_client
.script(
r#"
redis.call('SETBIT', KEYS[1]..':3', ARGV[1], ARGV[3])
redis.call('SETBIT', KEYS[1]..':4', ARGV[2], ARGV[3])
return 'OK'
"#,
)
.key(cache_key)
.arg(&[task_id / u32::MAX as u64, task_id % u32::MAX as u64, if status { 1 } else { 0 }])
.invoke()
.await?;
}
Ok(())
}

/// Check the status of the asynchronous task (whether it is completed)
///
/// 检查异步任务状态(是否完成)
pub async fn check_status(cache_key: &str, task_id: u64, cache_client: &TardisCacheClient) -> TardisResult<bool> {
if task_id <= u32::MAX as u64 {
Ok(cache_client.getbit(&format!("{cache_key}:1"), task_id as usize).await?)
} else if task_id > 18446744069414584319 {
// u32::MAX * u32::MAX + u32::MAX - 1
Ok(cache_client.getbit(&format!("{cache_key}:2"), (u64::MAX - task_id) as usize).await?)
} else {
let (r1, r2): (bool, bool) = cache_client
.script(r#"return {redis.call('GETBIT', KEYS[1]..':3', ARGV[1]),redis.call('GETBIT', KEYS[1]..':4', ARGV[2])}"#)
.key(cache_key)
.arg(&[task_id / u32::MAX as u64, task_id % u32::MAX as u64])
.invoke()
.await?;
Ok(r1 && r2)
}
}

/// Set the status of the asynchronous task (whether it is completed) and send an event
///
/// 设置异步任务状态(是否完成)并发送事件
Expand Down
49 changes: 49 additions & 0 deletions backend/basic/tests/test_task_processor.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
use std::env;

use bios_basic::process::task_processor::TaskProcessor;
use bios_basic::test::init_test_container;
use tardis::basic::result::TardisResult;
use tardis::{testcontainers, tokio, TardisFuns};

#[tokio::test]
async fn test_task_processor() -> TardisResult<()> {
env::set_var("RUST_LOG", "debug,test_iam_serv=trace,sqlx::query=off,sqlparser=off");
let docker = testcontainers::clients::Cli::default();
let _x = init_test_container::init(&docker, None).await?;

let cache_client = TardisFuns::inst("".to_string(), None).cache();

TaskProcessor::set_status("test1", 1, true, &cache_client).await?;
assert!(TaskProcessor::check_status("test1", 1, &cache_client).await?);
assert!(!TaskProcessor::check_status("test1", u32::MAX as u64, &cache_client).await?);
assert!(!TaskProcessor::check_status("test1", u32::MAX as u64 + 1, &cache_client).await?);
assert!(!TaskProcessor::check_status("test1", u64::MAX, &cache_client).await?);

TaskProcessor::set_status("test1", u32::MAX as u64, true, &cache_client).await?;
assert!(TaskProcessor::check_status("test1", 1, &cache_client).await?);
assert!(TaskProcessor::check_status("test1", u32::MAX as u64, &cache_client).await?);
assert!(!TaskProcessor::check_status("test1", u32::MAX as u64 + 1, &cache_client).await?);
assert!(!TaskProcessor::check_status("test1", u64::MAX, &cache_client).await?);

TaskProcessor::set_status("test1", u32::MAX as u64 + 1, true, &cache_client).await?;
assert!(TaskProcessor::check_status("test1", 1, &cache_client).await?);
assert!(TaskProcessor::check_status("test1", u32::MAX as u64, &cache_client).await?);
assert!(TaskProcessor::check_status("test1", u32::MAX as u64 + 1, &cache_client).await?);
assert!(!TaskProcessor::check_status("test1", u64::MAX, &cache_client).await?);

TaskProcessor::set_status("test1", u64::MAX, true, &cache_client).await?;
assert!(TaskProcessor::check_status("test1", 1, &cache_client).await?);
assert!(TaskProcessor::check_status("test1", u32::MAX as u64, &cache_client).await?);
assert!(TaskProcessor::check_status("test1", u32::MAX as u64 + 1, &cache_client).await?);
assert!(TaskProcessor::check_status("test1", u64::MAX, &cache_client).await?);

TaskProcessor::set_status("test1", 1, false, &cache_client).await?;
assert!(!TaskProcessor::check_status("test1", 1, &cache_client).await?);
assert!(TaskProcessor::check_status("test1", u32::MAX as u64, &cache_client).await?);
assert!(TaskProcessor::check_status("test1", u32::MAX as u64 + 1, &cache_client).await?);
assert!(TaskProcessor::check_status("test1", u64::MAX, &cache_client).await?);

assert!(!TaskProcessor::check_status("test2", u32::MAX as u64 + 1, &cache_client).await?);

Ok(())
}

0 comments on commit 80379f2

Please sign in to comment.