Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(source): modify the logic of gzip compressed file source #20236

Open
wants to merge 4 commits into
base: main
Choose a base branch
from

Conversation

wcy-fdu
Copy link
Contributor

@wcy-fdu wcy-fdu commented Jan 21, 2025

I hereby agree to the terms of the RisingWave Labs, Inc. Contributor License Agreement.

What's changed and what's your intention?

Previously the offset variable serves two main purposes:

  1. It is used to check whether a file has been completely read. In the fetch executor, when offset >= split.size, it indicates that the end of the file has been reached, prompting the removal of the corresponding entry from the state table.
  2. After recovery occurs, offset prevents the reading process from starting over from the beginning of the file. Instead, it allows the reading to continue directly from the last successfully read offset.

Then, we realized that the current coupling between the lister and the fetcher/reader is a bit heavy: the fetcher relies on the size obtained from the lister to determine whether the file has been completely read and to perform deletions. This requires that the representations of offset and size must be consistent. We have encountered two bugs at this point:

  • The offset for Parquet should be based on the number of rows rather than the size.
  • The offset and size for gzip-compressed files are not equal.

In fact, the reader itself can fully sense whether it has finished reading, so the lister and reader can be completely independent. The lister is only responsible for informing the reader which files to read, while the reader is responsible for reading and informing the fetch executor whether it has finished reading.

Based on this, this PR makes the following changes:

  1. After the reader has finished reading, it yields one additional chunk, solely to indicate EOF, by setting the offset of this chunk to usize::MAX
  2. The logic for the fetch executor to determine whether a file has been completely read and to delete it is changed to delete upon encountering the EOF chunk, and this special chunk is not sent downstream.
  3. For gzip files, to ensure exactly once:
    a. Its reader starts reading from the beginning each time.
    b. When yielding, it only yields chunks that are larger than the offset in the previous state table to ensure exactly once.

This change is effective for all formats (Parquet, CSV, JSON, gzip). After this, we can stop recording the file size in the state table, and we can modify this in a future PR.

Checklist

  • I have written necessary rustdoc comments.
  • I have added necessary unit tests and integration tests.
  • I have added test labels as necessary.
  • I have added fuzzing tests or opened an issue to track them.
  • My PR contains breaking changes.
  • My PR changes performance-critical code, so I will run (micro) benchmarks and present the results.
  • My PR contains critical fixes that are necessary to be merged into the latest release.

Documentation

  • My PR needs documentation updates.
Release note

@github-actions github-actions bot added the type/fix Bug fix label Jan 21, 2025
@wcy-fdu
Copy link
Contributor Author

wcy-fdu commented Jan 21, 2025

Tests will be added shortly.

// directly from the last successfully read `offset`.
//
// Special handling is required for files compressed with gzip:
// - Gzip files must always be read from the beginning; they cannot be read from
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So we cannot achieve exactly once if gzip enabled?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think so.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point. I think exactly once is important for source. Some ideas to make reading gzip files exactly once:

  1. Yield one large chunk per gzip file containing all contents after decompression. This ensures that no barrier can be placed in the middle of the file but will increase memory usage.
  2. Pause fetcher upstream while reading a gzip file. This also ensures that no barrier can be placed in the middle.
  3. Use offset and size after decompression to track gzip file fetch progress, like what we already did right now. Opendal reader always starts from the beginning and skip contents below the recorded offset.

Personally perfer 3.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I also prefer 3 because it has the least impact.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Resolved, had some offline discussion with @hzxa21 .
Please see the description of the PR for specific details. I modified some of the lister and fetcher logic. cc @tabVersion This part was designed by you, for awareness.

.expect("The fs_split should be in the state table.");
let row = state_store_handler.get(split_id.clone()).await?
.unwrap_or_else(|| {
panic!("The fs_split (file_name) {:?} should be in the state table.",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we happen to meet this error, better not to panic here.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think should panic, otherwise it lost data.

kwannoel

This comment was marked as duplicate.

Copy link

gru-agent bot commented Jan 22, 2025

This pull request has been modified. If you want me to regenerate unit test for any of the files related, please find the file in "Files Changed" tab and add a comment @gru-agent. (The github "Comment on this file" feature is in the upper right corner of each file in "Files Changed" tab.)

@wcy-fdu wcy-fdu changed the title fix(source): modify the offset assignment logic of gzip compressed files fix(source): modify the logic of gzip compressed file source Jan 22, 2025
@wcy-fdu wcy-fdu requested a review from stdrc January 22, 2025 10:58
.await?;
// After a recovery occurs, for gzip-compressed files, it is necessary to read from the beginning each time,
// other files can continue reading from the last read `start_offset`.
let reader = match object_name.ends_with(".gz") || object_name.ends_with(".gzip") {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In addition to suffix check, should we also check compression_format (See L152)?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think no need, but we can add a check: if compression_format is set, suffix must be .gz or .gzip.

meta: SourceMeta::Empty,
});
if (object_name.ends_with(".gz") || object_name.ends_with(".gzip"))
&& offset + n_read <= start_offset
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The logic here is incorrect because offset is assigned to start_offset in L168. A clearer approach is to return the reader as well as the read offset when constructing the reader. For gzip reader, the initial read offset will be 0 and for others the, the initial read offset will be start_offset

// The reader itself can sense whether it has reached the end.
// Therefore, after a file is read completely, we yield one more chunk marked as EOF, with its offset set to `usize::MAX` to indicate that it is finished.
// In fetch executor, if `offset = usize::MAX` is encountered, the corresponding file can be deleted from the state table.
let eof_chunk = self.generate_eof_chunk()?;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is not a good idea to generate eof chunk in ParquetParser because it is not just used by file source, but also file scan TVF. How about generate the eof chunk here? I don't think the eof chunk cares about the file format.

Copy link
Contributor Author

@wcy-fdu wcy-fdu Jan 22, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I didn't make the change here because the hidden columns are inconsistent for parquet and other encodes, so it's not appropriate to yield a common "eof" chunk here:

  • For parquet encode, the order of hidden column is (_row_id _rw_s3_v2_file, _rw_s3_v2_offset);
  • for other encode, the order of hidden column is ( _rw_s3_v2_file, _row_id,_rw_s3_v2_offset).

Actually I haven't found out why the order of hidden columns is inconsistent for these two encodings.

But since ParquetParser is also used for file scan TVF, how about judging the encode here and

  • for parquet encode, construct the eof chunk here
  • for other encode, still construct the eof chunk inside reader.

The above is one solution. A better way is to unify the order of hidden columns, but I am not sure how much work this will take, as this fix is ​​quite urgent. What do you think? And if you know why the hidden columns are not in the same order, please let me know.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Think twice, change the order of hidden column is break change, which is dangerous. So maybe "judging the encode here" is the only approach.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I changed it as mentioned above, correct me if I'm wrong.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants