Skip to content

Commit

Permalink
Expose KCL pollingMaxRecords & pollingIdleTime options
Browse files Browse the repository at this point in the history
  • Loading branch information
artembilan committed Sep 25, 2024
1 parent 06ca18a commit ab2f9b8
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,10 @@ public class KclMessageDrivenChannelAdapter extends MessageProducerSupport

private boolean emptyRecordList;

private int pollingMaxRecords = PollingConfig.DEFAULT_MAX_RECORDS;

private long pollingIdleTime = 1500L;

public KclMessageDrivenChannelAdapter(String... streams) {
this(KinesisAsyncClient.create(), CloudWatchAsyncClient.create(), DynamoDbAsyncClient.create(), streams);
}
Expand Down Expand Up @@ -369,6 +373,26 @@ public void setEmptyRecordList(boolean emptyRecordList) {
this.emptyRecordList = emptyRecordList;
}

/**
* The number of records to poll from Kinesis when using {@link PollingConfig}.
* @param pollingMaxRecords the number of records to poll from Kinesis.
* @since 3.0.8
* @see PollingConfig#maxRecords(int)
*/
public void setPollingMaxRecords(int pollingMaxRecords) {
this.pollingMaxRecords = pollingMaxRecords;
}

/**
* The idle timeout between polls when using {@link PollingConfig}.
* @param pollingIdleTime idle timeout between polls.
* @since 3.0.8
* @see PollingConfig#idleTimeBetweenReadsInMillis(long)
*/
public void setPollingIdleTime(long pollingIdleTime) {
this.pollingIdleTime = pollingIdleTime;
}

@Override
protected void onInit() {
super.onInit();
Expand Down Expand Up @@ -425,7 +449,9 @@ protected void doStart() {
else {
retrievalSpecificConfig =
new PollingConfig(this.kinesisClient)
.streamName(singleStreamName);
.streamName(singleStreamName)
.maxRecords(this.pollingMaxRecords)
.idleTimeBetweenReadsInMillis(this.pollingIdleTime);
}

RetrievalConfig retrievalConfig = this.config.retrievalConfig()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,15 @@ public void shardConsumerDispatchPollIntervalMillisOverriddenByCustomizer() {
assertThat(shardConsumerDispatchPollIntervalMillis).isEqualTo(500L);
}

@Test
public void pollingMaxRecordsIsPropagated() {
Integer maxRecords =
TestUtils.getPropertyValue(this.kclMessageDrivenChannelAdapter,
"scheduler.retrievalConfig.retrievalSpecificConfig.maxRecords",
Integer.class);
assertThat(maxRecords).isEqualTo(99);
}

@Configuration
@EnableIntegration
public static class TestConfiguration {
Expand All @@ -184,6 +193,7 @@ public KclMessageDrivenChannelAdapter kclMessageDrivenChannelAdapter() {
coordinatorConfig.shardConsumerDispatchPollIntervalMillis(500L));
adapter.setBindSourceRecord(true);
adapter.setEmptyRecordList(true);
adapter.setPollingMaxRecords(99);
return adapter;
}

Expand Down

0 comments on commit ab2f9b8

Please sign in to comment.