
Improve Parallel Reading (CSV, JSON) / Help Wanted #8723

Open
marvinlanhenke opened this issue Jan 2, 2024 · 6 comments
Labels
enhancement New feature or request

Comments

@marvinlanhenke
Contributor

marvinlanhenke commented Jan 2, 2024

Is your feature request related to a problem or challenge?

As originally stated in #6922 (I'm not sure why the issue was closed) and discussed in #6801, the current FileOpener implementation for both Csv and Json issues multiple GetRequests to adjust the byte range before parsing / reading the file itself.

This is suboptimal and can be improved by minimizing the latency incurred by multiple remote network requests.

Describe the solution you'd like

I would like to reduce the number of GetRequests from 3 to 1.

This can be done by "overfetching" the original partition byte range and then adjusting the range by finding the newline delimiter, similar to the solution already implemented.

The approach is outlined here: #6801 (comment) by @alamb

There are some edge cases that need consideration, such as "heterogeneous object sizes" within a CSV row or JSON object, which lead to partition ranges overlapping on the same line and can cause the same line to be read twice. Error handling / retry when no newline can be found (the "overfetching" range was too small) has to be handled as well.
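The boundary adjustment at the heart of this approach can be sketched as a small, testable helper. This is a hypothetical standalone function, not DataFusion code: given a buffer that was overfetched beyond the nominal partition length, it finds the first newline at or after that length so the range can be extended to a line boundary, and signals the "overfetch too small" retry case by returning `None`.

```rust
/// Hypothetical helper (illustrative names, not the DataFusion API):
/// `buf` is the overfetched bytes, `range_len` the length of the
/// originally requested partition range. Returns the exclusive end
/// offset of the line containing byte `range_len`, or `None` if no
/// newline was found (the overfetch was too small and must be retried).
fn newline_boundary(buf: &[u8], range_len: usize) -> Option<usize> {
    buf.get(range_len..)? // None if range_len exceeds the buffer
        .iter()
        .position(|&b| b == b'\n')
        .map(|pos| range_len + pos + 1) // end offset just past the '\n'
}

fn main() {
    let data = b"a,1\nb,2\nc,3\n";
    // Partition nominally ends at byte 5, mid-line; extend to byte 8,
    // just past the next newline.
    assert_eq!(newline_boundary(data, 5), Some(8));
    // No newline in the overfetched tail: caller must refetch more.
    assert_eq!(newline_boundary(b"abc", 1), None);
}
```

The same helper can serve both ends of a partition: the start of every partition except the first is advanced to the first newline boundary, which also resolves the overlapping-range case, since each line is then owned by exactly one partition.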

POC:

I already went ahead and implemented a POC which works and can handle some edge cases like overlapping partition ranges; appropriate error handling / retry is still missing.

However, I definitely need help to improve upon this: https://github.com/marvinlanhenke/arrow-datafusion/blob/poc_optimize_get_req/datafusion/core/src/datasource/physical_plan/json.rs#L232-L381

The solution is inefficient due to line-by-line operations and buffer cloning / copying.
I tried different ways to handle the GetResultPayload::Stream by using BytesMut::new() and buffer.extend_from_slice, but I was not able to handle all the edge cases correctly.
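For context, this is the per-chunk copy that makes the stream path costly. GetResultPayload::Stream yields byte chunks asynchronously; the sketch below is a simplified, synchronous stand-in (a plain iterator instead of a stream, Vec<u8> instead of BytesMut) so it stays self-contained, but the copy pattern is the same.

```rust
/// Illustrative only: accumulate streamed chunks into one contiguous
/// buffer. Each extend_from_slice copies the chunk into the growing
/// buffer, and that copy per chunk is the overhead worth avoiding
/// when the newline search only needs the edges of the range.
fn accumulate<'a, I>(chunks: I) -> Vec<u8>
where
    I: IntoIterator<Item = &'a [u8]>,
{
    let mut buf = Vec::new();
    for chunk in chunks {
        buf.extend_from_slice(chunk); // one copy per chunk
    }
    buf
}

fn main() {
    // A record split across two chunks, as a streamed GET may deliver it.
    let chunks: Vec<&[u8]> = vec![b"a,1\nb,", b"2\nc,3\n"];
    let buf = accumulate(chunks);
    assert_eq!(buf, b"a,1\nb,2\nc,3\n".to_vec());
}
```

One direction to explore is scanning each chunk for the delimiter as it arrives and only retaining bytes past the last confirmed line boundary, rather than materializing the whole range first.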

I'd greatly appreciate if someone can give some pointers; or take it from here to improve upon the POC.

Describe alternatives you've considered

Leave as is.

Additional context

None.

@marvinlanhenke marvinlanhenke added the enhancement New feature or request label Jan 2, 2024
@marvinlanhenke
Contributor Author

cc @alamb : as discussed in #6801

...happy if you have any pointers on how to improve the POC.

@marvinlanhenke marvinlanhenke changed the title Improve Parallel Reading (CSV, JSON) Improve Parallel Reading (CSV, JSON) / Help Wanted Jan 2, 2024
@alamb
Contributor

alamb commented Jan 3, 2024

Thanks for the heads up @marvinlanhenke -- I'll check this out over the next day or two. Much appreciated

cc @devinjdangelo and @tustvold if you are interested

@alamb
Contributor

alamb commented Jan 5, 2024

I didn't have a chance to review https://github.com/marvinlanhenke/arrow-datafusion/blob/poc_optimize_get_req/datafusion/core/src/datasource/physical_plan/json.rs#L232-L381 in fine detail, but in general I think that code is looking very hard to navigate / test as it is so deeply nested in futures / closures.

My suggestion is to try and extract the logic somehow into a structure that is easier to test and reason about.

For example, maybe you could create a struct like

struct StreamingJsonReaderBuilder {
  ...
}

impl StreamingJsonReaderBuilder {
  fn new(..) {}
  fn with_range(mut self, range: ..) ...
  fn build(self) -> SendableRecordBatchStream
}

And then you could start writing tests like

let object_store = ...;
let input = StreamingJsonReaderBuilder::new(object_store.read())
  .with_range(Range::new(100, 200))
  .build();

let batches = collect(input).await?;
assert_batches_eq!(..);

That might not be the right structure, but I am trying to give you the flavor of what encapsulating the complexity might look like
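To make the shape of that suggestion concrete, here is a heavily simplified, hypothetical builder in the same pattern: configuration collected through chained with_* calls, then consumed by build(). The name RangeReaderBuilder and the use of Vec<u8> as a stand-in for a SendableRecordBatchStream are illustrative only, not DataFusion APIs.

```rust
use std::ops::Range;

/// Hypothetical builder sketch: isolates the range-handling logic in a
/// plain struct that can be unit-tested without futures or closures.
struct RangeReaderBuilder {
    data: Vec<u8>,
    range: Option<Range<usize>>,
}

impl RangeReaderBuilder {
    fn new(data: Vec<u8>) -> Self {
        Self { data, range: None }
    }

    fn with_range(mut self, range: Range<usize>) -> Self {
        self.range = Some(range);
        self
    }

    /// Consumes the builder; slicing the configured range stands in
    /// for producing a SendableRecordBatchStream.
    fn build(self) -> Vec<u8> {
        match self.range {
            Some(r) => self.data[r].to_vec(),
            None => self.data,
        }
    }
}

fn main() {
    let out = RangeReaderBuilder::new(b"hello world".to_vec())
        .with_range(0..5)
        .build();
    assert_eq!(out, b"hello".to_vec());
}
```

The point of the pattern is that each step (range adjustment, newline search, retry) becomes a method on a plain struct that tests can drive directly, instead of logic buried inside a stream combinator chain.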

@marvinlanhenke
Contributor Author

That might not be the right structure, but I am trying to give you the flavor of what encapsulating the complexity might look like

@alamb
...thanks, this might already help, guiding me on the right track. I was trying not "to put too much effort into the POC"; however, this might have been the wrong decision, since I kinda hit a roadblock due to not being able to effectively reason about the code.

I will look into your suggestion and try to do some refactoring in order to understand what is, and what should be, happening at this point.

thanks again.

@alamb alamb reopened this Jan 8, 2024
@alamb
Contributor

alamb commented Jan 8, 2024

I think this was accidentally closed so I am reopening it. I am happy to close it again if I missed something

@alamb
Contributor

alamb commented Jan 29, 2025

If anyone wants a fun exercise, getting the CSV reader to read in parallel from local files would greatly speed up the h2o benchmarks.
