Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Initiate S3 Multi-part upload on receiving first event #318

Open
wants to merge 4 commits into
base: main
Choose a base branch
from

Conversation

aindriu-aiven
Copy link
Contributor

@aindriu-aiven aindriu-aiven commented Oct 24, 2024

This update initiates the multipart upload as soon as a record begins, and closes the file on flush.

This PR

  • Initiates a multi part upload on retrieval of the first event, thus allowing the sink to write quicker to S3.
  • Once a record has been added to the S3OutputStream for writing it is removed from the S3RecordGroup to release memory

@aindriu-aiven aindriu-aiven requested review from a team as code owners October 24, 2024 12:37
@aindriu-aiven aindriu-aiven force-pushed the aindriu-aiven/initiate-multi-part-upload branch from b6cebcc to 4776d6d Compare October 24, 2024 13:16

assertThat(expectedBlobs).allMatch(blobName -> testBucketAccessor.doesObjectExist(blobName));

assertThat(testBucketAccessor.readLines("prefix-topic0-0-00000000000000000012", compression))
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As an FYI, the S3MockApi does not create the file names correctly for key, value

Copy link
Contributor

@gharris1727 gharris1727 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wow, the S3OutputStream has had multipart upload for a long time: Aiven-Open/s3-connector-for-apache-kafka#73

But we were still buffering data as records, rather than offloading them early? Crazy. Thanks for the improvement.

Comment on lines +99 to +101
* This determines if the file is key based, and possible to change a single file multiple times per flush or if
* it's a roll over file which at each flush is reset.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you explain more about this? What is key based grouping, and why does it mutate the file?

Copy link
Contributor Author

@aindriu-aiven aindriu-aiven Oct 29, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hey @gharris1727 first of all thanks for taking a look!

In terms of the roll over and key grouping.

We have documentation on the key grouping here. (but I will explain below what I am doing.)
Docs

The S3 Sink (along with all the Sinks provided by Aiven) use a "Record Grouper" this record grouper uses the file.name.template to determine if records should be grouped in a 'changelog' or if they should be grouped by 'Key'.

e.g. {{topic}}-{{partition}}-{{start_offset}} is the default and would cause the Record Grouper to group by changelog.
Changelog means records are appended to the same file and on flush, this causes the record files to be rolled over, to use a new end start_offset

So the original file might be 'logs-0-28.ext' and after flush it will be 'logs-0-45.ext' and each event between offset 28 and 44 will be written to the file for partition 0.

As we don't enforce a max number of events per file, or a max file size the flush (this would be new and is something I am looking at in a separate memory improvements PR) works as a delimiter of sorts to roll the files over.

in compact mode the key looks something like
'{{key}}' or '{{key}}-{{topic}}-{{partition}}' and when matching keys appear it will create a new record or if there is one already existing update the existing record.
To handle this, currently the record grouper removes any existing record and adds the latest record to the file. This then gets written on flush.

The multi part upload does not handle a change to the file so the options are to upload every time and immediately close it to complete the upload or wait until the flush and update the record once.

The latter option is I think, better in terms of API costs if it is possible this could update multiple times over a 30-60s period.

edit: The downside to this implementation for the compact/key based records, is that the upcoming PR to reduce memory useage will only have an impact for those users using 'changelog' as we can delete the records of those uploaded in a part by S3 multi upload.

Any questions please let me know.

@aindriu-aiven aindriu-aiven force-pushed the aindriu-aiven/initiate-multi-part-upload branch from 0600cf7 to f719afd Compare October 30, 2024 13:06
@@ -281,6 +281,7 @@ private KafkaProducer<String, GenericRecord> newProducer() {
producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
"io.confluent.kafka.serializers.KafkaAvroSerializer");
producerProps.put("schema.registry.url", SCHEMA_REGISTRY.getSchemaRegistryUrl());
producerProps.put("linger.ms", 1000);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

linger.ms was added to send all the test events in one batch, so that the flush method is not called in between small batches of kafka events being sent causing the integration tests to fail.

@aindriu-aiven aindriu-aiven force-pushed the aindriu-aiven/initiate-multi-part-upload branch from 1673c0b to 4a4a571 Compare October 30, 2024 14:45
private int numberOfRecords;
final private List<SinkRecord> sinkRecords;
final private String filename;
final private long recordCreationDate = System.currentTimeMillis();
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My thought is that the recordCreationDate could be used to roll over files without the use of flush() and have users specify a max age, this potentially could also work for something like max file size and the individual parts could be tracked here.

Copy link
Contributor

@Claudenw Claudenw left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please either fluch the writers during stop() or add a comment explaining why it is not necessary.

@aindriu-aiven aindriu-aiven force-pushed the aindriu-aiven/initiate-multi-part-upload branch from aec3081 to a8a1d5f Compare November 13, 2024 07:39
@@ -135,9 +139,20 @@ public void clear() {
fileBuffers.clear();
}

@Override
public void clearProcessedRecords(final String identifier, final List<SinkRecord> records) {
final GroupedSinkRecord grouperRecord = fileBuffers.getOrDefault(identifier, null);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are you trying to protect from keys with null values with getOrDefault or is there any other reason for this?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I can't remember if I had any other specific reason for this other then readability, to be honest.

@aindriu-aiven aindriu-aiven force-pushed the aindriu-aiven/initiate-multi-part-upload branch from a8a1d5f to 77893fc Compare November 15, 2024 09:51

import org.apache.kafka.connect.sink.SinkRecord;

public class GroupedSinkRecord {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Naming seems to be ambiguous, is it a record or group? Maybe something like SinkRecordsBatch or something similar?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @AnatolyPopov done!

@aindriu-aiven aindriu-aiven force-pushed the aindriu-aiven/initiate-multi-part-upload branch from 77893fc to b4e06fd Compare November 15, 2024 11:41
@aindriu-aiven aindriu-aiven force-pushed the aindriu-aiven/initiate-multi-part-upload branch from b4e06fd to 1623b36 Compare November 25, 2024 13:25
…, and closes the file on flush.

Signed-off-by: Aindriu Lavelle <[email protected]>
…allowing changelog records to initiate multipart upload.

Signed-off-by: Aindriu Lavelle <[email protected]>
@AnatolyPopov AnatolyPopov force-pushed the aindriu-aiven/initiate-multi-part-upload branch from 1623b36 to 2fc7311 Compare December 3, 2024 11:44
@Override
public void flush(final Map<TopicPartition, OffsetAndMetadata> offsets) {
try {
recordGrouper.records().forEach(this::flushFile);
recordGrouper.records().forEach(this::flushToS3);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Talking to @AnatolyPopov we want to do two things

  1. Add documentation in README for S3 bucket lifecycle to remove incomplete multipart upload.
  2. Test if upload is aborted due to malformed or IOException that all the records are retried when connector restarts/retries.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants