fix(source): modify the logic of gzip compressed file source #20236
base: main
Conversation
Tests will be added shortly.
// directly from the last successfully read `offset`.
//
// Special handling is required for files compressed with gzip:
// - Gzip files must always be read from the beginning; they cannot be read from |
So we cannot achieve exactly-once if gzip is enabled?
I think so.
Good point. I think exactly-once is important for a source. Some ideas to make reading gzip files exactly-once:
1. Yield one large chunk per gzip file containing all contents after decompression. This ensures that no barrier can be placed in the middle of the file, but it will increase memory usage.
2. Pause the fetcher upstream while reading a gzip file. This also ensures that no barrier can be placed in the middle.
3. Use the offset and size after decompression to track gzip file fetch progress, like what we already do right now. The OpenDAL reader always starts from the beginning and skips contents below the recorded offset.
Personally, I prefer 3.
I also prefer 3 because it has the least impact.
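For illustration, a minimal sketch of option 3: always decompress from the beginning and skip everything before the recorded decompressed offset. This is a synchronous sketch assuming the `flate2` crate, not the actual async OpenDAL-based reader; `start_offset` stands for the offset recovered from the state table.

```rust
use std::io::{self, Read};

use flate2::read::GzDecoder;

/// Decompress a gzip stream from the beginning and discard everything before
/// `start_offset` (counted in *decompressed* bytes), so that bytes already
/// delivered before a recovery are not emitted a second time.
fn gzip_reader_from_offset<R: Read>(raw: R, start_offset: u64) -> io::Result<impl Read> {
    let mut decoder = GzDecoder::new(raw);
    // Read and throw away the already-consumed prefix.
    io::copy(&mut decoder.by_ref().take(start_offset), &mut io::sink())?;
    Ok(decoder)
}
```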
Resolved; had some offline discussion with @hzxa21.
Please see the PR description for the specific details. I modified some of the lister and fetcher logic. cc @tabVersion: this part was designed by you, so flagging it for awareness.
.expect("The fs_split should be in the state table.");
let row = state_store_handler.get(split_id.clone()).await?
.unwrap_or_else(|| {
panic!("The fs_split (file_name) {:?} should be in the state table.", |
We happened to hit this error; it would be better not to panic here.
I think it should panic; otherwise data would be lost.
This pull request has been modified. If you want me to regenerate unit tests for any of the related files, please find the file in the "Files Changed" tab and add a comment.
.await?;
// After a recovery occurs, gzip-compressed files must be read from the beginning each time,
// while other files can continue reading from the last read `start_offset`.
let reader = match object_name.ends_with(".gz") || object_name.ends_with(".gzip") { |
In addition to the suffix check, should we also check `compression_format` (see L152)?
I think there is no need, but we can add a check: if `compression_format` is set, the suffix must be `.gz` or `.gzip`.
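For illustration, a small sketch of that check; `CompressionFormat` and the error type are placeholders rather than the actual connector definitions.

```rust
/// Placeholder for the source's compression setting.
enum CompressionFormat {
    None,
    Gzip,
}

/// If `compression_format` is gzip, require a `.gz`/`.gzip` suffix so the
/// suffix-based branch in the reader stays consistent with the configuration.
fn validate_gzip_suffix(object_name: &str, format: &CompressionFormat) -> Result<(), String> {
    let looks_gzip = object_name.ends_with(".gz") || object_name.ends_with(".gzip");
    match format {
        CompressionFormat::Gzip if !looks_gzip => Err(format!(
            "compression_format is gzip but object {object_name} has no .gz/.gzip suffix"
        )),
        _ => Ok(()),
    }
}
```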
meta: SourceMeta::Empty,
});
if (object_name.ends_with(".gz") || object_name.ends_with(".gzip"))
&& offset + n_read <= start_offset |
The logic here is incorrect because `offset` is assigned to `start_offset` in L168. A clearer approach is to return the read offset together with the reader when constructing it: for the gzip reader the initial read offset will be 0, and for the others it will be `start_offset`.
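For illustration, a sketch of that shape; `open_gzip` and `open_plain` are hypothetical stand-ins for the real OpenDAL-based constructors.

```rust
use std::io::{self, Read};

// Hypothetical stand-ins for the real reader constructors.
fn open_gzip(_object_name: &str) -> io::Result<Box<dyn Read>> {
    Ok(Box::new(io::empty()))
}
fn open_plain(_object_name: &str, _start_offset: usize) -> io::Result<Box<dyn Read>> {
    Ok(Box::new(io::empty()))
}

/// Return the reader together with the offset it actually starts from:
/// a gzip reader always starts at 0, other readers resume at `start_offset`.
fn build_reader(object_name: &str, start_offset: usize) -> io::Result<(Box<dyn Read>, usize)> {
    if object_name.ends_with(".gz") || object_name.ends_with(".gzip") {
        Ok((open_gzip(object_name)?, 0))
    } else {
        Ok((open_plain(object_name, start_offset)?, start_offset))
    }
}
```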
// The reader itself can sense whether it has reached the end.
// Therefore, after a file is read completely, we yield one more chunk marked as EOF, with its offset set to `usize::MAX` to indicate that it is finished.
// In fetch executor, if `offset = usize::MAX` is encountered, the corresponding file can be deleted from the state table.
let eof_chunk = self.generate_eof_chunk()?; |
It is not a good idea to generate the EOF chunk in `ParquetParser`, because it is used not only by the file source but also by the file scan TVF. How about generating the EOF chunk here? I don't think the EOF chunk cares about the file format.
I didn't make the change here because the hidden columns are inconsistent for Parquet and other encodes, so it's not appropriate to yield a common EOF chunk here:
- For the Parquet encode, the order of the hidden columns is (`_row_id`, `_rw_s3_v2_file`, `_rw_s3_v2_offset`);
- for other encodes, the order of the hidden columns is (`_rw_s3_v2_file`, `_row_id`, `_rw_s3_v2_offset`).
Actually I haven't found out why the order of hidden columns is inconsistent for these two encodings.
But since `ParquetParser` is also used by the file scan TVF, how about judging the encode here:
- for the Parquet encode, construct the EOF chunk here;
- for other encodes, still construct the EOF chunk inside the reader.
The above is one solution. A better way would be to unify the order of the hidden columns, but I am not sure how much work that would take, and this fix is quite urgent. What do you think? And if you know why the hidden columns are not in the same order, please let me know.
On second thought, changing the order of the hidden columns is a breaking change, which is dangerous. So maybe "judging the encode here" is the only approach.
I changed it as mentioned above, correct me if I'm wrong.
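For illustration, a stripped-down sketch of the EOF marker this thread settles on; `FileChunk` is only a placeholder for the real `StreamChunk` carrying the hidden columns, and for the Parquet encode the equivalent chunk is built inside `ParquetParser` as discussed above.

```rust
/// Placeholder for the chunk yielded by the reader; the real code yields a
/// `StreamChunk` with the hidden `_rw_s3_v2_file` / `_rw_s3_v2_offset` columns.
struct FileChunk {
    file_name: String,
    offset: usize,
}

/// Yielded exactly once after a file has been read completely: an offset of
/// `usize::MAX` tells the fetch executor that the file is finished.
fn generate_eof_chunk(file_name: &str) -> FileChunk {
    FileChunk {
        file_name: file_name.to_owned(),
        offset: usize::MAX,
    }
}
```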
I hereby agree to the terms of the RisingWave Labs, Inc. Contributor License Agreement.
What's changed and what's your intention?
Previously, the `offset` variable served two main purposes:
1. recording the read progress, so that after a recovery the reader could resume from the last successfully read `offset`;
2. being compared against the file size obtained from the lister, so that the fetcher could tell whether a file had been read completely and perform the deletion.
Then, we realized that the current coupling between the lister and the fetcher/reader is a bit heavy: the fetcher relies on the size obtained from the lister to determine whether the file has been completely read and to perform deletions. This requires that the representations of offset and size be consistent. We have already encountered two bugs around this:
In fact, the reader itself can fully sense whether it has finished reading, so the lister and reader can be completely independent. The lister is only responsible for informing the reader which files to read, while the reader is responsible for reading and informing the fetch executor whether it has finished reading.
Based on this, this PR makes the following changes:
1. After a file has been read completely, the reader yields one more EOF chunk with its offset set to `usize::MAX`. When the fetch executor encounters this offset, it deletes the corresponding file from the state table.
2. For gzip-compressed files:
   a. The reader starts reading from the beginning each time.
   b. When yielding, it only yields chunks whose offsets are beyond the offset recorded in the state table, to ensure exactly-once.
This change is effective for all formats (Parquet, CSV, JSON, gzip). After this, we can stop recording the file size in the state table, and we can modify this in a future PR.
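For illustration, a schematic of how the fetch-executor side of this protocol could react to the offsets it sees; the map stands in for the actual state table keyed by split id.

```rust
use std::collections::HashMap;

/// Placeholder for the fetch executor's state: file name -> last read offset.
type FetchState = HashMap<String, usize>;

/// React to the offset carried by an incoming chunk under the new protocol.
fn on_chunk_offset(state: &mut FetchState, file_name: &str, offset: usize) {
    if offset == usize::MAX {
        // EOF marker yielded by the reader: the file has been read completely,
        // so its entry can be deleted from the state table.
        state.remove(file_name);
    } else {
        // Normal chunk: record the new read progress for use after recovery.
        state.insert(file_name.to_owned(), offset);
    }
}
```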
Checklist
Documentation
Release note