[TST]: fix stack overflow in test_blockfile_shuttle #3927

Open · wants to merge 1 commit into base: main
138 changes: 70 additions & 68 deletions rust/blockstore/src/arrow/concurrency_test.rs
@@ -7,80 +7,82 @@
use chroma_cache::new_cache_for_test;
use chroma_storage::{local::LocalStorage, Storage};
use rand::Rng;
-use shuttle::{future, thread};
+use shuttle::{future, scheduler::RandomScheduler, thread, Runner};

#[test]
fn test_blockfile_shuttle() {
-    shuttle::check_random(
-        || {
-            let tmp_dir = tempfile::tempdir().unwrap();
-            let storage = Storage::Local(LocalStorage::new(tmp_dir.path().to_str().unwrap()));
-            // NOTE(rescrv): I chose to use non-persistent caches here to maximize chance of a
-            // race condition outside the cache.
-            let block_cache = new_cache_for_test();
-            let sparse_index_cache = new_cache_for_test();
-            let blockfile_provider = ArrowBlockfileProvider::new(
-                storage,
-                TEST_MAX_BLOCK_SIZE_BYTES,
-                block_cache,
-                sparse_index_cache,
-            );
-            let writer = future::block_on(
-                blockfile_provider.write::<&str, u32>(BlockfileWriterOptions::default()),
-            )
-            .unwrap();
-            let id = writer.id();
-            // Generate N datapoints and then have T threads write them to the blockfile
-            let range_min = 10;
-            let range_max = 10000;
-            let n = shuttle::rand::thread_rng().gen_range(range_min..range_max);
-            // Make the max threads the number of cores * 2
-            let max_threads = num_cpus::get() * 2;
-            let t = shuttle::rand::thread_rng().gen_range(2..max_threads);
-            let mut join_handles = Vec::with_capacity(t);
-            for i in 0..t {
-                let range_start = i * n / t;
-                let range_end = (i + 1) * n / t;
-                let writer = writer.clone();
-                let handle = thread::spawn(move || {
-                    for j in range_start..range_end {
-                        let key_string = format!("key{}", j);
-                        future::block_on(async {
-                            writer
-                                .set::<&str, u32>("", key_string.as_str(), j as u32)
-                                .await
-                                .unwrap();
-                        });
-                    }
-                });
-                join_handles.push(handle);
-            }
+    let mut config = shuttle::Config::default();
+    config.stack_size = 1 * 1024 * 1024; // 1MB

Check failure on line 15 in rust/blockstore/src/arrow/concurrency_test.rs (GitHub Actions / Lint): this operation has no effect
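(The flagged expression is `1 * 1024 * 1024` on the new line 15; clippy's `identity_op` lint reports multiplying by 1 as a no-op. Assuming the 1 MiB value is what is intended, writing `config.stack_size = 1024 * 1024; // 1MB` would satisfy the lint without changing behavior.)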

-            for handle in join_handles {
-                handle.join().unwrap();
-            }
+    let scheduler = RandomScheduler::new(100);
+    let runner = Runner::new(scheduler, config);

-            // commit the writer
-            future::block_on(async {
-                let flusher = writer.commit::<&str, u32>().await.unwrap();
-                flusher.flush::<&str, u32>().await.unwrap();
-            });
+    runner.run(|| {
+        let tmp_dir = tempfile::tempdir().unwrap();

Review comment (Contributor Author): below is all whitespace changes

+        let storage = Storage::Local(LocalStorage::new(tmp_dir.path().to_str().unwrap()));
+        // NOTE(rescrv): I chose to use non-persistent caches here to maximize chance of a
+        // race condition outside the cache.
+        let block_cache = new_cache_for_test();
+        let sparse_index_cache = new_cache_for_test();
+        let blockfile_provider = ArrowBlockfileProvider::new(
+            storage,
+            TEST_MAX_BLOCK_SIZE_BYTES,
+            block_cache,
+            sparse_index_cache,
+        );
+        let writer = future::block_on(
+            blockfile_provider.write::<&str, u32>(BlockfileWriterOptions::default()),
+        )
+        .unwrap();
+        let id = writer.id();
+        // Generate N datapoints and then have T threads write them to the blockfile
+        let range_min = 10;
+        let range_max = 10000;
+        let n = shuttle::rand::thread_rng().gen_range(range_min..range_max);
+        // Make the max threads the number of cores * 2
+        let max_threads = num_cpus::get() * 2;
+        let t = shuttle::rand::thread_rng().gen_range(2..max_threads);
+        let mut join_handles = Vec::with_capacity(t);
+        for i in 0..t {
+            let range_start = i * n / t;
+            let range_end = (i + 1) * n / t;
+            let writer = writer.clone();
+            let handle = thread::spawn(move || {
+                for j in range_start..range_end {
+                    let key_string = format!("key{}", j);
+                    future::block_on(async {
+                        writer
+                            .set::<&str, u32>("", key_string.as_str(), j as u32)
+                            .await
+                            .unwrap();
+                    });
+                }
+            });
+            join_handles.push(handle);
+        }

-            let reader = future::block_on(async {
-                blockfile_provider.read::<&str, u32>(&id).await.unwrap()
-            });
-            // Read the data back
-            for i in 0..n {
-                let key_string = format!("key{}", i);
-                let value =
-                    future::block_on(async { reader.get("", key_string.as_str()).await });
-                let value = value
-                    .expect("Expect key to exist and there to be no error")
-                    .expect("Key should have a value");
-                assert_eq!(value, i as u32);
-            }
-        },
-        100,
-    );
+        for handle in join_handles {
+            handle.join().unwrap();
+        }

+        // commit the writer
+        future::block_on(async {
+            let flusher = writer.commit::<&str, u32>().await.unwrap();
+            flusher.flush::<&str, u32>().await.unwrap();
+        });

+        let reader = future::block_on(async {
+            blockfile_provider.read::<&str, u32>(&id).await.unwrap()
+        });
+        // Read the data back
+        for i in 0..n {
+            let key_string = format!("key{}", i);
+            let value = future::block_on(async { reader.get("", key_string.as_str()).await });
+            let value = value
+                .expect("Expect key to exist and there to be no error")
+                .expect("Key should have a value");
+            assert_eq!(value, i as u32);
+        }
+    });
}
}
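
For context on the API being swapped in: `shuttle::check_random(body, iterations)` runs with shuttle's default `Config`, whereas constructing a `RandomScheduler` and `Runner` directly lets the test raise `Config::stack_size` before any iterations run, which is how this change avoids the stack overflow named in the title. A minimal sketch of the same pattern, with the function name and placeholder closure body being illustrative rather than taken from the PR:

use shuttle::{scheduler::RandomScheduler, Config, Runner};

fn run_with_bigger_stack() {
    // check_random would use the default Config, whose continuation stack can be
    // too small for deeply nested futures; building the Runner by hand lets the
    // stack size be raised first.
    let mut config = Config::default();
    config.stack_size = 1024 * 1024; // 1 MiB, the same target as in the diff

    // 100 random-schedule iterations, matching the count previously passed to check_random
    let scheduler = RandomScheduler::new(100);
    let runner = Runner::new(scheduler, config);

    runner.run(|| {
        // placeholder body: the real test builds the blockfile writer,
        // spawns writer threads, commits, and reads the values back
    });
}

Keeping the scheduler's iteration count at 100 preserves the coverage of the original `check_random` call; only the stack size changes.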