Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Place offset manager in commons #373

Open
wants to merge 51 commits into
base: s3-source-release
Choose a base branch
from

Conversation

Claudenw
Copy link
Contributor

@Claudenw Claudenw commented Dec 16, 2024

Fix for KCON-57

While this looks like a large change, there are multiple cases where files were migrated from s3-source-connector to common module. Those files are counted twice. This change also removes unused classes/files.

Significant changes are in OffsetManager, S3SourceTask, S3SourceRecord and AWSV2SourceClient.

Made OffsetManager generic to handle multiple OffsetManagerRecord types while simplifying access from sources.

Source should implement an instance of OffsetManager.OffsetManagerEntry that tracks the specific data for the source.

OffsetManagerEntry is included in the Source specific record (e.g. S3SourceRecord), is updated as processing continues, and is the source of record for many of the S3 and Kafka specific values (e.g. partition, topic, S3Object key) as well as some dynamic data such as the current record number.

Transformer was modified to update the OffsetManagerEntry as records are returned.

Due to bug in Kafka this implementation can not guarantee write once functionality. https://issues.apache.org/jira/browse/KAFKA-14947

Added javadoc.

@Claudenw Claudenw force-pushed the KCON-57_place_OffsetManager_in_commons branch from b5278e0 to 69ea274 Compare December 17, 2024 15:15
@Claudenw Claudenw marked this pull request as ready for review December 19, 2024 08:32
@Claudenw Claudenw requested review from a team as code owners December 19, 2024 08:32
@Claudenw
Copy link
Contributor Author

Units tests pass, there is an issue with the integration tests not picking up the changes in commons.

if (objectListing.isTruncated()) {
// get the next set of data and create an iterator on it.
request.setStartAfter(null);
request.withContinuationToken(objectListing.getContinuationToken());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am pretty sure the continuation token is all that is required here, you can create a new request and only add the contiuation token (possibly also require the bucket though)

Copy link
Contributor

@aindriu-aiven aindriu-aiven left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I had a few comments some are for future follow ups but we should create issues for them so we dont miss them.

throw new AmazonClientException(e);
}
this.s3ObjectIterator = IteratorUtils.filteredIterator(sourceClient.getIteratorOfObjects(null),
s3Object -> extractOffsetManagerEntry(s3Object));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Lambda can be replaced with method reference

Suggested change
s3Object -> extractOffsetManagerEntry(s3Object));
this::extractOffsetManagerEntry);

* the Abstract Config to use.
* @return a Stream of SchemaAndValue objects.
*/
public final Stream<SchemaAndValue> getRecords(final IOSupplier<InputStream> inputStreamIOSupplier,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is looking great, much simplified version

Copy link
Contributor

@muralibasani muralibasani left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Need to find why no events are pushed to kafka offsets topic

@@ -119,6 +118,7 @@ public List<SourceRecord> poll() throws InterruptedException {

while (!connectorStopped.get()) {
try {
waitForObjects();
extractSourceRecords(results);
LOGGER.info("Number of records extracted and sent: {}", results.size());
return results;
Copy link
Contributor

@muralibasani muralibasani Dec 20, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have an extract of what is sent to kafka offsets topic, before this PR, and with this PR.

Before this PR :

SourceRecord{
	sourcePartition={bucket=test-bucket0, topic=bytesTest, topicPartition=0},
 	sourceOffset={object_key_s3-source-connector-for-apache-kafka-test-2024-12-20T13:34:01.62052/bytesTest-00000-1734698057527.txt=1}
 }
  ConnectRecord{topic='bytesTest', kafkaPartition=0, key=[B@6e96f788, keySchema=null, value=[B@49e57a97, valueSchema=null, timestamp=null, headers=ConnectHeaders(headers=)}

With this PR :

SourceRecord{
	sourcePartition={partition=0, bucket=test-bucket0, objectKey=s3-source-connector-for-apache-kafka-test-2024-12-20T13:28:08.047694/bytesTest-00000-1734697707480.txt, topic=bytesTest}, sourceOffset={bucket=test-bucket0, topic=bytesTest, partition=0, objectKey=s3-source-connector-for-apache-kafka-test-2024-12-20T13:28:08.047694/bytesTest-00000-1734697707480.txt, recordCount=0}
	}
	 ConnectRecord{topic='bytesTest', kafkaPartition=0, key=[B@67e2252f, keySchema=null, value=[B@1d001ae2, valueSchema=null, timestamp=null, headers=ConnectHeaders(headers=)}
  • There are some duplicate keys sent in sourcePartition, and sourceOffset, which should be removed.
  • Have tested locally, and no events are pushed to connect-offset-topic- topic

Am not sure where the problem is, am going to debug further. May be something to do with the new structure

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Partition has been changed to contain only bucket and S3Object.key()
Offset has been changed to only contain the number of records produced.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Partition should not contain any information related to objects and keys.

It should only contain partition ids.

I see sourcePartition now has bucket and objectKey. Move them to sourceOffset.
recordCount is part of sourceOffset, create a map for every object key and value to retrieve them

@Claudenw Claudenw mentioned this pull request Dec 20, 2024
*/
@Override
public OffsetManager.OffsetManagerKey getManagerKey() {
return () -> Map.of(BUCKET, data.get(BUCKET), OBJECT_KEY, data.get(OBJECT_KEY));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Instead of objectkey storing as keys, it is better to store partition ids in key.
We will have fewer number of keys.

Just verified lenses s3 source connector and adobe s3 source connector, and they store partitionids.

Can we think about this too ?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

topic.partitions we have this config. Our earlier implementation was based on this.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@gharris1727 your suggestion will be helpful here.
According to javadocs of OffsetStorageReader : offsets() method, I was thinking we would have to store topic and partition id in offset storage keys atleast ?

@Override
public OffsetManager.OffsetManagerKey getManagerKey() {
    return () -> Map.of(BUCKET, data.get(BUCKET), TOPIC, TOPIC, PARTITION, PARTITION);
}

When we have several objects under specified topics and partitions and to retrieve the stored offset map, how can be better structure the keys ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We need to look at a couple of things.

  1. When pulling the data from kafka we only need the file location (bucket and S3Object key). All other items are currently extracted from the key. So the bucket and key uniquely identify the object in S3.
  2. Adding more elements to the key means that we need to extract those items before we can look up the data in the offset manger.

Finally, the implementation for S3 is specific to the S3 source and does not impact the commons OffsetManger implementation.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is incorrect. We need to have only partition information in sourcePartition and sourceOffset should contain the object keys and record counts etc.

IntegrationBase.consumeOffsetMessages(consumer).forEach(s -> {
offsetRecs.merge(s.getKey(), s.getRecordCount(), (x, y) -> x > y ? x : y);
});
// FIXME after KAFKA-14947 is fixed.
Copy link
Contributor

@muralibasani muralibasani Dec 24, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But it is already working in feature branch. Not sure if it's totally related

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This has been mostly fixed. There are edge cases where KAFKA-14947 applies.

@Claudenw Claudenw force-pushed the KCON-57_place_OffsetManager_in_commons branch from 0eaf29a to 2637f64 Compare January 9, 2025 13:35
* @return the entry.
*/
public Optional<E> getEntry(final OffsetManagerKey key, final Function<Map<String, Object>, E> creator) {
LOGGER.info("getEntry: {}", key.getPartitionMap());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Probably should be debug for the amount of times we could be accessing this

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually all these infos could be debug.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed

@@ -262,7 +265,7 @@ static Map<String, Object> consumeOffsetMessages(KafkaConsumer<byte[], byte[]> c
for (final ConsumerRecord<byte[], byte[]> record : records) {
Map<String, Object> offsetRec = OBJECT_MAPPER.readValue(record.value(), new TypeReference<>() { // NOPMD
});
messages.putAll(offsetRec);
messages.put((String) offsetRec.get(OBJECT_KEY), offsetRec.get(RECORD_COUNT));
Copy link
Contributor

@aindriu-aiven aindriu-aiven Jan 10, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The record value has changed, this now bring back the record Count only and not any details of the key.
to get the key we need to change this for loop to.

for (final ConsumerRecord<byte[], byte[]> record : records) {

            Map<String, Object> offsetRec = OBJECT_MAPPER.readValue(record.value(), new TypeReference<>() { // NOPMD
            });

            List<Object> key = OBJECT_MAPPER.readValue(record.key(), new TypeReference<>() { // NOPMD
            });
//key.get(0) is always the connector name that could be added as a check here if we wanted.
            Map<String,Object> keyDetails = (Map<String,Object>)key.get(1);

            messages.put((String) keyDetails.get(OBJECT_KEY), offsetRec.get(RECORD_COUNT));
        }

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Alternatively go to S3OffsetManagerEntry and alter getProperties() as below to put the objectKey back into the value.

 @Override
    public Map<String, Object> getProperties() {
        final Map<String, Object> result = new HashMap<>(data);
        result.put(RECORD_COUNT, recordCount);
        result.put(OBJECT_KEY, objectKey);
        return result;
    }

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I went with the first option.

* the key for the entry to remove.
*/
public void remove(final OffsetManagerKey key) {
LOGGER.info("Removing: {}", key.getPartitionMap());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

debug here too please.

Copy link
Contributor

@aindriu-aiven aindriu-aiven left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The IntegrationBase needs to be updated and I had a couple of small questions and NITs

messages.putAll(offsetRec);
final List<Object> key = OBJECT_MAPPER.readValue(record.key(), new TypeReference<>() { // NOPMD
});
final Map<String, Object> keyDetails = (Map<String, Object>) key.get(1);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

NIT: add comment about key.get(0) being the name of the connector the commit is from.

Copy link
Contributor

@aindriu-aiven aindriu-aiven left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM thank you

Copy link
Contributor

@muralibasani muralibasani left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The way SourceRecord is populated is incorrect. sourcePartition and sourceOffset should have the right information.
sourceOffset with just recordCount field is not correct at all.
If sourcePartition contains partitionId, sourceOffset would contain all the keys and corresponding offset positions which makes the map simple and readable.

@Claudenw
Copy link
Contributor Author

Claudenw commented Jan 13, 2025

The way SourceRecord is populated is incorrect. sourcePartition and sourceOffset should have the right information. sourceOffset with just recordCount field is not correct at all. If sourcePartition contains partitionId, sourceOffset would contain all the keys and corresponding offset positions which makes the map simple and readable.

The proposed structure leads to 2 problems:

  1. Since Kafka does the updates using the data from the SourceRecord, every SourceRecord will need to include all the keys and corresponding offset positions on every update. The size of this data would continue to increase without bound.

  2. We can not request data on a subset of records from Kafka We have to retrieve all the data on every request. The Kafka context object allows us to provide a list of keys (source Partitions) an retrieve information about those partitions. So we can still do bulk requests.

@muralibasani
Copy link
Contributor

The way SourceRecord is populated is incorrect. sourcePartition and sourceOffset should have the right information. sourceOffset with just recordCount field is not correct at all. If sourcePartition contains partitionId, sourceOffset would contain all the keys and corresponding offset positions which makes the map simple and readable.

The proposed structure leads to 2 problems:

  1. Since Kafka does the updates using the data from the SourceRecord, every SourceRecord will need to include all the keys and corresponding offset positions on every update. The size of this data would continue to increase without bound.
  2. We can not request data on a subset of records from Kafka We have to retrieve all the data on every request. The Kafka context object allows us to provide a list of keys (source Partitions) an retrieve information about those partitions. So we can still do bulk requests.

Either way the size of the map would increase. So that concern should be eliminated. And a process should be in place to remove all the processed objects and offset positions, which makes the map smaller.

Here the problem I see is with storing object keys into sourcePartition only makes the map larger, and looking at the javadocs of SourceRecord for sourcePartition and sourceOffset, these maps are not compatible.

Copy link
Contributor

@muralibasani muralibasani left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks good. Few minor comments.

* @param key
* the key for the entry to remove.
*/
public void remove(final OffsetManagerKey key) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
public void remove(final OffsetManagerKey key) {
public void removeOffsetEntry(final OffsetManagerKey key) {

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

And I could not find any dependencies for this.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This method and the one on line 133 should have the same name as they do the same thing just us a different argument to get the job done.

* @param sourceRecord
* the SourceRecord that contains the key to be removed.
*/
public void remove(final SourceRecord sourceRecord) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
public void remove(final SourceRecord sourceRecord) {
public void removeEntry(final SourceRecord sourceRecord) {

@@ -19,6 +19,9 @@
import static io.aiven.kafka.connect.common.config.SchemaRegistryFragment.INPUT_FORMAT_KEY;
import static io.aiven.kafka.connect.common.config.SourceConfigFragment.TARGET_TOPICS;
import static io.aiven.kafka.connect.common.config.SourceConfigFragment.TARGET_TOPIC_PARTITIONS;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we delete this config TARGET_TOPIC_PARTITIONS from SourceConfigFragment and all its dependencies from tests and readme.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My understanding was this option was to assign specific partitions to the task. I don't have visibility into how it is used. Opened KCON-100 to track this.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have a clean up PR for removing unused config, if we need this removed as part of it I can add to that

assertThat(result).isNotPresent();
}

@SuppressWarnings("PMD.TestClassWithoutTestCases") // TODO figure out why this fails.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If this fails, can we create a ticket ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

PMD thinks this is a test class but it has nothing annotated with @Test so it fails with the error. The note is to figure out why and how to get around the problem.

/** THe record count for the data map. Extracted here because it is used/updated frequently during processing */
private long recordCount;

private final String bucket;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In some places a mention of 'bucketName'. can we make it consistent, and same with topic.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

AWS documentation calls the String bucket.
Kafka documentations calls the String topic

Changes to code to align with those standards.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants