From 52eeddf636d1f27f744e9e97cb3f01ab3bd6a5a2 Mon Sep 17 00:00:00 2001
From: Peregrine Leveret
Date: Tue, 28 Jan 2025 16:42:25 -0500
Subject: [PATCH] patch, check queue before decompress

---
 src/json_rescue_v5_load.rs | 21 ++++++++++++++-------
 1 file changed, 14 insertions(+), 7 deletions(-)

diff --git a/src/json_rescue_v5_load.rs b/src/json_rescue_v5_load.rs
index ed17a3c..6dfd4e6 100644
--- a/src/json_rescue_v5_load.rs
+++ b/src/json_rescue_v5_load.rs
@@ -35,11 +35,6 @@ pub async fn single_thread_decompress_extract(tgz_file: &Path, pool: &Graph) ->
 
     let mut unique_functions: Vec = vec![];
 
-    if queue::are_all_completed(pool, tgz_filename).await? {
-        info!("Skipping, archive already loaded: {}", tgz_filename);
-        return Ok(0);
-    }
-
     // TODO: the queue checks could be async, since many files are read
     for j in json_vec {
         let archive_id = j.file_name().unwrap().to_str().unwrap();
@@ -97,14 +92,26 @@ pub async fn rip_concurrent_limited(
     let semaphore = Arc::new(Semaphore::new(threads)); // Semaphore to limit concurrency
     let mut tasks = vec![];
 
-    for (n, p) in tgz_list.into_iter().enumerate() {
+    for (n, tgz_path) in tgz_list.into_iter().enumerate() {
         let pool = pool.clone(); // Clone pool for each task
+
+        let tgz_filename = tgz_path
+            .file_name()
+            .expect("could not find .tgz filename")
+            .to_str()
+            .unwrap();
+        if queue::are_all_completed(&pool, tgz_filename).await? {
+            info!("skipping, archive already loaded: {}", tgz_filename);
+            continue;
+        }
+
         let semaphore = Arc::clone(&semaphore); // Clone semaphore for each task
         let task = tokio::spawn(async move {
             let _permit = semaphore.acquire().await; // Acquire semaphore permit
             info!("PROGRESS: {n}/{archives_count}");
-            single_thread_decompress_extract(&p, &pool).await // Perform the task
+
+            single_thread_decompress_extract(&tgz_path, &pool).await // Perform the task
         });
 
         tasks.push(task);
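
As context for review, below is a minimal, self-contained sketch of the pattern this patch adopts: the per-archive queue check runs in the dispatch loop before a task is spawned, so already-loaded archives are skipped without consuming a semaphore permit or a tokio task. The `Pool`, `is_already_loaded`, and `decompress_extract` names are stand-ins for the crate's `Graph`, `queue::are_all_completed`, and `single_thread_decompress_extract`; `tokio` and `anyhow` are assumed dependencies. This is an illustrative sketch, not the patched code itself.

```rust
use std::path::PathBuf;
use std::sync::Arc;
use tokio::sync::Semaphore;

// Stand-in for the `Graph` connection pool used in the real code.
#[derive(Clone)]
struct Pool;

// Stand-in for `queue::are_all_completed`: reports whether an archive
// has already been fully loaded.
async fn is_already_loaded(_pool: &Pool, archive: &str) -> anyhow::Result<bool> {
    Ok(archive.ends_with("done.tgz")) // placeholder decision
}

// Stand-in for `single_thread_decompress_extract`.
async fn decompress_extract(path: &PathBuf, _pool: &Pool) -> anyhow::Result<u64> {
    println!("processing {}", path.display());
    Ok(0)
}

async fn run_limited(tgz_list: Vec<PathBuf>, pool: Pool, threads: usize) -> anyhow::Result<()> {
    let archives_count = tgz_list.len();
    let semaphore = Arc::new(Semaphore::new(threads)); // cap concurrent decompressions
    let mut tasks = vec![];

    for (n, tgz_path) in tgz_list.into_iter().enumerate() {
        let pool = pool.clone();

        // Gate BEFORE spawning: archives the queue marks as complete are
        // skipped and never occupy a permit or a tokio task.
        let name = tgz_path
            .file_name()
            .and_then(|s| s.to_str())
            .unwrap_or_default()
            .to_owned();
        if is_already_loaded(&pool, &name).await? {
            println!("skipping, archive already loaded: {name}");
            continue;
        }

        let semaphore = Arc::clone(&semaphore);
        tasks.push(tokio::spawn(async move {
            let _permit = semaphore.acquire().await; // limit concurrency
            println!("PROGRESS: {n}/{archives_count}");
            decompress_extract(&tgz_path, &pool).await
        }));
    }

    for t in tasks {
        t.await??;
    }
    Ok(())
}

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    run_limited(vec![PathBuf::from("0-99900.tgz")], Pool, 4).await
}
```

One trade-off of this placement, noted in the existing TODO, is that the queue checks run sequentially in the dispatch loop; they could themselves be made concurrent if they become a bottleneck.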