diff --git a/gcs-sink-connector/build.gradle.kts b/gcs-sink-connector/build.gradle.kts index 2c33f4c67..4af195ba7 100644 --- a/gcs-sink-connector/build.gradle.kts +++ b/gcs-sink-connector/build.gradle.kts @@ -98,7 +98,7 @@ dependencies { testImplementation(apache.kafka.connect.api) testImplementation(apache.kafka.connect.runtime) testImplementation(apache.kafka.connect.json) - testImplementation("com.google.cloud:google-cloud-nio:0.127.26") + testImplementation("com.google.cloud:google-cloud-nio:0.127.27") testImplementation(compressionlibs.snappy) testImplementation(compressionlibs.zstd.jni) diff --git a/s3-commons/src/main/java/io/aiven/kafka/connect/config/s3/S3ConfigFragment.java b/s3-commons/src/main/java/io/aiven/kafka/connect/config/s3/S3ConfigFragment.java index 8b38c8f4c..1e86638e1 100644 --- a/s3-commons/src/main/java/io/aiven/kafka/connect/config/s3/S3ConfigFragment.java +++ b/s3-commons/src/main/java/io/aiven/kafka/connect/config/s3/S3ConfigFragment.java @@ -483,4 +483,9 @@ public int getS3RetryBackoffMaxRetries() { public AWSCredentialsProvider getCustomCredentialsProvider() { return cfg.getConfiguredInstance(AWS_CREDENTIALS_PROVIDER_CONFIG, AWSCredentialsProvider.class); } + + public int getFetchPageSize() { + return cfg.getInt(FETCH_PAGE_SIZE); + } + } diff --git a/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/S3SourceTask.java b/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/S3SourceTask.java index 86a870bcd..be3d89618 100644 --- a/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/S3SourceTask.java +++ b/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/S3SourceTask.java @@ -17,7 +17,6 @@ package io.aiven.kafka.connect.s3.source; import static io.aiven.kafka.connect.common.config.SourceConfigFragment.MAX_POLL_RECORDS; -import static io.aiven.kafka.connect.config.s3.S3ConfigFragment.AWS_S3_BUCKET_NAME_CONFIG; import java.lang.reflect.InvocationTargetException; import 
java.util.ArrayList; @@ -37,9 +36,8 @@ import io.aiven.kafka.connect.common.source.input.Transformer; import io.aiven.kafka.connect.common.source.input.TransformerFactory; -import io.aiven.kafka.connect.s3.source.config.S3ClientFactory; import io.aiven.kafka.connect.s3.source.config.S3SourceConfig; -import io.aiven.kafka.connect.s3.source.utils.FileReader; +import io.aiven.kafka.connect.s3.source.utils.AWSV2SourceClient; import io.aiven.kafka.connect.s3.source.utils.OffsetManager; import io.aiven.kafka.connect.s3.source.utils.RecordProcessor; import io.aiven.kafka.connect.s3.source.utils.S3SourceRecord; @@ -79,15 +77,12 @@ public class S3SourceTask extends SourceTask { private Transformer transformer; - private String s3Bucket; - private boolean taskInitialized; private final AtomicBoolean connectorStopped = new AtomicBoolean(); - private final S3ClientFactory s3ClientFactory = new S3ClientFactory(); private final Object pollLock = new Object(); - private FileReader fileReader; + private AWSV2SourceClient awsv2SourceClient; private final Set failedObjectKeys = new HashSet<>(); private final Set inProcessObjectKeys = new HashSet<>(); @@ -108,11 +103,9 @@ public void start(final Map props) { LOGGER.info("S3 Source task started."); s3SourceConfig = new S3SourceConfig(props); initializeConverters(); - initializeS3Client(); - this.s3Bucket = s3SourceConfig.getString(AWS_S3_BUCKET_NAME_CONFIG); this.transformer = TransformerFactory.getTransformer(s3SourceConfig); offsetManager = new OffsetManager(context, s3SourceConfig); - fileReader = new FileReader(s3SourceConfig, this.s3Bucket, failedObjectKeys); + awsv2SourceClient = new AWSV2SourceClient(s3SourceConfig, failedObjectKeys); prepareReaderFromOffsetStorageReader(); this.taskInitialized = true; } @@ -132,14 +125,9 @@ private void initializeConverters() { } } - private void initializeS3Client() { - this.s3Client = s3ClientFactory.createAmazonS3Client(s3SourceConfig); - LOGGER.debug("S3 client initialized"); - } - private 
void prepareReaderFromOffsetStorageReader() { - sourceRecordIterator = new SourceRecordIterator(s3SourceConfig, s3Client, this.s3Bucket, offsetManager, - this.transformer, fileReader); + sourceRecordIterator = new SourceRecordIterator(s3SourceConfig, offsetManager, this.transformer, + awsv2SourceClient); } @Override @@ -187,7 +175,7 @@ private List extractSourceRecords(final List results return results; } return RecordProcessor.processRecords(sourceRecordIterator, results, s3SourceConfig, keyConverter, - valueConverter, connectorStopped, this.transformer, fileReader, offsetManager); + valueConverter, connectorStopped, this.transformer, awsv2SourceClient, offsetManager); } private void waitForObjects() throws InterruptedException { @@ -208,7 +196,7 @@ public void stop() { } private void closeResources() { - s3Client.shutdown(); + awsv2SourceClient.shutdown(); } // below for visibility in tests diff --git a/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/utils/AWSV2SourceClient.java b/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/utils/AWSV2SourceClient.java new file mode 100644 index 000000000..1689ec9fa --- /dev/null +++ b/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/utils/AWSV2SourceClient.java @@ -0,0 +1,133 @@ +/* + * Copyright 2024 Aiven Oy + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package io.aiven.kafka.connect.s3.source.utils; + +import java.util.HashSet; +import java.util.Iterator; +import java.util.Objects; +import java.util.Set; +import java.util.function.Predicate; +import java.util.stream.Stream; + +import io.aiven.kafka.connect.s3.source.config.S3ClientFactory; +import io.aiven.kafka.connect.s3.source.config.S3SourceConfig; + +import com.amazonaws.services.s3.AmazonS3; +import com.amazonaws.services.s3.model.ListObjectsV2Request; +import com.amazonaws.services.s3.model.S3Object; +import com.amazonaws.services.s3.model.S3ObjectSummary; +import org.codehaus.plexus.util.StringUtils; + +/** + * Called AWSV2SourceClient as this source client implements the V2 version of the aws client library. Handles all calls + * and authentication to AWS and returns usable objects to the SourceRecordIterator. + */ +public class AWSV2SourceClient { + + public static final int PAGE_SIZE_FACTOR = 2; + private final S3SourceConfig s3SourceConfig; + private final AmazonS3 s3Client; + private final String bucketName; + + private Predicate filterPredicate = summary -> summary.getSize() > 0; + private final Set failedObjectKeys; + + /** + * @param s3SourceConfig + * configuration for Source connector + * @param failedObjectKeys + * all objectKeys which have already been tried but have been unable to process. + */ + public AWSV2SourceClient(final S3SourceConfig s3SourceConfig, final Set failedObjectKeys) { + this.s3SourceConfig = s3SourceConfig; + final S3ClientFactory s3ClientFactory = new S3ClientFactory(); + this.s3Client = s3ClientFactory.createAmazonS3Client(s3SourceConfig); + this.bucketName = s3SourceConfig.getAwsS3BucketName(); + this.failedObjectKeys = new HashSet<>(failedObjectKeys); + } + + /** + * Visible for testing + * + * @param s3Client + * amazonS3Client + * @param s3SourceConfig + * configuration for Source connector + * @param failedObjectKeys + * all objectKeys which have already been tried but have been unable to process. 
+ */ + AWSV2SourceClient(final AmazonS3 s3Client, final S3SourceConfig s3SourceConfig, + final Set failedObjectKeys) { + this.s3SourceConfig = s3SourceConfig; + this.s3Client = s3Client; + this.bucketName = s3SourceConfig.getAwsS3BucketName(); + this.failedObjectKeys = new HashSet<>(failedObjectKeys); + } + + public Iterator getListOfObjectKeys(final String startToken) { + final ListObjectsV2Request request = new ListObjectsV2Request().withBucketName(bucketName) + .withMaxKeys(s3SourceConfig.getS3ConfigFragment().getFetchPageSize() * PAGE_SIZE_FACTOR); + + if (StringUtils.isNotBlank(startToken)) { + request.withStartAfter(startToken); + } + + final Stream s3ObjectKeyStream = Stream + .iterate(s3Client.listObjectsV2(request), Objects::nonNull, response -> { + // This is called every time next() is called on the iterator. + if (response.isTruncated()) { + return s3Client.listObjectsV2(new ListObjectsV2Request().withBucketName(bucketName) + .withMaxKeys(s3SourceConfig.getS3ConfigFragment().getFetchPageSize() * PAGE_SIZE_FACTOR) + .withContinuationToken(response.getNextContinuationToken())); + } else { + return null; + } + + }) + .flatMap(response -> response.getObjectSummaries() + .stream() + .filter(filterPredicate) + .filter(objectSummary -> assignObjectToTask(objectSummary.getKey())) + .filter(objectSummary -> !failedObjectKeys.contains(objectSummary.getKey()))) + .map(S3ObjectSummary::getKey); + return s3ObjectKeyStream.iterator(); + } + + public S3Object getObject(final String objectKey) { + return s3Client.getObject(bucketName, objectKey); + } + + public void addFailedObjectKeys(final String objectKey) { + this.failedObjectKeys.add(objectKey); + } + + public void setFilterPredicate(final Predicate predicate) { + filterPredicate = predicate; + } + + private boolean assignObjectToTask(final String objectKey) { + final int maxTasks = Integer.parseInt(s3SourceConfig.originals().get("tasks.max").toString()); + final int taskId = 
Integer.parseInt(s3SourceConfig.originals().get("task.id").toString()) % maxTasks; + final int taskAssignment = Math.floorMod(objectKey.hashCode(), maxTasks); + return taskAssignment == taskId; + } + + public void shutdown() { + s3Client.shutdown(); + } + +} diff --git a/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/utils/FileReader.java b/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/utils/FileReader.java deleted file mode 100644 index d211133d7..000000000 --- a/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/utils/FileReader.java +++ /dev/null @@ -1,80 +0,0 @@ -/* - * Copyright 2024 Aiven Oy - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package io.aiven.kafka.connect.s3.source.utils; - -import static io.aiven.kafka.connect.config.s3.S3ConfigFragment.FETCH_PAGE_SIZE; - -import java.util.HashSet; -import java.util.Iterator; -import java.util.Objects; -import java.util.Set; -import java.util.stream.Stream; - -import io.aiven.kafka.connect.s3.source.config.S3SourceConfig; - -import com.amazonaws.services.s3.AmazonS3; -import com.amazonaws.services.s3.model.ListObjectsV2Request; -import com.amazonaws.services.s3.model.S3ObjectSummary; - -public class FileReader { - - public static final int PAGE_SIZE_FACTOR = 2; - private final S3SourceConfig s3SourceConfig; - private final String bucketName; - - private final Set failedObjectKeys; - - public FileReader(final S3SourceConfig s3SourceConfig, final String bucketName, - final Set failedObjectKeys) { - this.s3SourceConfig = s3SourceConfig; - this.bucketName = bucketName; - this.failedObjectKeys = new HashSet<>(failedObjectKeys); - } - - Iterator fetchObjectSummaries(final AmazonS3 s3Client) { - final ListObjectsV2Request request = new ListObjectsV2Request().withBucketName(bucketName) - .withMaxKeys(s3SourceConfig.getInt(FETCH_PAGE_SIZE) * PAGE_SIZE_FACTOR); - - final Stream s3ObjectStream = Stream - .iterate(s3Client.listObjectsV2(request), Objects::nonNull, response -> { - if (response.isTruncated()) { - return s3Client.listObjectsV2(new ListObjectsV2Request().withBucketName(bucketName) - .withMaxKeys(s3SourceConfig.getInt(FETCH_PAGE_SIZE) * PAGE_SIZE_FACTOR) - .withContinuationToken(response.getNextContinuationToken())); - } else { - return null; - } - }) - .flatMap(response -> response.getObjectSummaries() - .stream() - .filter(objectSummary -> objectSummary.getSize() > 0) - .filter(objectSummary -> assignObjectToTask(objectSummary.getKey())) - .filter(objectSummary -> !failedObjectKeys.contains(objectSummary.getKey()))); - return s3ObjectStream.iterator(); - } - - public void addFailedObjectKeys(final String objectKey) { - 
this.failedObjectKeys.add(objectKey); - } - - private boolean assignObjectToTask(final String objectKey) { - final int maxTasks = Integer.parseInt(s3SourceConfig.originals().get("tasks.max").toString()); - final int taskId = Integer.parseInt(s3SourceConfig.originals().get("task.id").toString()) % maxTasks; - final int taskAssignment = Math.floorMod(objectKey.hashCode(), maxTasks); - return taskAssignment == taskId; - } -} diff --git a/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/utils/RecordProcessor.java b/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/utils/RecordProcessor.java index f4386aefe..13104374b 100644 --- a/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/utils/RecordProcessor.java +++ b/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/utils/RecordProcessor.java @@ -45,7 +45,7 @@ private RecordProcessor() { public static List processRecords(final Iterator sourceRecordIterator, final List results, final S3SourceConfig s3SourceConfig, final Optional keyConverter, final Converter valueConverter, - final AtomicBoolean connectorStopped, final Transformer transformer, final FileReader fileReader, + final AtomicBoolean connectorStopped, final Transformer transformer, final AWSV2SourceClient sourceClient, final OffsetManager offsetManager) { final Map conversionConfig = new HashMap<>(); @@ -55,7 +55,7 @@ public static List processRecords(final Iterator s final S3SourceRecord s3SourceRecord = sourceRecordIterator.next(); if (s3SourceRecord != null) { final SourceRecord sourceRecord = createSourceRecord(s3SourceRecord, s3SourceConfig, keyConverter, - valueConverter, conversionConfig, transformer, fileReader, offsetManager); + valueConverter, conversionConfig, transformer, sourceClient, offsetManager); results.add(sourceRecord); } } @@ -65,8 +65,8 @@ public static List processRecords(final Iterator s static SourceRecord createSourceRecord(final S3SourceRecord s3SourceRecord, final 
S3SourceConfig s3SourceConfig, final Optional keyConverter, final Converter valueConverter, - final Map conversionConfig, final Transformer transformer, final FileReader fileReader, - final OffsetManager offsetManager) { + final Map conversionConfig, final Transformer transformer, + final AWSV2SourceClient sourceClient, final OffsetManager offsetManager) { final String topic = s3SourceRecord.getTopic(); final Optional keyData = keyConverter.map(c -> c.toConnectData(topic, s3SourceRecord.key())); @@ -80,7 +80,7 @@ static SourceRecord createSourceRecord(final S3SourceRecord s3SourceRecord, fina return s3SourceRecord.getSourceRecord(topic, keyData, schemaAndValue); } catch (DataException e) { LOGGER.error("Error in reading s3 object stream {}", e.getMessage(), e); - fileReader.addFailedObjectKeys(s3SourceRecord.getObjectKey()); + sourceClient.addFailedObjectKeys(s3SourceRecord.getObjectKey()); throw e; } } diff --git a/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/utils/SourceRecordIterator.java b/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/utils/SourceRecordIterator.java index 5bb6bf2ff..43ca7a717 100644 --- a/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/utils/SourceRecordIterator.java +++ b/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/utils/SourceRecordIterator.java @@ -31,9 +31,7 @@ import io.aiven.kafka.connect.s3.source.config.S3SourceConfig; import com.amazonaws.AmazonClientException; -import com.amazonaws.services.s3.AmazonS3; import com.amazonaws.services.s3.model.S3Object; -import com.amazonaws.services.s3.model.S3ObjectSummary; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -50,40 +48,43 @@ public final class SourceRecordIterator implements Iterator { + "(?\\d{5})-" + "(?[a-zA-Z0-9]+)" + "\\.(?[^.]+)$"); // topic-00001.txt private String currentObjectKey; - private final Iterator s3ObjectSummaryIterator; + private Iterator objectListIterator; private Iterator 
recordIterator = Collections.emptyIterator(); private final OffsetManager offsetManager; private final S3SourceConfig s3SourceConfig; private final String bucketName; - private final AmazonS3 s3Client; private final Transformer transformer; + // Once we decouple the S3Object from the Source Iterator we can change this to be the SourceApiClient + // At which point it will work for all our integrations. + private final AWSV2SourceClient sourceClient; // NOPMD - private final FileReader fileReader; // NOPMD - - public SourceRecordIterator(final S3SourceConfig s3SourceConfig, final AmazonS3 s3Client, final String bucketName, - final OffsetManager offsetManager, final Transformer transformer, final FileReader fileReader) { + public SourceRecordIterator(final S3SourceConfig s3SourceConfig, final OffsetManager offsetManager, + final Transformer transformer, final AWSV2SourceClient sourceClient) { this.s3SourceConfig = s3SourceConfig; this.offsetManager = offsetManager; - this.s3Client = s3Client; - this.bucketName = bucketName; + + this.bucketName = s3SourceConfig.getAwsS3BucketName(); this.transformer = transformer; - this.fileReader = fileReader; - s3ObjectSummaryIterator = fileReader.fetchObjectSummaries(s3Client); + this.sourceClient = sourceClient; + objectListIterator = sourceClient.getListOfObjectKeys(null); } private void nextS3Object() { - if (!s3ObjectSummaryIterator.hasNext()) { - recordIterator = Collections.emptyIterator(); - return; + if (!objectListIterator.hasNext()) { + // Start after the object Key we have just finished with. 
+ objectListIterator = sourceClient.getListOfObjectKeys(currentObjectKey); + if (!objectListIterator.hasNext()) { + recordIterator = Collections.emptyIterator(); + return; + } } try { - final S3ObjectSummary file = s3ObjectSummaryIterator.next(); - if (file != null) { - currentObjectKey = file.getKey(); + currentObjectKey = objectListIterator.next(); + if (currentObjectKey != null) { recordIterator = createIteratorForCurrentFile(); } } catch (IOException e) { @@ -92,29 +93,30 @@ private void nextS3Object() { } private Iterator createIteratorForCurrentFile() throws IOException { - try (S3Object s3Object = s3Client.getObject(bucketName, currentObjectKey);) { - final Matcher fileMatcher = FILE_DEFAULT_PATTERN.matcher(currentObjectKey); - String topicName; - int defaultPartitionId; + final Matcher fileMatcher = FILE_DEFAULT_PATTERN.matcher(currentObjectKey); + String topicName; + int defaultPartitionId; + + if (fileMatcher.find()) { + // TODO move this from the SourceRecordIterator so that we can decouple it from S3 and make it API agnostic + try (S3Object s3Object = sourceClient.getObject(currentObjectKey);) { - if (fileMatcher.find()) { topicName = fileMatcher.group(PATTERN_TOPIC_KEY); defaultPartitionId = Integer.parseInt(fileMatcher.group(PATTERN_PARTITION_KEY)); - } else { - LOGGER.error("File naming doesn't match to any topic. 
{}", currentObjectKey); - s3Object.close(); - return Collections.emptyIterator(); - } - final long defaultStartOffsetId = 1L; + final long defaultStartOffsetId = 1L; - final String finalTopic = topicName; - final Map partitionMap = ConnectUtils.getPartitionMap(topicName, defaultPartitionId, - bucketName); + final String finalTopic = topicName; + final Map partitionMap = ConnectUtils.getPartitionMap(topicName, defaultPartitionId, + bucketName); - return getObjectIterator(s3Object, finalTopic, defaultPartitionId, defaultStartOffsetId, transformer, - partitionMap); + return getObjectIterator(s3Object, finalTopic, defaultPartitionId, defaultStartOffsetId, transformer, + partitionMap); + } + } else { + LOGGER.error("File naming doesn't match to any topic. {}", currentObjectKey); + return Collections.emptyIterator(); } } @@ -197,7 +199,7 @@ public S3SourceRecord next() { @Override public boolean hasNext() { - return recordIterator.hasNext() || s3ObjectSummaryIterator.hasNext(); + return recordIterator.hasNext() || objectListIterator.hasNext(); } @Override diff --git a/s3-source-connector/src/test/java/io/aiven/kafka/connect/s3/source/utils/FileReaderTest.java b/s3-source-connector/src/test/java/io/aiven/kafka/connect/s3/source/utils/AWSV2SourceClientTest.java similarity index 84% rename from s3-source-connector/src/test/java/io/aiven/kafka/connect/s3/source/utils/FileReaderTest.java rename to s3-source-connector/src/test/java/io/aiven/kafka/connect/s3/source/utils/AWSV2SourceClientTest.java index 39496b8d2..5b5176690 100644 --- a/s3-source-connector/src/test/java/io/aiven/kafka/connect/s3/source/utils/FileReaderTest.java +++ b/s3-source-connector/src/test/java/io/aiven/kafka/connect/s3/source/utils/AWSV2SourceClientTest.java @@ -20,6 +20,8 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; 
import static org.mockito.Mockito.when; import java.io.IOException; @@ -39,18 +41,18 @@ import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.CsvSource; -class FileReaderTest { +class AWSV2SourceClientTest { - private static final String TEST_BUCKET = "test-bucket"; private AmazonS3 s3Client; - private FileReader fileReader; + private AWSV2SourceClient awsv2SourceClient; private static Map getConfigMap(final int maxTasks, final int taskId) { final Map configMap = new HashMap<>(); configMap.put("tasks.max", String.valueOf(maxTasks)); configMap.put("task.id", String.valueOf(taskId)); - configMap.put(AWS_S3_BUCKET_NAME_CONFIG, "testbucket"); + + configMap.put(AWS_S3_BUCKET_NAME_CONFIG, "test-bucket"); return configMap; } @@ -61,7 +63,7 @@ void testFetchObjectSummariesWithNoObjects(final int maxTasks, final int taskId) final ListObjectsV2Result listObjectsV2Result = createListObjectsV2Result(Collections.emptyList(), null); when(s3Client.listObjectsV2(any(ListObjectsV2Request.class))).thenReturn(listObjectsV2Result); - final Iterator summaries = fileReader.fetchObjectSummaries(s3Client); + final Iterator summaries = awsv2SourceClient.getListOfObjectKeys(null); assertThat(summaries).isExhausted(); } @@ -71,9 +73,8 @@ void testFetchObjectSummariesWithOneObjectWithBasicConfig(final int maxTasks, fi final String objectKey = "any-key"; initializeWithTaskConfigs(maxTasks, taskId); - final Iterator summaries = getS3ObjectSummaryIterator(objectKey); + final Iterator summaries = getS3ObjectKeysIterator(objectKey); assertThat(summaries).hasNext(); - assertThat(summaries.next().getSize()).isEqualTo(1); } @ParameterizedTest @@ -81,9 +82,8 @@ void testFetchObjectSummariesWithOneObjectWithBasicConfig(final int maxTasks, fi void testFetchObjectSummariesWithOneNonZeroByteObjectWithTaskIdAssigned(final int maxTasks, final int taskId, final String objectKey) { initializeWithTaskConfigs(maxTasks, taskId); - final Iterator summaries = 
getS3ObjectSummaryIterator(objectKey); + final Iterator summaries = getS3ObjectKeysIterator(objectKey); assertThat(summaries).hasNext(); - assertThat(summaries.next().getSize()).isEqualTo(1); } @ParameterizedTest @@ -92,7 +92,8 @@ void testFetchObjectSummariesWithOneNonZeroByteObjectWithTaskIdAssigned(final in void testFetchObjectSummariesWithOneNonZeroByteObjectWithTaskIdUnassigned(final int maxTasks, final int taskId, final String objectKey) { initializeWithTaskConfigs(maxTasks, taskId); - final Iterator summaries = getS3ObjectSummaryIterator(objectKey); + final Iterator summaries = getS3ObjectKeysIterator(objectKey); + assertThat(summaries).isExhausted(); } @@ -103,11 +104,11 @@ void testFetchObjectSummariesWithZeroByteObject(final int maxTasks, final int ta final ListObjectsV2Result listObjectsV2Result = getListObjectsV2Result(); when(s3Client.listObjectsV2(any(ListObjectsV2Request.class))).thenReturn(listObjectsV2Result); - final Iterator summaries = fileReader.fetchObjectSummaries(s3Client); + final Iterator summaries = awsv2SourceClient.getListOfObjectKeys(null); // assigned 1 object to taskid assertThat(summaries).hasNext(); - assertThat(summaries.next().getSize()).isEqualTo(1); + assertThat(summaries.next()).isNotBlank(); assertThat(summaries).isExhausted(); } @@ -124,9 +125,10 @@ void testFetchObjectSummariesWithPagination() throws IOException { when(s3Client.listObjectsV2(any(ListObjectsV2Request.class))).thenReturn(firstResult).thenReturn(secondResult); - final Iterator summaries = fileReader.fetchObjectSummaries(s3Client); - + final Iterator summaries = awsv2SourceClient.getListOfObjectKeys(null); + verify(s3Client, times(1)).listObjectsV2(any(ListObjectsV2Request.class)); assertThat(summaries.next()).isNotNull(); + assertThat(summaries).isExhausted(); } private ListObjectsV2Result createListObjectsV2Result(final List summaries, @@ -145,20 +147,21 @@ private S3ObjectSummary createObjectSummary(final long sizeOfObject, final Strin return summary; } - 
private Iterator getS3ObjectSummaryIterator(final String objectKey) { + private Iterator getS3ObjectKeysIterator(final String objectKey) { final S3ObjectSummary objectSummary = createObjectSummary(1, objectKey); final ListObjectsV2Result listObjectsV2Result = createListObjectsV2Result( Collections.singletonList(objectSummary), null); when(s3Client.listObjectsV2(any(ListObjectsV2Request.class))).thenReturn(listObjectsV2Result); - return fileReader.fetchObjectSummaries(s3Client); + return awsv2SourceClient.getListOfObjectKeys(null); } public void initializeWithTaskConfigs(final int maxTasks, final int taskId) { final Map configMap = getConfigMap(maxTasks, taskId); final S3SourceConfig s3SourceConfig = new S3SourceConfig(configMap); - fileReader = new FileReader(s3SourceConfig, TEST_BUCKET, Collections.emptySet()); s3Client = mock(AmazonS3.class); + awsv2SourceClient = new AWSV2SourceClient(s3Client, s3SourceConfig, Collections.emptySet()); + } private ListObjectsV2Result getListObjectsV2Result() { diff --git a/s3-source-connector/src/test/java/io/aiven/kafka/connect/s3/source/utils/RecordProcessorTest.java b/s3-source-connector/src/test/java/io/aiven/kafka/connect/s3/source/utils/RecordProcessorTest.java index a9ee18917..d304bc59b 100644 --- a/s3-source-connector/src/test/java/io/aiven/kafka/connect/s3/source/utils/RecordProcessorTest.java +++ b/s3-source-connector/src/test/java/io/aiven/kafka/connect/s3/source/utils/RecordProcessorTest.java @@ -62,7 +62,7 @@ class RecordProcessorTest { private OffsetManager offsetManager; @Mock - private FileReader fileReader; + private AWSV2SourceClient sourceClient; private AtomicBoolean connectorStopped; private Iterator sourceRecordIterator; @@ -86,7 +86,7 @@ void testProcessRecordsNoRecords() { Optional.of(keyConverter), valueConverter, connectorStopped, - transformer, fileReader, offsetManager + transformer, sourceClient, offsetManager ); assertThat(processedRecords).as("Processed records should be empty when there are no 
records.").isEmpty(); @@ -108,7 +108,7 @@ void testProcessRecordsWithRecords() throws ConnectException { Optional.of(keyConverter), valueConverter, connectorStopped, - transformer, fileReader, offsetManager + transformer, sourceClient, offsetManager ); assertThat(results).hasSize(1); @@ -128,7 +128,7 @@ void testProcessRecordsConnectorStopped() { Optional.of(keyConverter), valueConverter, connectorStopped, - transformer, fileReader, offsetManager + transformer, sourceClient, offsetManager ); assertThat(processedRecords).as("Processed records should be empty when connector is stopped.").isEmpty(); @@ -147,7 +147,7 @@ void testCreateSourceRecords() { when(mockRecord.getSourceRecord(anyString(), any(), any())).thenReturn(mock(SourceRecord.class)); final SourceRecord sourceRecords = RecordProcessor.createSourceRecord(mockRecord, s3SourceConfig, - Optional.of(keyConverter), valueConverter, new HashMap<>(), transformer, fileReader, offsetManager); + Optional.of(keyConverter), valueConverter, new HashMap<>(), transformer, sourceClient, offsetManager); assertThat(sourceRecords).isNotNull(); } diff --git a/s3-source-connector/src/test/java/io/aiven/kafka/connect/s3/source/utils/SourceRecordIteratorTest.java b/s3-source-connector/src/test/java/io/aiven/kafka/connect/s3/source/utils/SourceRecordIteratorTest.java index 4630432ac..5e6d7928b 100644 --- a/s3-source-connector/src/test/java/io/aiven/kafka/connect/s3/source/utils/SourceRecordIteratorTest.java +++ b/s3-source-connector/src/test/java/io/aiven/kafka/connect/s3/source/utils/SourceRecordIteratorTest.java @@ -26,54 +26,42 @@ import java.io.ByteArrayInputStream; import java.nio.charset.StandardCharsets; import java.util.Collections; -import java.util.List; import java.util.stream.Stream; import io.aiven.kafka.connect.common.source.input.Transformer; import io.aiven.kafka.connect.s3.source.config.S3SourceConfig; -import com.amazonaws.services.s3.AmazonS3; -import com.amazonaws.services.s3.model.ListObjectsV2Result; import 
com.amazonaws.services.s3.model.S3Object; import com.amazonaws.services.s3.model.S3ObjectInputStream; -import com.amazonaws.services.s3.model.S3ObjectSummary; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; final class SourceRecordIteratorTest { - private AmazonS3 mockS3Client; private S3SourceConfig mockConfig; private OffsetManager mockOffsetManager; private Transformer mockTransformer; - private FileReader mockFileReader; + private AWSV2SourceClient mockSourceApiClient; @BeforeEach public void setUp() { - mockS3Client = mock(AmazonS3.class); mockConfig = mock(S3SourceConfig.class); mockOffsetManager = mock(OffsetManager.class); mockTransformer = mock(Transformer.class); - mockFileReader = mock(FileReader.class); + mockSourceApiClient = mock(AWSV2SourceClient.class); } @Test void testIteratorProcessesS3Objects() throws Exception { - final S3ObjectSummary mockSummary = new S3ObjectSummary(); - mockSummary.setKey("topic-00001-abc123.txt"); - // Mock list of S3 object summaries - final List mockObjectSummaries = Collections.singletonList(mockSummary); - - final ListObjectsV2Result result = mockListObjectsResult(mockObjectSummaries); - when(mockS3Client.listObjectsV2(anyString())).thenReturn(result); + final String key = "topic-00001-abc123.txt"; // Mock S3Object and InputStream try (S3Object mockS3Object = mock(S3Object.class); S3ObjectInputStream mockInputStream = new S3ObjectInputStream(new ByteArrayInputStream(new byte[] {}), null);) { - when(mockS3Client.getObject(anyString(), anyString())).thenReturn(mockS3Object); + when(mockSourceApiClient.getObject(anyString())).thenReturn(mockS3Object); when(mockS3Object.getObjectContent()).thenReturn(mockInputStream); when(mockTransformer.getRecords(any(), anyString(), anyInt(), any())).thenReturn(Stream.of(new Object())); @@ -84,26 +72,20 @@ void testIteratorProcessesS3Objects() throws Exception { when(mockOffsetManager.getOffsets()).thenReturn(Collections.emptyMap()); - 
when(mockFileReader.fetchObjectSummaries(any())).thenReturn(Collections.emptyIterator()); - SourceRecordIterator iterator = new SourceRecordIterator(mockConfig, mockS3Client, "test-bucket", - mockOffsetManager, mockTransformer, mockFileReader); + when(mockSourceApiClient.getListOfObjectKeys(any())).thenReturn(Collections.emptyIterator()); + SourceRecordIterator iterator = new SourceRecordIterator(mockConfig, mockOffsetManager, mockTransformer, + mockSourceApiClient); assertThat(iterator.hasNext()).isFalse(); assertThat(iterator.next()).isNull(); - when(mockFileReader.fetchObjectSummaries(any())).thenReturn(mockObjectSummaries.listIterator()); + when(mockSourceApiClient.getListOfObjectKeys(any())) + .thenReturn(Collections.singletonList(key).listIterator()); - iterator = new SourceRecordIterator(mockConfig, mockS3Client, "test-bucket", mockOffsetManager, - mockTransformer, mockFileReader); + iterator = new SourceRecordIterator(mockConfig, mockOffsetManager, mockTransformer, mockSourceApiClient); assertThat(iterator.hasNext()).isTrue(); assertThat(iterator.next()).isNotNull(); } } - - private ListObjectsV2Result mockListObjectsResult(final List summaries) { - final ListObjectsV2Result result = mock(ListObjectsV2Result.class); - when(result.getObjectSummaries()).thenReturn(summaries); - return result; - } }