Skip to content

Commit

Permalink
patch, check queue before decompress
Browse files (browse the repository at this point in the history)
  • Loading branch information
Peregrine Leveret committed Jan 28, 2025
1 parent 9bf6d94 commit 52eeddf
Showing 1 changed file with 14 additions and 7 deletions.
21 changes: 14 additions & 7 deletions src/json_rescue_v5_load.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,6 @@ pub async fn single_thread_decompress_extract(tgz_file: &Path, pool: &Graph) ->

let mut unique_functions: Vec<String> = vec![];

if queue::are_all_completed(pool, tgz_filename).await? {
info!("Skipping, archive already loaded: {}", tgz_filename);
return Ok(0);
}

// TODO: the queue checks could be async, since many files are read
for j in json_vec {
let archive_id = j.file_name().unwrap().to_str().unwrap();
Expand Down Expand Up @@ -97,14 +92,26 @@ pub async fn rip_concurrent_limited(
let semaphore = Arc::new(Semaphore::new(threads)); // Semaphore to limit concurrency
let mut tasks = vec![];

for (n, p) in tgz_list.into_iter().enumerate() {
for (n, tgz_path) in tgz_list.into_iter().enumerate() {
let pool = pool.clone(); // Clone pool for each task

let tgz_filename = tgz_path
.file_name()
.expect("could not find .tgz filename")
.to_str()
.unwrap();
if queue::are_all_completed(&pool, tgz_filename).await? {
info!("skipping, archive already loaded: {}", tgz_filename);
continue;
}

let semaphore = Arc::clone(&semaphore); // Clone semaphore for each task

let task = tokio::spawn(async move {
let _permit = semaphore.acquire().await; // Acquire semaphore permit
info!("PROGRESS: {n}/{archives_count}");
single_thread_decompress_extract(&p, &pool).await // Perform the task

single_thread_decompress_extract(&tgz_path, &pool).await // Perform the task
});

tasks.push(task);
Expand Down

0 comments on commit 52eeddf

Please sign in to comment.