Improve Parallel Reading (CSV, JSON) / Help Wanted #8723
Comments
Thanks for the heads up @marvinlanhenke -- I'll check this out over the next day or two. Much appreciated. cc @devinjdangelo and @tustvold if you are interested.
I didn't have a chance to review https://github.com/marvinlanhenke/arrow-datafusion/blob/poc_optimize_get_req/datafusion/core/src/datasource/physical_plan/json.rs#L232-L381 in fine detail, but in general I think that code looks very hard to navigate / test as it is so deeply nested in futures / closures. My suggestion is to try to extract the logic into a structure that is easier to test and reason about. For example, maybe you could create a struct like:

```rust
struct StreamingJsonReaderBuilder {
    ...
}

impl StreamingJsonReaderBuilder {
    fn new(..) {}
    fn with_range(mut self, range: ..) ...
    fn build() -> SendableRecordBatchStream
}
```

And then you could start writing tests like:

```rust
let object_store = ...;
let input = StreamingJsonReaderBuilder::new(object_store.read())
    .with_range(Range::new(100, 200))
    .build();
let batches = collect(input);
assert_batches_eq(..);
```

That might not be the right structure, but I am trying to give you the flavor of what encapsulating the complexity might look like.
@alamb I will look into your suggestion and try to do some refactoring in order to understand what is and what should be happening at this point. Thanks again.
I think this was accidentally closed so I am reopening it. I am happy to close it again if I missed something.
If anyone wants a fun exercise, getting the CSV reader to read in parallel from local files would greatly speed up the h2o benchmarks.
Is your feature request related to a problem or challenge?
As originally stated in #6922 (I'm not sure why the issue was closed) and discussed in #6801, the current FileOpener implementations for both CSV and JSON issue multiple GET requests to adjust the byte range prior to parsing / reading the file itself. This is suboptimal and can be improved by reducing the latency caused by multiple remote network requests.
Describe the solution you'd like
I would like to reduce the number of GET requests from 3 to 1.
This can be done by "overfetching" the original partition byte range and then adjusting the range by finding the newline delimiter, similar to the solution already implemented.
The approach is outlined here: #6801 (comment) by @alamb
There are some edge cases that need consideration, like "heterogeneous object sizes" within a CSV row or JSON object, which lead to partition ranges overlapping on the same line and can cause the same line to be read twice. Error handling / retry when no newline can be found (the "overfetching" range was too small) has to be handled as well.
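As a rough sketch of what the single-request "overfetch then trim" idea could look like against the object_store API; the `OVERFETCH` constant, the `fetch_partition` helper, and the missing retry handling are assumptions for illustration, not the POC's actual code:

```rust
use std::ops::Range;

use bytes::Bytes;
use object_store::{path::Path, ObjectStore};

/// Extra bytes fetched past the partition end to (hopefully) capture the
/// terminating newline of the last partial line. Purely illustrative value.
const OVERFETCH: usize = 4096;

async fn fetch_partition(
    store: &dyn ObjectStore,
    location: &Path,
    range: Range<usize>,
    file_size: usize,
) -> object_store::Result<Bytes> {
    // Single GET request: the partition's byte range plus an overfetched tail.
    // (The exact `get_range` signature may differ between object_store versions.)
    let fetch_end = (range.end + OVERFETCH).min(file_size);
    let bytes = store.get_range(location, range.start..fetch_end).await?;

    // Skip the partial line at the start; it belongs to the previous partition.
    let start = if range.start == 0 {
        0
    } else {
        bytes.iter().position(|&b| b == b'\n').map(|p| p + 1).unwrap_or(bytes.len())
    };

    // Trim the overfetched tail back to the first newline at or after `range.end`.
    let rel_end = range.end - range.start;
    let end = bytes[rel_end..]
        .iter()
        .position(|&b| b == b'\n')
        .map(|pos| rel_end + pos + 1)
        // No newline found: the "overfetching range was too small" case, which
        // would need a retry with a larger tail (not shown here).
        .unwrap_or(bytes.len());

    Ok(bytes.slice(start..end.max(start)))
}
```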
POC:
I already went ahead and implemented a POC which works and can handle some edge-cases like overlapping partition ranges; appropriate error handling / retry is still missing.
However, I definitely need help to improve upon this: https://github.com/marvinlanhenke/arrow-datafusion/blob/poc_optimize_get_req/datafusion/core/src/datasource/physical_plan/json.rs#L232-L381
The solution is inefficient due to line-by-line operations and buffer cloning / copying.
I tried different ways to handle the GetResultPayload::Stream by using BytesMut::new() & buffer.extend_from_slice, but I was not able to handle all the edge cases correctly. I'd greatly appreciate it if someone could give some pointers, or take it from here and improve upon the POC.
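One possible direction for the BytesMut approach, sketched here only as a rough idea (the stream type, the error type, and the `complete_line_chunks` helper are assumptions): accumulate chunks with `extend_from_slice` and hand off complete lines in bulk with `split_to`, which avoids copying the buffer line by line.

```rust
use bytes::{Bytes, BytesMut};
use futures::{Stream, StreamExt};

/// Collect a streaming GET payload into chunks that each end on a newline.
async fn complete_line_chunks<S>(mut byte_stream: S) -> Result<Vec<Bytes>, std::io::Error>
where
    S: Stream<Item = Result<Bytes, std::io::Error>> + Unpin,
{
    let mut buf = BytesMut::new();
    let mut out = Vec::new();

    while let Some(chunk) = byte_stream.next().await {
        buf.extend_from_slice(&chunk?);
        // Split off everything up to and including the last newline seen so far.
        // `split_to` moves the bytes out of `buf` without copying each line.
        if let Some(pos) = buf.iter().rposition(|&b| b == b'\n') {
            out.push(buf.split_to(pos + 1).freeze());
        }
    }
    // Anything left has no trailing newline (last line of the object, or the
    // "no newline found" error / retry case described above).
    if !buf.is_empty() {
        out.push(buf.freeze());
    }
    Ok(out)
}
```

Whether this covers the overlapping-range edge cases from the POC would still need the kind of targeted tests suggested in the comments above.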
Describe alternatives you've considered
Leave as is.
Additional context
None.