
[POC][wip] improve json reading with concurrent requests #704

Closed

Conversation


@zachschuermann zachschuermann commented Feb 20, 2025

related: #595

note: this is currently out of order due to spawn; instead I think I'll just use futures combinators

@github-actions github-actions bot added the breaking-change Change that will require a version bump label Feb 20, 2025

@scovich scovich left a comment


I may have badly understood how this async stuff works, but I left some comments anyway :P

Comment on lines +19 to +21
type JoinHandle<T>: Future<Output = Result<T, Self::JoinError>> + Send
where
T: Send;

I didn't think traits were allowed to provide associated types like this in stable rust... did something stabilize recently?
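For context on the question above: generic associated types (GATs) stabilized in Rust 1.65, so a trait can declare a generic associated type like `JoinHandle<T>` on stable. A minimal sketch under that assumption (trait and type names here are illustrative, not the PR's actual definitions):

```rust
use std::future::Future;

// A GAT-based executor trait: each spawn returns an executor-specific
// handle type, generic over the task's result type T.
trait Executor {
    type JoinError;
    type JoinHandle<T>: Future<Output = Result<T, Self::JoinError>> + Send
    where
        T: Send;
}

// A trivial impl whose "handle" is an already-completed future.
struct Inline;
impl Executor for Inline {
    type JoinError = ();
    type JoinHandle<T>
        = std::future::Ready<Result<T, ()>>
    where
        T: Send;
}

fn main() {
    // The associated type can be named and instantiated like any other.
    let _handle: <Inline as Executor>::JoinHandle<i32> = std::future::ready(Ok(7));
    println!("GAT trait compiles on stable");
}
```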


Also, does this work?

Suggested change
type JoinHandle<T>: Future<Output = Result<T, Self::JoinError>> + Send
where
T: Send;
type JoinHandle<T: Send>: Future<Output = Result<T, Self::JoinError>> + Send;

@@ -116,7 +116,13 @@ impl FileStream {
let (sender, receiver) = std::sync::mpsc::sync_channel(readahead);

let executor_for_block = task_executor.clone();
task_executor.spawn(async move {
// Note: we throw away the JoinHandle since we don't need to wait for it's return

grammar nit

Suggested change
// Note: we throw away the JoinHandle since we don't need to wait for it's return
// Note: we throw away the JoinHandle since we don't need to wait for its return

where
F: Future<Output = ()> + Send + 'static,
F: Future<Output = T> + Send + 'static,
T: Send + 'static,
{
self.send_future(Box::pin(task));

I don't think this would compile with that semicolon blocking the now-required return value?
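To illustrate the point (a sketch with std threads standing in for the PR's async executor, since the actual trait isn't shown here): once the spawn function must return a handle, its body has to end in an expression, and a trailing semicolon would make the block evaluate to `()` instead.

```rust
use std::thread;

// A spawn wrapper that must hand the JoinHandle back to the caller.
fn spawn_task<T, F>(task: F) -> thread::JoinHandle<T>
where
    F: FnOnce() -> T + Send + 'static,
    T: Send + 'static,
{
    // No trailing semicolon: the JoinHandle is the return value.
    // `thread::spawn(task);` would return `()` and fail to compile.
    thread::spawn(task)
}

fn main() {
    let handle = spawn_task(|| 21 * 2);
    assert_eq!(handle.join().unwrap(), 42);
    println!("handle returned and joined");
}
```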

Comment on lines +79 to +82
self.rx
.recv()
.ok()
.map(|r| r.map(|rb| Box::new(ArrowEngineData::new(rb)) as Box<dyn EngineData>))

Double checking -- the only error recv ever returns is HUP, and so converting that to an iterator None result is the desired behavior?

Suggested change
self.rx
.recv()
.ok()
.map(|r| r.map(|rb| Box::new(ArrowEngineData::new(rb)) as Box<dyn EngineData>))
let r = self.rx.recv().ok()?; // HUP -> iterator finished
r.map(|rb| Box::new(ArrowEngineData::new(rb)) as _)
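A small std-only sketch confirming the HUP behavior being double-checked here: `recv()` only errors once every sender has been dropped, so `.ok()` cleanly maps channel hang-up to end-of-iteration (the `drain` helper is illustrative, not the PR's code):

```rust
use std::sync::mpsc;
use std::thread;

// Drain a receiver into a Vec; from_fn stops at the first None,
// i.e. exactly when recv() reports hang-up (all senders dropped).
fn drain(rx: mpsc::Receiver<i32>) -> Vec<i32> {
    std::iter::from_fn(|| rx.recv().ok()).collect()
}

fn main() {
    let (tx, rx) = mpsc::sync_channel::<i32>(4);
    thread::spawn(move || {
        for i in 0..3 {
            tx.send(i).unwrap();
        } // tx dropped here: the receiver then sees RecvError (HUP)
    });
    assert_eq!(drain(rx), vec![0, 1, 2]);
    println!("receiver terminated cleanly on hang-up");
}
```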

self.task_executor.block_on(async move {
let mut handles = Vec::with_capacity(len);

for file in files.into_iter() {

Suggested change
for file in files.into_iter() {
for file in files {

https://doc.rust-lang.org/std/iter/index.html#for-loops-and-intoiterator

}

// Wait for all tasks to complete.
join_all(handles).await;

Why do all tasks need to complete before we can return the iterator?
(We probably need to clone the file list and the task executor Arc before launching tasks, though, or we'll have borrow lifetime issues.)

// tokio::time::sleep(std::time::Duration::from_secs(10)).await;
// println!("sleeping");
while let Some(result) = stream.next().await {
let _ = tx_clone.send(result);

I'm not seeing how this preserves order as required by log replay? It looks like the various tasks all race to append their stream chunks to the queue?
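A std-only sketch of the fix for this ordering concern: work can still run concurrently, but results stay in input order if the handles are kept in input order and joined in order, rather than racing sends into one shared channel. (The async analogue would be ordered combinators like `FuturesOrdered` or `StreamExt::buffered`; thread names here are illustrative.)

```rust
use std::thread;
use std::time::Duration;

// Join handles in spawn order, so output order matches input order
// regardless of which task finishes first.
fn join_in_order(handles: Vec<thread::JoinHandle<u64>>) -> Vec<u64> {
    handles.into_iter().map(|h| h.join().unwrap()).collect()
}

fn main() {
    let handles: Vec<_> = (0..4u64)
        .map(|i| {
            thread::spawn(move || {
                // Earlier items sleep longer, so they finish last...
                thread::sleep(Duration::from_millis(40 - i * 10));
                i
            })
        })
        .collect();

    // ...but joining in spawn order still yields ordered results.
    let results = join_in_order(handles);
    assert_eq!(results, vec![0, 1, 2, 3]);
    println!("{:?}", results);
}
```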


let f = file_opener.clone();
// println!("file: {:?}", file);
let handle = runtime.spawn(async move {

If I understand correctly, this call causes the task to start running, thus launching an open call for each file. But does the open make progress without somebody awaiting it, so that the first stream.next() call gets triggered? I would hope so.

Meanwhile, given the batch ordering issues (see comment below), it seems like we should create an iterator of handles, where each handle's return type is a DeltaResult<BoxStream> that the ChunkIter can then await directly?

let streams = files.iter().cloned().map(|f| {
      ...
    runtime.spawn(async move {
        Ok(f.open(...)?.await?)
    })
});

and then, assuming the iterator wraps a peekable version of streams and also has a cloned arc of the task executor:

fn next(&mut self) -> Option<Self::Item> {
    self.task_executor.block_on(async {
        while let Some(stream) = self.streams.peek() {
            match stream {
                Ok(stream) => {
                    if let Some(result) = stream.next().await {
                        return Some(result);
                    }
                    let _ = self.streams.pop(); // current stream is exhausted
                }
                Err(err) => {
                    let _ = self.streams.pop(); // current stream failed
                    return Some(Err(err));
                }
            };
        }
        None // all streams exhausted
    })
}
