Initiate S3 Multi-part upload on receiving first event #318
b6cebcc to 4776d6d
assertThat(expectedBlobs).allMatch(blobName -> testBucketAccessor.doesObjectExist(blobName));

assertThat(testBucketAccessor.readLines("prefix-topic0-0-00000000000000000012", compression))
As an FYI, the S3MockApi does not create the file names correctly for key, value
Wow, the S3OutputStream has had multipart upload for a long time: Aiven-Open/s3-connector-for-apache-kafka#73
But we were still buffering data as records, rather than offloading them early? Crazy. Thanks for the improvement.
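To illustrate the difference between buffering everything until flush and offloading early, here is a minimal sketch of part-based buffering. `PartBufferingStream` is a hypothetical stand-in written for this note, not the connector's `S3OutputStream`; the in-memory list only simulates parts that a real client would send to S3.

```java
import java.io.ByteArrayOutputStream;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch: hands data off in fixed-size parts instead of holding it all until close. */
final class PartBufferingStream {
    private final int partSize;
    private final ByteArrayOutputStream buffer = new ByteArrayOutputStream();
    // Stands in for parts already uploaded; a real implementation would call the S3 upload-part API.
    private final List<byte[]> uploadedParts = new ArrayList<>();
    private boolean closed;

    PartBufferingStream(final int partSize) {
        if (partSize <= 0) {
            throw new IllegalArgumentException("partSize must be positive");
        }
        this.partSize = partSize;
    }

    /** Buffers bytes and emits a part each time the buffer reaches partSize. */
    void write(final byte[] data) {
        if (closed) {
            throw new IllegalStateException("stream is closed");
        }
        for (final byte b : data) {
            buffer.write(b);
            if (buffer.size() >= partSize) {
                uploadPart();
            }
        }
    }

    /** Emits whatever is left as the final part; this is where the upload would be completed. */
    void close() {
        if (closed) {
            return;
        }
        if (buffer.size() > 0) {
            uploadPart();
        }
        closed = true;
    }

    private void uploadPart() {
        uploadedParts.add(buffer.toByteArray());
        buffer.reset();
    }

    List<byte[]> uploadedParts() {
        return uploadedParts;
    }

    int bufferedBytes() {
        return buffer.size();
    }
}
```

With a part size of 4, writing 10 bytes hands off two full parts immediately and keeps only 2 bytes in memory until `close()`.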
* This determines if the file is key based, and possible to change a single file multiple times per flush or if
* it's a roll over file which at each flush is reset.
Can you explain more about this? What is key based grouping, and why does it mutate the file?
Hey @gharris1727 first of all thanks for taking a look!
In terms of the roll over and key grouping.
We have documentation on the key grouping in the docs, but I will explain below what I am doing.
The S3 Sink (along with all the sinks provided by Aiven) uses a "Record Grouper". This record grouper uses the file.name.template to determine whether records should be grouped in a 'changelog' or grouped by 'Key'.
e.g. {{topic}}-{{partition}}-{{start_offset}} is the default and would cause the Record Grouper to group by changelog.
Changelog means records are appended to the same file. On flush, the file is rolled over and the next file is named with a new start_offset.
So the original file might be 'logs-0-28.ext' and the file after the flush will be 'logs-0-45.ext', with each event between offset 28 and 44 written to the first file for partition 0.
As we don't enforce a max number of events per file or a max file size (this would be new and is something I am looking at in a separate memory improvements PR), the flush works as a delimiter of sorts to roll the files over.
In compact mode the file name template looks something like '{{key}}' or '{{key}}-{{topic}}-{{partition}}'. When a record arrives, the grouper creates a new record for its key or, if one already exists for that key, updates the existing record.
To handle this, currently the record grouper removes any existing record and adds the latest record to the file. This then gets written on flush.
The multipart upload does not handle a change to the file, so the options are to upload every time and immediately close it to complete the upload, or to wait until the flush and update the record once.
The latter option is, I think, better in terms of API costs, since a record could be updated multiple times over a 30-60s period.
edit: The downside of this implementation for compact/key-based records is that the upcoming PR to reduce memory usage will only have an impact for users of 'changelog', as there we can delete records once they have been uploaded in a part by the S3 multipart upload.
Any questions please let me know.
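The distinction described in this comment can be sketched roughly as follows. This is an illustration only: `GroupingSketch` and its methods are hypothetical names, and the rule "a template containing {{key}} means key-based grouping" is a simplifying assumption rather than the connector's actual template matching.

```java
final class GroupingSketch {
    enum Mode { CHANGELOG, KEY }

    /** Assumption: any template that mentions {{key}} is treated as key-based; everything else as changelog. */
    static Mode modeFor(final String fileNameTemplate) {
        return fileNameTemplate.contains("{{key}}") ? Mode.KEY : Mode.CHANGELOG;
    }

    /** Builds a changelog file name in the {{topic}}-{{partition}}-{{start_offset}} shape. */
    static String changelogFileName(final String topic, final int partition, final long startOffset) {
        return topic + "-" + partition + "-" + startOffset;
    }

    /**
     * Changelog files are append-only, so parts can be uploaded as records arrive.
     * Key-based files may still be rewritten before the flush, so they wait for it.
     */
    static boolean canUploadBeforeFlush(final Mode mode) {
        return mode == Mode.CHANGELOG;
    }

    public static void main(final String[] args) {
        final Mode defaultMode = modeFor("{{topic}}-{{partition}}-{{start_offset}}");
        System.out.println(defaultMode + " early upload: " + canUploadBeforeFlush(defaultMode));
        // The file before the flush starts at offset 28; the file after it starts at offset 45.
        System.out.println(changelogFileName("logs", 0, 28));
        System.out.println(changelogFileName("logs", 0, 45));
        final Mode keyMode = modeFor("{{key}}-{{topic}}-{{partition}}");
        System.out.println(keyMode + " early upload: " + canUploadBeforeFlush(keyMode));
    }
}
```

The point of the split is that only the changelog case can start a multipart upload on the first event, while the key-based case keeps the existing write-on-flush behaviour.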
0600cf7 to f719afd
@@ -281,6 +281,7 @@ private KafkaProducer<String, GenericRecord> newProducer() {
producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
        "io.confluent.kafka.serializers.KafkaAvroSerializer");
producerProps.put("schema.registry.url", SCHEMA_REGISTRY.getSchemaRegistryUrl());
producerProps.put("linger.ms", 1000);
linger.ms was added so that all the test events are sent in one batch. Otherwise the flush method can be called in between small batches of Kafka events, causing the integration tests to fail.
1673c0b to 4a4a571
private int numberOfRecords;
final private List<SinkRecord> sinkRecords;
final private String filename;
final private long recordCreationDate = System.currentTimeMillis();
My thought is that recordCreationDate could be used to roll over files without relying on flush(), by letting users specify a max age. The same approach could potentially work for something like a max file size, with the individual parts tracked here.
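A rough sketch of that idea, assuming hypothetical `maxAgeMs` and `maxBytes` settings that do not exist in the connector today; `RolloverPolicy` is an illustrative name, not part of this PR.

```java
/** Hypothetical policy: decides whether a file should be rolled over without waiting for flush(). */
final class RolloverPolicy {
    private final long maxAgeMs;
    private final long maxBytes;

    RolloverPolicy(final long maxAgeMs, final long maxBytes) {
        this.maxAgeMs = maxAgeMs;
        this.maxBytes = maxBytes;
    }

    /**
     * @param createdAtMs  creation timestamp of the file, e.g. a recordCreationDate value
     * @param bytesWritten bytes written to the file so far
     * @param nowMs        current time, passed in so the check is easy to test
     */
    boolean shouldRollOver(final long createdAtMs, final long bytesWritten, final long nowMs) {
        final boolean tooOld = nowMs - createdAtMs >= maxAgeMs;
        final boolean tooBig = bytesWritten >= maxBytes;
        return tooOld || tooBig;
    }
}
```

Passing the current time in as a parameter, rather than calling System.currentTimeMillis() inside the check, keeps the policy deterministic under test.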
4a4a571 to aec3081
s3-sink-connector/src/main/java/io/aiven/kafka/connect/s3/S3SinkTask.java
Please either flush the writers during stop() or add a comment explaining why it is not necessary.
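The first of those two options could look roughly like the sketch below. `StopSketch` and `SketchWriter` are hypothetical names used only for illustration; whether closing should complete or abort an in-flight upload depends on offset-commit semantics that this sketch does not model.

```java
import java.util.LinkedHashMap;
import java.util.Map;

final class StopSketch {
    /** Hypothetical writer; in a real task close() would complete or abort the upload. */
    static final class SketchWriter {
        private boolean closed;

        void close() {
            closed = true;
        }

        boolean isClosed() {
            return closed;
        }
    }

    // One open writer per file name.
    private final Map<String, SketchWriter> writers = new LinkedHashMap<>();

    SketchWriter writerFor(final String fileName) {
        return writers.computeIfAbsent(fileName, name -> new SketchWriter());
    }

    /** Closes every open writer so that no upload is left dangling, then forgets them. */
    void stop() {
        writers.values().forEach(SketchWriter::close);
        writers.clear();
    }

    int openWriters() {
        return writers.size();
    }
}
```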
aec3081 to a8a1d5f
commons/src/main/java/io/aiven/kafka/connect/common/grouper/TopicPartitionRecordGrouper.java
@@ -135,9 +139,20 @@ public void clear() {
    fileBuffers.clear();
}

@Override
public void clearProcessedRecords(final String identifier, final List<SinkRecord> records) {
    final GroupedSinkRecord grouperRecord = fileBuffers.getOrDefault(identifier, null);
Are you trying to protect from keys with null values with getOrDefault, or is there any other reason for this?
I can't remember if I had any other specific reason for this other than readability, to be honest.
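For context on the question above: with a null default, Map.getOrDefault returns the same result as Map.get for present keys, keys mapped to null, and missing keys alike. A small check, using a hypothetical `GetOrDefaultSketch` class and a plain String map in place of the real fileBuffers:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

final class GetOrDefaultSketch {
    /** Returns true when getOrDefault(key, null) and get(key) agree for the given key. */
    static boolean sameAsGet(final Map<String, String> map, final String key) {
        return Objects.equals(map.getOrDefault(key, null), map.get(key));
    }

    public static void main(final String[] args) {
        final Map<String, String> fileBuffers = new HashMap<>();
        fileBuffers.put("present", "value");
        fileBuffers.put("nullValue", null);
        // Covers a present key, a key mapped to null, and a key that is absent.
        for (final String key : new String[] {"present", "nullValue", "missing"}) {
            System.out.println(key + " -> " + sameAsGet(fileBuffers, key));
        }
    }
}
```

So the call offers no extra null protection over get; a null check on the returned value is still needed either way.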
a8a1d5f to 77893fc
import org.apache.kafka.connect.sink.SinkRecord;

public class GroupedSinkRecord {
The naming seems ambiguous: is it a record or a group? Maybe something like SinkRecordsBatch or similar?
Thanks @AnatolyPopov done!
77893fc to b4e06fd
b4e06fd to 1623b36
…, and closes the file on flush. Signed-off-by: Aindriu Lavelle <[email protected]>
…allowing changelog records to initiate multipart upload. Signed-off-by: Aindriu Lavelle <[email protected]>
Signed-off-by: Aindriu Lavelle <[email protected]>
1623b36 to 2fc7311
@Override
public void flush(final Map<TopicPartition, OffsetAndMetadata> offsets) {
    try {
        recordGrouper.records().forEach(this::flushFile);
        recordGrouper.records().forEach(this::flushToS3);
After talking to @AnatolyPopov, we want to do two things:
- Add documentation to the README on using an S3 bucket lifecycle rule to remove incomplete multipart uploads.
- Test that, if an upload is aborted due to malformed data or an IOException, all the records are retried when the connector restarts/retries.
This update initiates the multipart upload as soon as a record begins, and closes the file on flush.
This PR