Split api layer from file reader (#365)
This change reworks the SourceRecordIterator to remove the requirement for an S3Client, and any S3-specific knowledge, from the iterator.

The iterator now also requests more files once the initial set of files has been processed.

The only remaining work is to move the construction of the S3Object iterator out of the SourceRecordIterator in a follow-up PR, which will make it completely reusable.
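For illustration, a minimal sketch of the call pattern this enables, using the names introduced in the diff below; the process helper and the construction of s3SourceConfig are hypothetical placeholders, not part of this commit:

import java.util.HashSet;
import java.util.Iterator;

// Sketch only: the iterator layer now talks to AWSV2SourceClient rather than
// holding an AmazonS3 client itself; listing transparently pages in more keys
// once the first batch has been consumed.
final AWSV2SourceClient sourceClient = new AWSV2SourceClient(s3SourceConfig, new HashSet<>());
final Iterator<String> objectKeys = sourceClient.getListOfObjectKeys(null);
while (objectKeys.hasNext()) {
    process(sourceClient.getObject(objectKeys.next())); // hypothetical per-object handler
}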

---------

Signed-off-by: dependabot[bot] <[email protected]>
Signed-off-by: Aindriu Lavelle <[email protected]>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
aindriu-aiven and dependabot[bot] authored Dec 6, 2024
1 parent d2b8e55 commit 076e424
Showing 10 changed files with 223 additions and 190 deletions.
gcs-sink-connector/build.gradle.kts (2 changes: 1 addition & 1 deletion)
@@ -98,7 +98,7 @@ dependencies {
     testImplementation(apache.kafka.connect.api)
     testImplementation(apache.kafka.connect.runtime)
     testImplementation(apache.kafka.connect.json)
-    testImplementation("com.google.cloud:google-cloud-nio:0.127.26")
+    testImplementation("com.google.cloud:google-cloud-nio:0.127.27")

     testImplementation(compressionlibs.snappy)
     testImplementation(compressionlibs.zstd.jni)
S3ConfigFragment.java
@@ -483,4 +483,9 @@ public int getS3RetryBackoffMaxRetries() {
     public AWSCredentialsProvider getCustomCredentialsProvider() {
         return cfg.getConfiguredInstance(AWS_CREDENTIALS_PROVIDER_CONFIG, AWSCredentialsProvider.class);
     }
+
+    public int getFetchPageSize() {
+        return cfg.getInt(FETCH_PAGE_SIZE);
+    }
+
 }
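The new getter reads a FETCH_PAGE_SIZE key that is defined in a part of this PR not rendered on this page. A hedged sketch of what such a definition could look like; the key name, default, and bounds here are assumptions:

import org.apache.kafka.common.config.ConfigDef;

public final class FetchPageSizeConfigSketch {
    // Illustrative only: the real FETCH_PAGE_SIZE constant and its defaults live
    // elsewhere in this diff and may differ from the values assumed here.
    static ConfigDef withFetchPageSize(final ConfigDef configDef) {
        return configDef.define("aws.s3.fetch.page.size", // assumed key name
                ConfigDef.Type.INT, 10, // assumed default
                ConfigDef.Range.atLeast(1), ConfigDef.Importance.MEDIUM,
                "Number of S3 object keys fetched per ListObjectsV2 page.");
    }
}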
S3SourceTask.java
@@ -17,7 +17,6 @@
 package io.aiven.kafka.connect.s3.source;

 import static io.aiven.kafka.connect.common.config.SourceConfigFragment.MAX_POLL_RECORDS;
-import static io.aiven.kafka.connect.config.s3.S3ConfigFragment.AWS_S3_BUCKET_NAME_CONFIG;

 import java.lang.reflect.InvocationTargetException;
 import java.util.ArrayList;
@@ -37,9 +36,8 @@

 import io.aiven.kafka.connect.common.source.input.Transformer;
 import io.aiven.kafka.connect.common.source.input.TransformerFactory;
-import io.aiven.kafka.connect.s3.source.config.S3ClientFactory;
 import io.aiven.kafka.connect.s3.source.config.S3SourceConfig;
-import io.aiven.kafka.connect.s3.source.utils.FileReader;
+import io.aiven.kafka.connect.s3.source.utils.AWSV2SourceClient;
 import io.aiven.kafka.connect.s3.source.utils.OffsetManager;
 import io.aiven.kafka.connect.s3.source.utils.RecordProcessor;
 import io.aiven.kafka.connect.s3.source.utils.S3SourceRecord;
@@ -79,15 +77,12 @@ public class S3SourceTask extends SourceTask {

     private Transformer transformer;

-    private String s3Bucket;
-
     private boolean taskInitialized;

     private final AtomicBoolean connectorStopped = new AtomicBoolean();
-    private final S3ClientFactory s3ClientFactory = new S3ClientFactory();

     private final Object pollLock = new Object();
-    private FileReader fileReader;
+    private AWSV2SourceClient awsv2SourceClient;
     private final Set<String> failedObjectKeys = new HashSet<>();
     private final Set<String> inProcessObjectKeys = new HashSet<>();

@@ -108,11 +103,9 @@ public void start(final Map<String, String> props) {
         LOGGER.info("S3 Source task started.");
         s3SourceConfig = new S3SourceConfig(props);
         initializeConverters();
-        initializeS3Client();
-        this.s3Bucket = s3SourceConfig.getString(AWS_S3_BUCKET_NAME_CONFIG);
         this.transformer = TransformerFactory.getTransformer(s3SourceConfig);
         offsetManager = new OffsetManager(context, s3SourceConfig);
-        fileReader = new FileReader(s3SourceConfig, this.s3Bucket, failedObjectKeys);
+        awsv2SourceClient = new AWSV2SourceClient(s3SourceConfig, failedObjectKeys);
         prepareReaderFromOffsetStorageReader();
         this.taskInitialized = true;
     }
@@ -132,14 +125,9 @@ private void initializeConverters() {
         }
     }

-    private void initializeS3Client() {
-        this.s3Client = s3ClientFactory.createAmazonS3Client(s3SourceConfig);
-        LOGGER.debug("S3 client initialized");
-    }
-
     private void prepareReaderFromOffsetStorageReader() {
-        sourceRecordIterator = new SourceRecordIterator(s3SourceConfig, s3Client, this.s3Bucket, offsetManager,
-                this.transformer, fileReader);
+        sourceRecordIterator = new SourceRecordIterator(s3SourceConfig, offsetManager, this.transformer,
+                awsv2SourceClient);
     }

     @Override
@@ -187,7 +175,7 @@ private List<SourceRecord> extractSourceRecords(final List<SourceRecord> results
             return results;
         }
         return RecordProcessor.processRecords(sourceRecordIterator, results, s3SourceConfig, keyConverter,
-                valueConverter, connectorStopped, this.transformer, fileReader, offsetManager);
+                valueConverter, connectorStopped, this.transformer, awsv2SourceClient, offsetManager);
     }

     private void waitForObjects() throws InterruptedException {
@@ -208,7 +196,7 @@ public void stop() {
     }

     private void closeResources() {
-        s3Client.shutdown();
+        awsv2SourceClient.shutdown();
     }

     // below for visibility in tests
AWSV2SourceClient.java (new file)
@@ -0,0 +1,133 @@
/*
 * Copyright 2024 Aiven Oy
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package io.aiven.kafka.connect.s3.source.utils;

import java.util.HashSet;
import java.util.Iterator;
import java.util.Objects;
import java.util.Set;
import java.util.function.Predicate;
import java.util.stream.Stream;

import io.aiven.kafka.connect.s3.source.config.S3ClientFactory;
import io.aiven.kafka.connect.s3.source.config.S3SourceConfig;

import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.model.ListObjectsV2Request;
import com.amazonaws.services.s3.model.S3Object;
import com.amazonaws.services.s3.model.S3ObjectSummary;
import org.codehaus.plexus.util.StringUtils;

/**
 * Called AWSV2SourceClient because this source client implements the V2 version of the AWS client library. Handles all
 * calls and authentication to AWS and returns usable objects to the SourceRecordIterator.
 */
public class AWSV2SourceClient {

    public static final int PAGE_SIZE_FACTOR = 2;
    private final S3SourceConfig s3SourceConfig;
    private final AmazonS3 s3Client;
    private final String bucketName;

    private Predicate<S3ObjectSummary> filterPredicate = summary -> summary.getSize() > 0;
    private final Set<String> failedObjectKeys;

    /**
     * @param s3SourceConfig
     *            configuration for Source connector
     * @param failedObjectKeys
     *            all objectKeys which have already been tried but could not be processed.
     */
    public AWSV2SourceClient(final S3SourceConfig s3SourceConfig, final Set<String> failedObjectKeys) {
        this.s3SourceConfig = s3SourceConfig;
        final S3ClientFactory s3ClientFactory = new S3ClientFactory();
        this.s3Client = s3ClientFactory.createAmazonS3Client(s3SourceConfig);
        this.bucketName = s3SourceConfig.getAwsS3BucketName();
        this.failedObjectKeys = new HashSet<>(failedObjectKeys);
    }

    /**
     * Visible for testing.
     *
     * @param s3Client
     *            amazonS3Client
     * @param s3SourceConfig
     *            configuration for Source connector
     * @param failedObjectKeys
     *            all objectKeys which have already been tried but could not be processed.
     */
    AWSV2SourceClient(final AmazonS3 s3Client, final S3SourceConfig s3SourceConfig,
            final Set<String> failedObjectKeys) {
        this.s3SourceConfig = s3SourceConfig;
        this.s3Client = s3Client;
        this.bucketName = s3SourceConfig.getAwsS3BucketName();
        this.failedObjectKeys = new HashSet<>(failedObjectKeys);
    }

    public Iterator<String> getListOfObjectKeys(final String startToken) {
        final ListObjectsV2Request request = new ListObjectsV2Request().withBucketName(bucketName)
                .withMaxKeys(s3SourceConfig.getS3ConfigFragment().getFetchPageSize() * PAGE_SIZE_FACTOR);

        if (StringUtils.isNotBlank(startToken)) {
            request.withStartAfter(startToken);
        }

        final Stream<String> s3ObjectKeyStream = Stream
                .iterate(s3Client.listObjectsV2(request), Objects::nonNull, response -> {
                    // Pages are fetched lazily: this runs only once the keys from the
                    // previous page have been consumed from the iterator.
                    if (response.isTruncated()) {
                        return s3Client.listObjectsV2(new ListObjectsV2Request().withBucketName(bucketName)
                                .withMaxKeys(s3SourceConfig.getS3ConfigFragment().getFetchPageSize() * PAGE_SIZE_FACTOR)
                                .withContinuationToken(response.getNextContinuationToken()));
                    } else {
                        return null;
                    }
                })
                .flatMap(response -> response.getObjectSummaries()
                        .stream()
                        .filter(filterPredicate)
                        .filter(objectSummary -> assignObjectToTask(objectSummary.getKey()))
                        .filter(objectSummary -> !failedObjectKeys.contains(objectSummary.getKey())))
                .map(S3ObjectSummary::getKey);
        return s3ObjectKeyStream.iterator();
    }

    public S3Object getObject(final String objectKey) {
        return s3Client.getObject(bucketName, objectKey);
    }

    public void addFailedObjectKeys(final String objectKey) {
        this.failedObjectKeys.add(objectKey);
    }

    public void setFilterPredicate(final Predicate<S3ObjectSummary> predicate) {
        filterPredicate = predicate;
    }

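    // Deterministic task assignment: an object key is handled by this task only when
    // floorMod(key.hashCode(), tasks.max) equals this task's id, so every key maps to
    // exactly one of the connector's tasks.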
    private boolean assignObjectToTask(final String objectKey) {
        final int maxTasks = Integer.parseInt(s3SourceConfig.originals().get("tasks.max").toString());
        final int taskId = Integer.parseInt(s3SourceConfig.originals().get("task.id").toString()) % maxTasks;
        final int taskAssignment = Math.floorMod(objectKey.hashCode(), maxTasks);
        return taskAssignment == taskId;
    }

    public void shutdown() {
        s3Client.shutdown();
    }

}
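One behavioral note on setFilterPredicate: it replaces the default non-empty-object check rather than composing with it, so a caller that still wants to skip zero-byte objects must restate that condition. A hedged sketch, where the topics/ prefix is an illustrative value:

// Replaces the default `size > 0` predicate entirely, so it is restated here.
sourceClient.setFilterPredicate(summary ->
        summary.getSize() > 0 && summary.getKey().startsWith("topics/"));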

This file was deleted (FileReader.java, superseded by AWSV2SourceClient).

RecordProcessor.java
@@ -45,7 +45,7 @@ private RecordProcessor() {
     public static List<SourceRecord> processRecords(final Iterator<S3SourceRecord> sourceRecordIterator,
             final List<SourceRecord> results, final S3SourceConfig s3SourceConfig,
             final Optional<Converter> keyConverter, final Converter valueConverter,
-            final AtomicBoolean connectorStopped, final Transformer transformer, final FileReader fileReader,
+            final AtomicBoolean connectorStopped, final Transformer transformer, final AWSV2SourceClient sourceClient,
             final OffsetManager offsetManager) {

         final Map<String, String> conversionConfig = new HashMap<>();
@@ -55,7 +55,7 @@ public static List<SourceRecord> processRecords(final Iterator<S3SourceRecord> s
             final S3SourceRecord s3SourceRecord = sourceRecordIterator.next();
             if (s3SourceRecord != null) {
                 final SourceRecord sourceRecord = createSourceRecord(s3SourceRecord, s3SourceConfig, keyConverter,
-                        valueConverter, conversionConfig, transformer, fileReader, offsetManager);
+                        valueConverter, conversionConfig, transformer, sourceClient, offsetManager);
                 results.add(sourceRecord);
             }
         }
@@ -65,8 +65,8 @@ public static List<SourceRecord> processRecords(final Iterator<S3SourceRecord> s

     static SourceRecord createSourceRecord(final S3SourceRecord s3SourceRecord, final S3SourceConfig s3SourceConfig,
             final Optional<Converter> keyConverter, final Converter valueConverter,
-            final Map<String, String> conversionConfig, final Transformer transformer, final FileReader fileReader,
-            final OffsetManager offsetManager) {
+            final Map<String, String> conversionConfig, final Transformer transformer,
+            final AWSV2SourceClient sourceClient, final OffsetManager offsetManager) {

         final String topic = s3SourceRecord.getTopic();
         final Optional<SchemaAndValue> keyData = keyConverter.map(c -> c.toConnectData(topic, s3SourceRecord.key()));
@@ -80,7 +80,7 @@ static SourceRecord createSourceRecord(final S3SourceRecord s3SourceRecord, fina
             return s3SourceRecord.getSourceRecord(topic, keyData, schemaAndValue);
         } catch (DataException e) {
             LOGGER.error("Error in reading s3 object stream {}", e.getMessage(), e);
-            fileReader.addFailedObjectKeys(s3SourceRecord.getObjectKey());
+            sourceClient.addFailedObjectKeys(s3SourceRecord.getObjectKey());
             throw e;
         }
     }