
Commit

add deterministic test via OrderedGetStore
zachschuermann committed Feb 27, 2025
1 parent f6f5729 commit df7a819
Showing 1 changed file with 150 additions and 36 deletions.
186 changes: 150 additions & 36 deletions kernel/src/engine/default/json.rs
@@ -27,7 +27,7 @@ use crate::{
};

const DEFAULT_BUFFER_SIZE: usize = 1000;
const DEFAULT_BATCH_SIZE: usize = 1024 * 128;
const DEFAULT_BATCH_SIZE: usize = 10_000;

#[derive(Debug)]
pub struct DefaultJsonHandler<E: TaskExecutor> {
@@ -74,7 +74,7 @@ impl<E: TaskExecutor> DefaultJsonHandler<E> {

/// Set the number of rows to read per batch during [Self::parse_json()].
///
/// Defaults to 128kB.
/// Defaults to 10_000 rows.
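///
/// A minimal sketch (illustrative only, assuming an already-constructed `handler`):
///
/// ```ignore
/// // read JSON in smaller batches of 1,024 rows per RecordBatch
/// let handler = handler.with_batch_size(1024);
/// ```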
pub fn with_batch_size(mut self, batch_size: usize) -> Self {
self.batch_size = batch_size;
self
@@ -238,11 +238,10 @@ impl JsonOpener {

#[cfg(test)]
mod tests {
use std::collections::HashMap;
use std::collections::{HashMap, HashSet, VecDeque};
use std::path::PathBuf;
use std::sync::Arc;
use std::sync::Mutex;
use std::time::Duration;
use std::sync::{Arc, Mutex};
use std::task::{Poll, Waker};

use crate::actions::get_log_schema;
use crate::arrow::array::{AsArray, RecordBatch, StringArray};
@@ -253,10 +252,11 @@ mod tests {
};
use futures::future;
use itertools::Itertools;
use object_store::{local::LocalFileSystem, ObjectStore};
use object_store::local::LocalFileSystem;
use object_store::memory::InMemory;
use object_store::{
GetOptions, GetResult, ListResult, MultipartUpload, ObjectMeta, PutMultipartOpts,
PutOptions, PutPayload, PutResult, Result,
GetOptions, GetResult, ListResult, MultipartUpload, ObjectMeta, ObjectStore,
PutMultipartOpts, PutOptions, PutPayload, PutResult, Result,
};

// TODO: should just use the one from test_utils, but running into dependency issues
@@ -268,30 +268,73 @@ mod tests {

use super::*;

/// Store wrapper that wraps an inner store to purposefully delay GET requests of certain keys.
/// Store wrapper that wraps an inner store to guarantee the ordering of GET requests.
///
/// WARN: Does not handle duplicate keys, and will fail on duplicate requests of the same key.
///
// TODO(zach): we can handle duplicate requests if we retain the ordering of the keys and
// track that all of the keys prior to the one requested have been resolved.
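//
// Illustrative usage (a sketch, not part of this diff; assumes the `InMemory` store and
// `Path` imported above): force `b.json` to resolve before `a.json`, regardless of which
// GET is issued first:
//
//     let store = OrderedGetStore::new(
//         InMemory::new(),
//         [Path::from("b.json"), Path::from("a.json")],
//     );
//     // a GET for "a.json" stays Pending until the GET for "b.json" has completed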
#[derive(Debug)]
struct SlowGetStore<T> {
struct OrderedGetStore<T: ObjectStore> {
// The ObjectStore we are wrapping
inner: T,
wait_keys: Arc<Mutex<HashMap<Path, Duration>>>,
// Queue of paths in order which they will resolve
ordered_keys: Arc<Mutex<VecDeque<Path>>>,
// Map of paths to wakers for pending get requests
wakers: Arc<Mutex<HashMap<Path, Vec<Waker>>>>,
}

impl<T> SlowGetStore<T> {
fn new(inner: T, wait_keys: HashMap<Path, Duration>) -> Self {
impl<T: ObjectStore> OrderedGetStore<T> {
fn new(inner: T, ordered_keys: impl Into<VecDeque<Path>>) -> Self {
let ordered_keys = ordered_keys.into();
let mut seen = HashSet::new();
for key in ordered_keys.iter() {
if !seen.insert(key) {
panic!("Duplicate key in OrderedGetStore: {}", key);
}
}

Self {
inner,
wait_keys: Arc::new(Mutex::new(wait_keys)),
ordered_keys: Arc::new(Mutex::new(ordered_keys)),
wakers: Arc::new(Mutex::new(HashMap::new())),
}
}

// Register a waker for a specific path
fn register_waker(&self, path: &Path, waker: std::task::Waker) {
self.wakers
.lock()
.unwrap()
.entry(path.clone())
.or_insert_with(Vec::new)
.push(waker);
}

// Wake the wakers for the next path in order, if any
fn wake_next(&self) {
let ordered_keys = self.ordered_keys.lock().unwrap();
// get the next key
if let Some(next_key) = ordered_keys.front() {
let mut wakers = self.wakers.lock().unwrap();
// remove the wakers for the next key and wake them
if let Some(path_wakers) = wakers.remove(next_key) {
for waker in path_wakers {
waker.wake();
}
}
}
}
}

impl<T: ObjectStore> std::fmt::Display for SlowGetStore<T> {
impl<T: ObjectStore> std::fmt::Display for OrderedGetStore<T> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "SlowGetStore({:?})", self.wait_keys)
write!(f, "OrderedGetStore({:?})", self.ordered_keys)
}
}

#[async_trait::async_trait]
impl<T: ObjectStore> ObjectStore for SlowGetStore<T> {
impl<T: ObjectStore> ObjectStore for OrderedGetStore<T> {
async fn put(&self, location: &Path, payload: PutPayload) -> Result<PutResult> {
self.inner.put(location, payload).await
}
@@ -317,17 +360,37 @@ mod tests {
self.inner.put_multipart_opts(location, opts).await
}

// A GET request is fulfilled by checking if the requested path is next in order:
// - if yes, remove the path from the queue and proceed with the GET request, then wake the
// next path in order
// - if no, register the waker and wait
async fn get(&self, location: &Path) -> Result<GetResult> {
let wait_time = {
let guard = self.wait_keys.lock().expect("lock key map");
guard.get(location).copied()
};

if let Some(duration) = wait_time {
tokio::time::sleep(duration).await;
}
// we implement a future which only resolves once the requested path is next in order
futures::future::poll_fn(move |cx| {
let mut ordered_keys = self.ordered_keys.lock().unwrap();
match ordered_keys.front() {
Some(key) if key == location => {
// this key is next: remove it and return Ready so we proceed to do the GET
ordered_keys.pop_front();
Poll::Ready(())
}
Some(_) => {
// this key isn't next: register our waker and return Pending
self.register_waker(location, cx.waker().clone());
Poll::Pending
}
None => {
// empty: someone asked for a key not in the order
panic!("Key ordering not specified for {}", location);
}
}
})
.await;

self.inner.get(location).await
// after doing our GET request, wake the next path in order
let result = self.inner.get(location).await;
self.wake_next();
result
}

async fn get_opts(&self, location: &Path, options: GetOptions) -> Result<GetResult> {
Expand Down Expand Up @@ -467,6 +530,59 @@ mod tests {
assert_eq!(data[0].num_rows(), 4);
}

// this test creates an OrderedGetStore with 1000 paths that resolve in REVERSE order. it
// spawns 1000 tasks to read the paths in natural order (0, 1, 2, ...) and checks that they
// complete in reverse.
#[tokio::test]
async fn test_ordered_get_store() {
let num_paths = 1000;
let paths: Vec<Path> = (0..num_paths)
.map(|i| Path::from(format!("/test/path{}", i)))
.collect();

let memory_store = InMemory::new();
for (i, path) in paths.iter().enumerate() {
memory_store
.put(path, Bytes::from(format!("content_{}", i)).into())
.await
.unwrap();
}

// Create ordered store with REVERSE order (999, 998, ...)
let rev_paths: VecDeque<Path> = paths.iter().rev().cloned().collect();
let ordered_store = Arc::new(OrderedGetStore::new(memory_store.fork(), rev_paths.clone()));

let (tx, rx) = std::sync::mpsc::channel();

// Spawn tasks to GET each path in the natural order (0, 1, 2, ...)
// They should complete in the REVERSE order (999, 998, ...) due to OrderedGetStore
let handles: Vec<_> = paths
.iter()
.cloned()
.map(|path| {
let store = ordered_store.clone();
let tx = tx.clone();
tokio::spawn(async move {
let _ = store.get(&path).await.unwrap();
tx.send(path).unwrap();
})
})
.collect();

let _ = future::join_all(handles).await;
let mut completed = Vec::new();
while let Ok(path) = rx.try_recv() {
completed.push(path);
}

// Expected order is reversed (999, 998, ..., 1, 0)
assert_eq!(
completed,
rev_paths.into_iter().collect_vec(),
"Expected paths to complete in reverse order"
);
}

#[tokio::test(flavor = "multi_thread", worker_threads = 3)]
async fn test_read_json_files_ordering() {
let paths = [
@@ -475,15 +591,13 @@ mod tests {
];
let paths = paths.map(|p| std::fs::canonicalize(PathBuf::from(p)).unwrap());

let path_string = paths[0].to_string_lossy().to_string();
let object_store_path = Path::from(path_string);
// for the first 000000.json, wait for 1 second
let key_map = (object_store_path, Duration::from_secs(1));

let store = Arc::new(SlowGetStore::new(
LocalFileSystem::new(),
vec![key_map].into_iter().collect(),
));
// object store will resolve GETs to paths in reverse: 0001.json, 0000.json
let reverse_paths = paths
.iter()
.rev()
.map(|p| Path::from(p.to_string_lossy().to_string()))
.collect::<VecDeque<_>>();
let store = Arc::new(OrderedGetStore::new(LocalFileSystem::new(), reverse_paths));

let file_futures: Vec<_> = paths
.iter()
