Place offset manager in commons #373
base: s3-source-release
Conversation
Force-pushed from b5278e0 to 69ea274.
Unit tests pass; there is an issue with the integration tests not picking up the changes in commons.
Resolved review threads (outdated):
- s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/S3SourceTask.java (3 threads)
- commons/src/main/java/io/aiven/kafka/connect/common/source/input/ByteArrayTransformer.java (2 threads)
- commons/src/main/java/io/aiven/kafka/connect/common/source/input/JsonTransformer.java (2 threads)
- commons/src/main/java/io/aiven/kafka/connect/common/source/input/ParquetTransformer.java
- commons/src/test/java/io/aiven/kafka/connect/common/source/input/JsonTransformerTest.java
- s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/utils/AWSV2SourceClient.java
- ...-connector/src/main/java/io/aiven/kafka/connect/s3/source/utils/S3ObjectSummaryIterator.java
if (objectListing.isTruncated()) {
    // get the next set of data and create an iterator on it.
    request.setStartAfter(null);
    request.withContinuationToken(objectListing.getContinuationToken());
I am pretty sure the continuation token is all that is required here; you can create a new request and only add the continuation token (possibly also requiring the bucket though).
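For illustration only, a minimal sketch of that suggestion against the AWS SDK v1 com.amazonaws.services.s3.model.ListObjectsV2Request API; bucketName and s3Client are assumed to be in scope, and getNextContinuationToken() is the token that points at the following page:

// Build a fresh request that carries only the bucket and the continuation token.
final ListObjectsV2Request nextRequest = new ListObjectsV2Request()
        .withBucketName(bucketName)
        .withContinuationToken(objectListing.getNextContinuationToken());
objectListing = s3Client.listObjectsV2(nextRequest);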
Resolved review threads (outdated):
- ...-connector/src/main/java/io/aiven/kafka/connect/s3/source/utils/S3ObjectSummaryIterator.java
- ...rce-connector/src/main/java/io/aiven/kafka/connect/s3/source/utils/SourceRecordIterator.java
I had a few comments; some are for future follow-ups, but we should create issues for them so we don't miss them.
    throw new AmazonClientException(e);
}
this.s3ObjectIterator = IteratorUtils.filteredIterator(sourceClient.getIteratorOfObjects(null),
        s3Object -> extractOffsetManagerEntry(s3Object));
Lambda can be replaced with a method reference.
Suggested change:
-        s3Object -> extractOffsetManagerEntry(s3Object));
+        this::extractOffsetManagerEntry);
 *            the Abstract Config to use.
 * @return a Stream of SchemaAndValue objects.
 */
public final Stream<SchemaAndValue> getRecords(final IOSupplier<InputStream> inputStreamIOSupplier,
This is looking great, a much simplified version.
Need to find out why no events are pushed to the Kafka offsets topic.
@@ -119,6 +118,7 @@ public List<SourceRecord> poll() throws InterruptedException {

        while (!connectorStopped.get()) {
            try {
                waitForObjects();
                extractSourceRecords(results);
                LOGGER.info("Number of records extracted and sent: {}", results.size());
                return results;
I have an extract of what is sent to the Kafka offsets topic before this PR and with this PR.
Before this PR:
SourceRecord{
    sourcePartition={bucket=test-bucket0, topic=bytesTest, topicPartition=0},
    sourceOffset={object_key_s3-source-connector-for-apache-kafka-test-2024-12-20T13:34:01.62052/bytesTest-00000-1734698057527.txt=1}
}
ConnectRecord{topic='bytesTest', kafkaPartition=0, key=[B@6e96f788, keySchema=null, value=[B@49e57a97, valueSchema=null, timestamp=null, headers=ConnectHeaders(headers=)}
With this PR:
SourceRecord{
    sourcePartition={partition=0, bucket=test-bucket0, objectKey=s3-source-connector-for-apache-kafka-test-2024-12-20T13:28:08.047694/bytesTest-00000-1734697707480.txt, topic=bytesTest},
    sourceOffset={bucket=test-bucket0, topic=bytesTest, partition=0, objectKey=s3-source-connector-for-apache-kafka-test-2024-12-20T13:28:08.047694/bytesTest-00000-1734697707480.txt, recordCount=0}
}
ConnectRecord{topic='bytesTest', kafkaPartition=0, key=[B@67e2252f, keySchema=null, value=[B@1d001ae2, valueSchema=null, timestamp=null, headers=ConnectHeaders(headers=)}
- There are some duplicate keys sent in sourcePartition and sourceOffset, which should be removed.
- Have tested locally, and no events are pushed to the connect-offset-topic- topic.
I am not sure where the problem is; I am going to debug further. It may be something to do with the new structure.
Partition has been changed to contain only bucket and S3Object.key()
Offset has been changed to only contain the number of records produced.
Partition should not contain any information related to objects and keys; it should only contain partition ids.
I see sourcePartition now has bucket and objectKey. Move them to sourceOffset.
recordCount is part of sourceOffset; create a map for every object key and value so they can be retrieved.
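For illustration only, a rough sketch of the shape being proposed here; the field names and literal values are assumptions, not the connector's actual constants:

// sourcePartition identifies only the logical partition being read.
final Map<String, Object> sourcePartition = Map.of(
        "bucket", "test-bucket0",
        "topic", "bytesTest",
        "partition", 0);

// sourceOffset carries the per-object progress, e.g. the object key and its record count.
final Map<String, Object> sourceOffset = Map.of(
        "objectKey", "bytesTest-00000-1734697707480.txt",
        "recordCount", 42L);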
 */
@Override
public OffsetManager.OffsetManagerKey getManagerKey() {
    return () -> Map.of(BUCKET, data.get(BUCKET), OBJECT_KEY, data.get(OBJECT_KEY));
Instead of storing the object key as part of the key, it is better to store partition ids in the key.
We will have a smaller number of keys.
I just verified the Lenses S3 source connector and the Adobe S3 source connector, and they store partition ids.
Can we think about this too?
We have the topic.partitions config; our earlier implementation was based on this.
@gharris1727 your suggestion will be helpful here.
According to the javadocs of the OffsetStorageReader.offsets() method, I was thinking we would have to store the topic and partition id in the offset storage keys at least?
@Override
public OffsetManager.OffsetManagerKey getManagerKey() {
    return () -> Map.of(BUCKET, data.get(BUCKET), TOPIC, TOPIC, PARTITION, PARTITION);
}
When we have several objects under the specified topics and partitions, how can we better structure the keys to retrieve the stored offset map?
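For reference, a hedged sketch of how a source task can query stored offsets through the standard Kafka Connect OffsetStorageReader; the partition-map contents are illustrative only, and context is the SourceTaskContext available inside a SourceTask:

// Key identifying one logical partition (illustrative fields).
final Map<String, Object> partitionKey = new HashMap<>();
partitionKey.put("bucket", "test-bucket0");
partitionKey.put("partition", 0);

// Single lookup.
final Map<String, Object> storedOffset = context.offsetStorageReader().offset(partitionKey);

// Batch lookup of several partition keys at once.
final Map<Map<String, Object>, Map<String, Object>> storedOffsets =
        context.offsetStorageReader().offsets(List.of(partitionKey));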
We need to look at a couple of things.
- When pulling the data from Kafka we only need the file location (bucket and S3Object key). All other items are currently extracted from the key. So the bucket and key uniquely identify the object in S3.
- Adding more elements to the key means that we need to extract those items before we can look up the data in the offset manager.
Finally, the implementation for S3 is specific to the S3 source and does not impact the commons OffsetManager implementation.
This is incorrect. We need to have only partition information in sourcePartition, and sourceOffset should contain the object keys, record counts, etc.
IntegrationBase.consumeOffsetMessages(consumer).forEach(s -> {
    offsetRecs.merge(s.getKey(), s.getRecordCount(), (x, y) -> x > y ? x : y);
});
// FIXME after KAFKA-14947 is fixed.
But it is already working in the feature branch. Not sure if it's totally related.
This has been mostly fixed. There are edge cases where KAFKA-14947 applies.
Force-pushed from 0eaf29a to 2637f64.
 * @return the entry.
 */
public Optional<E> getEntry(final OffsetManagerKey key, final Function<Map<String, Object>, E> creator) {
    LOGGER.info("getEntry: {}", key.getPartitionMap());
This should probably be debug, given the number of times we could be accessing it.
Actually, all these info logs could be debug.
fixed
@@ -262,7 +265,7 @@ static Map<String, Object> consumeOffsetMessages(KafkaConsumer<byte[], byte[]> c
    for (final ConsumerRecord<byte[], byte[]> record : records) {
        Map<String, Object> offsetRec = OBJECT_MAPPER.readValue(record.value(), new TypeReference<>() { // NOPMD
        });
-       messages.putAll(offsetRec);
+       messages.put((String) offsetRec.get(OBJECT_KEY), offsetRec.get(RECORD_COUNT));
The record value has changed; this now brings back the record count only and not any details of the key.
To get the key we need to change this for loop to:
for (final ConsumerRecord<byte[], byte[]> record : records) {
    Map<String, Object> offsetRec = OBJECT_MAPPER.readValue(record.value(), new TypeReference<>() { // NOPMD
    });
    List<Object> key = OBJECT_MAPPER.readValue(record.key(), new TypeReference<>() { // NOPMD
    });
    // key.get(0) is always the connector name; that could be added as a check here if we wanted.
    Map<String, Object> keyDetails = (Map<String, Object>) key.get(1);
    messages.put((String) keyDetails.get(OBJECT_KEY), offsetRec.get(RECORD_COUNT));
}
Alternatively go to S3OffsetManagerEntry and alter getProperties() as below to put the objectKey back into the value.
@Override
public Map<String, Object> getProperties() {
    final Map<String, Object> result = new HashMap<>(data);
    result.put(RECORD_COUNT, recordCount);
    result.put(OBJECT_KEY, objectKey);
    return result;
}
I went with the first option.
 *            the key for the entry to remove.
 */
public void remove(final OffsetManagerKey key) {
    LOGGER.info("Removing: {}", key.getPartitionMap());
Debug here too, please.
Resolved review threads:
- ...rce-connector/src/main/java/io/aiven/kafka/connect/s3/source/utils/SourceRecordIterator.java (outdated)
- ...rce-connector/src/main/java/io/aiven/kafka/connect/s3/source/utils/SourceRecordIterator.java
The IntegrationBase needs to be updated, and I had a couple of small questions and nits.
messages.putAll(offsetRec);
final List<Object> key = OBJECT_MAPPER.readValue(record.key(), new TypeReference<>() { // NOPMD
});
final Map<String, Object> keyDetails = (Map<String, Object>) key.get(1);
NIT: add comment about key.get(0) being the name of the connector the commit is from.
LGTM thank you
The way SourceRecord is populated is incorrect; sourcePartition and sourceOffset should have the right information.
sourceOffset with just the recordCount field is not correct at all.
If sourcePartition contains the partitionId, sourceOffset would contain all the keys and corresponding offset positions, which makes the map simple and readable.
The proposed structure leads to 2 problems:
Either way the size of the map would increase, so that concern should be eliminated, and a process should be in place to remove all the processed objects and offset positions, which makes the map smaller. The problem I see here is that storing object keys into sourcePartition only makes the map larger, and looking at the javadocs of SourceRecord for sourcePartition and sourceOffset, these maps are not compatible.
Looks good. Few minor comments.
 * @param key
 *            the key for the entry to remove.
 */
public void remove(final OffsetManagerKey key) {
Suggested change:
- public void remove(final OffsetManagerKey key) {
+ public void removeOffsetEntry(final OffsetManagerKey key) {
And I could not find any dependencies for this.
This method and the one on line 133 should have the same name, as they do the same thing, just using a different argument to get the job done.
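As a rough sketch of keeping the names aligned as an overloaded pair (the internal offsets map and its name are assumptions; getPartitionMap() appears in the code quoted above and sourcePartition() is the standard SourceRecord accessor):

/** Removes the entry identified directly by its key. */
public void remove(final OffsetManagerKey key) {
    offsets.remove(key.getPartitionMap());
}

/** Removes the entry for the partition map carried by the given SourceRecord. */
public void remove(final SourceRecord sourceRecord) {
    offsets.remove(sourceRecord.sourcePartition());
}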
Resolved review thread (outdated): commons/src/main/java/io/aiven/kafka/connect/common/source/OffsetManager.java
 * @param sourceRecord
 *            the SourceRecord that contains the key to be removed.
 */
public void remove(final SourceRecord sourceRecord) {
Suggested change:
- public void remove(final SourceRecord sourceRecord) {
+ public void removeEntry(final SourceRecord sourceRecord) {
Resolved review thread (outdated): commons/src/test/java/io/aiven/kafka/connect/common/source/OffsetManagerTest.java
@@ -19,6 +19,9 @@
import static io.aiven.kafka.connect.common.config.SchemaRegistryFragment.INPUT_FORMAT_KEY;
import static io.aiven.kafka.connect.common.config.SourceConfigFragment.TARGET_TOPICS;
import static io.aiven.kafka.connect.common.config.SourceConfigFragment.TARGET_TOPIC_PARTITIONS;
Can we delete this config TARGET_TOPIC_PARTITIONS from SourceConfigFragment, and all its dependencies from tests and the readme?
My understanding was this option was to assign specific partitions to the task. I don't have visibility into how it is used. Opened KCON-100 to track this.
I have a clean-up PR for removing unused config; if we need this removed as part of it, I can add it to that.
    assertThat(result).isNotPresent();
}

@SuppressWarnings("PMD.TestClassWithoutTestCases") // TODO figure out why this fails.
If this fails, can we create a ticket?
PMD thinks this is a test class, but it has nothing annotated with @Test, so it fails with that error. The note is to figure out why and how to get around the problem.
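For illustration, a minimal sketch of the kind of class that trips this PMD rule and the class-level suppression used above; the class name below is hypothetical:

// A *Test-named support class with no @Test methods triggers PMD's TestClassWithoutTestCases rule,
// hence the suppression annotation.
@SuppressWarnings("PMD.TestClassWithoutTestCases")
public final class OffsetManagerSupportTest {
    // helper constants and fixtures only, no test methods
}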
Resolved review thread (outdated): ...rce-connector/src/main/java/io/aiven/kafka/connect/s3/source/utils/S3OffsetManagerEntry.java
/** The record count for the data map. Extracted here because it is used/updated frequently during processing */
private long recordCount;

private final String bucket;
In some places there is a mention of 'bucketName'; can we make it consistent, and the same with topic?
AWS documentation calls the String bucket.
Kafka documentation calls the String topic.
Changed the code to align with those standards.
Resolved review thread (outdated): ...connector/src/test/java/io/aiven/kafka/connect/s3/source/utils/S3OffsetManagerEntryTest.java
Resolved review thread (outdated): commons/src/main/java/io/aiven/kafka/connect/common/source/OffsetManager.java
Commits (Co-authored-by: Murali Basani):
- …setManager.java
- …setManagerTest.java
- …urce/utils/S3OffsetManagerEntry.java
- …urce/utils/S3OffsetManagerEntryTest.java
- …setManager.java
Fix for KCON-57
While this looks like a large change, there are multiple cases where files were migrated from the s3-source-connector to the common module; those files are counted twice. This change also removes unused classes/files.
Significant changes are in OffsetManager, S3SourceTask, S3SourceRecord and AWSV2SourceClient.
Made OffsetManager generic to handle multiple OffsetManagerRecord types while simplifying access from sources.
Each source should implement an instance of OffsetManager.OffsetManagerEntry that tracks the specific data for the source.
OffsetManagerEntry is included in the Source specific record (e.g. S3SourceRecord), is updated as processing continues, and is the source of record for many of the S3 and Kafka specific values (e.g. partition, topic, S3Object key) as well as some dynamic data such as the current record number.
The Transformer was modified to update the OffsetManagerEntry as records are returned.
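As a rough illustration of that contract, and only a sketch (the class below is hypothetical and is not the interface defined in this PR; field and method names are illustrative), a source-specific entry might look something like:

import java.util.Map;

// Hypothetical entry tracking one S3 object.
public class ExampleS3Entry {
    private final String bucket;
    private final String objectKey;
    private long recordCount; // dynamic data, updated as records are emitted

    public ExampleS3Entry(final String bucket, final String objectKey) {
        this.bucket = bucket;
        this.objectKey = objectKey;
    }

    /** The key that identifies this entry in offset storage. */
    public Map<String, Object> getPartitionMap() {
        return Map.of("bucket", bucket, "objectKey", objectKey);
    }

    /** The values persisted alongside the key, e.g. how many records have been produced so far. */
    public Map<String, Object> getProperties() {
        return Map.of("recordCount", recordCount);
    }

    /** Called as each record is produced, e.g. by the Transformer. */
    public void incrementRecordCount() {
        recordCount++;
    }
}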
Due to a bug in Kafka (https://issues.apache.org/jira/browse/KAFKA-14947), this implementation cannot guarantee write-once functionality.
Added javadoc.