[POC][wip] improve json reading with concurrent requests #704
Conversation
I may have badly understood how this async stuff works, but I left some comments anyway :P
type JoinHandle<T>: Future<Output = Result<T, Self::JoinError>> + Send
where
    T: Send;
I didn't think traits were allowed to provide associated types like this in stable rust... did something stabilize recently?
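For reference, generic associated types (GATs) were stabilized in Rust 1.65, so an associated type that takes its own type parameter and carries a where clause does compile on stable. A minimal standalone illustration (the names below are made up, not the crate's actual trait):

use std::future::Future;

// Stable since Rust 1.65: a generic associated type (GAT) with a bound and a
// where clause. `Executor`, `JoinError`, and `JoinHandle` are illustrative names.
trait Executor {
    type JoinError;

    type JoinHandle<T>: Future<Output = Result<T, Self::JoinError>> + Send
    where
        T: Send;
}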
Also, does this work?
- type JoinHandle<T>: Future<Output = Result<T, Self::JoinError>> + Send
- where
-     T: Send;
+ type JoinHandle<T: Send>: Future<Output = Result<T, Self::JoinError>> + Send;
@@ -116,7 +116,13 @@ impl FileStream {
    let (sender, receiver) = std::sync::mpsc::sync_channel(readahead);

    let executor_for_block = task_executor.clone();
    task_executor.spawn(async move {
        // Note: we throw away the JoinHandle since we don't need to wait for it's return
grammar nit
- // Note: we throw away the JoinHandle since we don't need to wait for it's return
+ // Note: we throw away the JoinHandle since we don't need to wait for its return
  where
-     F: Future<Output = ()> + Send + 'static,
+     F: Future<Output = T> + Send + 'static,
+     T: Send + 'static,
  {
      self.send_future(Box::pin(task));
I don't think this would compile with that semicolon blocking the now-required return value?
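A tiny standalone illustration of the concern (not the PR's trait; produce_handle is a made-up stand-in): a trailing semicolon turns the call into a statement, so the function body evaluates to the unit type () instead of the value the signature now promises.

// Hypothetical stand-in for a spawn function that must now return its handle.
fn produce_handle() -> usize {
    42
}

fn spawn_and_return() -> usize {
    produce_handle() // ok: expression position, the handle is returned
    // produce_handle(); // error[E0308]: mismatched types -- expected `usize`, found `()`
}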
self.rx
    .recv()
    .ok()
    .map(|r| r.map(|rb| Box::new(ArrowEngineData::new(rb)) as Box<dyn EngineData>))
Double checking -- the only error recv ever returns is HUP, and so converting that to an iterator None result is the desired behavior?
- self.rx
-     .recv()
-     .ok()
-     .map(|r| r.map(|rb| Box::new(ArrowEngineData::new(rb)) as Box<dyn EngineData>))
+ let r = self.rx.recv().ok()?; // HUP -> iterator finished
+ r.map(|rb| Box::new(ArrowEngineData::new(rb)) as _)
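For what it's worth, that matches the std behavior: Receiver::recv only errors once the channel is empty and every Sender has been dropped. A quick self-contained check (not the PR's code):

use std::sync::mpsc;

fn main() {
    let (tx, rx) = mpsc::channel();
    tx.send(1).unwrap();
    drop(tx); // hang up the sending side
    assert_eq!(rx.recv().ok(), Some(1)); // buffered value is still delivered
    assert_eq!(rx.recv().ok(), None);    // HUP surfaces as Err, mapped to None by .ok()
}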
self.task_executor.block_on(async move {
    let mut handles = Vec::with_capacity(len);

    for file in files.into_iter() {
- for file in files.into_iter() {
+ for file in files {
https://doc.rust-lang.org/std/iter/index.html#for-loops-and-intoiterator
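As the linked docs note, a for loop already calls IntoIterator::into_iter on its argument, so the explicit call is redundant. A minimal illustration:

fn main() {
    let files = vec!["a.json", "b.json"];
    // The two loops below desugar to the same IntoIterator::into_iter call.
    for f in files.clone().into_iter() {
        println!("{f}");
    }
    for f in files {
        println!("{f}");
    }
}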
    }

    // Wait for all tasks to complete.
    join_all(handles).await;
Why do all tasks need to complete before we can return the iterator?
(probably need to clone the file list and task executor Arc before launching tasks, though -- else we'll hit borrow lifetime issues)
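A minimal sketch of that shape, assuming tokio and made-up types (not the PR's executor trait): clone the shared Arc and move owned values into each task so nothing borrows from the caller's stack, which lets the function hand back its iterator immediately instead of awaiting join_all first.

use std::sync::Arc;

// Hypothetical helper: `files` and `config` stand in for the real file list and
// shared state. Each task owns its clones, so the function can return right away.
fn spawn_readers(files: Vec<String>, config: Arc<String>) {
    for file in files {
        let config = Arc::clone(&config);
        tokio::spawn(async move {
            // `file` and `config` are owned here; no borrow of the caller survives.
            let _ = (file, config);
        });
    }
    // returns without waiting for the spawned tasks to finish
}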
// tokio::time::sleep(std::time::Duration::from_secs(10)).await;
// println!("sleeping");
while let Some(result) = stream.next().await {
    let _ = tx_clone.send(result);
I'm not seeing how this preserves order as required by log replay? It looks like the various tasks all race to append their stream chunks to the queue?
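A self-contained sketch of the race being described (hypothetical setup; tokio with the relevant features assumed): each spawned task sends into the same channel as soon as its data is ready, so results arrive in completion order rather than in the order the files were listed.

use tokio::sync::mpsc;

#[tokio::main]
async fn main() {
    let (tx, mut rx) = mpsc::unbounded_channel();
    for file_idx in 0..4u32 {
        let tx = tx.clone();
        tokio::spawn(async move {
            // Simulate per-file latency where later files happen to finish first.
            let delay = u64::from(40 - file_idx * 10);
            tokio::time::sleep(std::time::Duration::from_millis(delay)).await;
            let _ = tx.send(file_idx); // races with the sends from the other tasks
        });
    }
    drop(tx); // the channel closes once every task's sender is dropped
    let mut arrival = Vec::new();
    while let Some(idx) = rx.recv().await {
        arrival.push(idx);
    }
    println!("arrival order: {arrival:?}"); // likely [3, 2, 1, 0], not [0, 1, 2, 3]
}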
let f = file_opener.clone();
// println!("file: {:?}", file);
let handle = runtime.spawn(async move {
If I understand correctly, this call causes the task to start running, thus launching an open call for each file. But does the open make progress without somebody waiting on it, so that the first stream.next() call gets triggered? I would hope so.
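That does seem to be tokio's contract: a spawned task is scheduled onto the runtime immediately and makes progress even if its JoinHandle is never awaited. A small check (plain tokio assumed, not the PR's executor abstraction):

#[tokio::main]
async fn main() {
    tokio::spawn(async {
        println!("spawned task ran without its JoinHandle being awaited");
    });
    // Only here to keep the runtime alive long enough to observe the print;
    // real code would await the JoinHandle or some downstream result instead.
    tokio::time::sleep(std::time::Duration::from_millis(50)).await;
}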
Meanwhile, given the batch ordering issues (see comment below), it seems like we should create an iterator of handles, where each handle's return type is a DeltaResult<BoxStream> that the ChunkIter can then await directly?
let streams = files.iter().cloned().map(|f| {
    ...
    runtime.spawn(async move {
        Ok(f.open(...)?.await?)
    })
});
and then, assuming the iterator wraps a peekable version of streams and also has a cloned arc of the task executor:
fn next(&mut self) -> Option<Self::Item> {
    self.task_executor.block_on(async {
        while let Some(stream) = self.streams.peek() {
            match stream {
                Ok(stream) => {
                    if let Some(result) = stream.next().await {
                        return Some(result);
                    }
                    let _ = self.streams.pop(); // current stream is exhausted
                }
                Err(err) => {
                    let _ = self.streams.pop(); // current stream failed
                    return Some(Err(err));
                }
            };
        }
        None // all streams exhausted
    })
}
related: #595
note: this is out of order due to spawn - instead I think I'll just do futures combinators
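For the record, one shape the futures-combinators version might take (a sketch only, with a hypothetical open_file standing in for the real FileOpener): StreamExt::buffered drives up to n opens concurrently but yields their results in the original file order, which is what log replay needs.

use futures::stream::{self, StreamExt, TryStreamExt};

// Hypothetical opener: would issue the GET and return a stream of decoded batches.
async fn open_file(
    _path: String,
) -> Result<impl futures::Stream<Item = Result<Vec<u8>, std::io::Error>>, std::io::Error> {
    Ok(stream::iter(vec![Ok(Vec::new())]))
}

async fn read_files(files: Vec<String>, concurrency: usize) -> Result<(), std::io::Error> {
    stream::iter(files)
        .map(open_file)        // a stream of pending open() futures
        .buffered(concurrency) // run up to `concurrency` opens, keep file order
        .try_flatten()         // yield each file's batches, still in order
        .try_for_each(|_batch| async { Ok::<(), std::io::Error>(()) })
        .await
}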