Skip to content

Commit

Permalink
GH-249 Add properties for handling backpressure in KplMessageHandler
Browse files Browse the repository at this point in the history
Fixes: #249

* Simplification of back-pressure handling, introduced `KplBackpressureException`.
* Javadoc corrections.
* Code Review comments addressed. Added Test cases with RetryAdvice.
* Added Javadoc for `KplBackpressureException` in the `KplMessageHandler` class level.
* Javadoc related code review actions fixed.
* Updated Copyright and Javadoc related comments.
* Revert in `AbstractAwsMessageHandler`.
  • Loading branch information
siddharthjain210 authored and artembilan committed Jan 2, 2025
1 parent 510fe5b commit b5f6c39
Show file tree
Hide file tree
Showing 3 changed files with 267 additions and 4 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2019-2024 the original author or authors.
* Copyright 2019-2025 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -49,6 +49,7 @@
import org.springframework.expression.Expression;
import org.springframework.expression.common.LiteralExpression;
import org.springframework.integration.aws.support.AwsHeaders;
import org.springframework.integration.aws.support.KplBackpressureException;
import org.springframework.integration.aws.support.UserRecordResponse;
import org.springframework.integration.expression.ValueExpression;
import org.springframework.integration.handler.AbstractMessageHandler;
Expand All @@ -63,11 +64,15 @@
import org.springframework.util.StringUtils;

/**
* The {@link AbstractMessageHandler} implementation for the Amazon Kinesis Producer
* Library {@code putRecord(s)}.
* The {@link AbstractMessageHandler} implementation for the Amazon Kinesis Producer Library {@code putRecord(s)}.
* <p>
* The {@link KplBackpressureException} is thrown when backpressure handling is enabled and buffer is at max capacity.
* This exception can be handled with {@link org.springframework.integration.handler.advice.AbstractRequestHandlerAdvice}.
* </p>
*
* @author Arnaud Lecollaire
* @author Artem Bilan
* @author Siddharth Jain
*
* @since 2.2
*
Expand Down Expand Up @@ -99,6 +104,8 @@ public class KplMessageHandler extends AbstractAwsMessageHandler<Void> implement

private volatile ScheduledFuture<?> flushFuture;

private long backPressureThreshold = 0;

public KplMessageHandler(KinesisProducer kinesisProducer) {
Assert.notNull(kinesisProducer, "'kinesisProducer' must not be null.");
this.kinesisProducer = kinesisProducer;
Expand All @@ -115,6 +122,19 @@ public void setConverter(Converter<Object, byte[]> converter) {
setMessageConverter(new ConvertingFromMessageConverter(converter));
}

/**
* Configure maximum records in flight for handling backpressure.
* By default, backpressure handling is not enabled.
* When backpressure handling is enabled and number of records in flight exceeds the threshold, a
* {@link KplBackpressureException} would be thrown.
* @param backPressureThreshold a value greater than {@code 0} to enable backpressure handling.
* @since 3.0.9
*/
public void setBackPressureThreshold(long backPressureThreshold) {
Assert.isTrue(backPressureThreshold >= 0, "'backPressureThreshold must be greater than or equal to 0.");
this.backPressureThreshold = backPressureThreshold;
}

/**
* Configure a {@link MessageConverter} for converting payload to {@code byte[]} for Kinesis record.
* @param messageConverter the {@link MessageConverter} to use.
Expand Down Expand Up @@ -368,6 +388,14 @@ private void setGlueSchemaIntoUserRecordIfAny(UserRecord userRecord, Message<?>
}

private CompletableFuture<UserRecordResponse> handleUserRecord(UserRecord userRecord) {
if (this.backPressureThreshold > 0) {
var numberOfRecordsInFlight = this.kinesisProducer.getOutstandingRecordsCount();
if (numberOfRecordsInFlight > this.backPressureThreshold) {
throw new KplBackpressureException("Cannot send record to Kinesis since buffer is at max capacity.",
userRecord);
}
}

ListenableFuture<UserRecordResult> recordResult = this.kinesisProducer.addUserRecord(userRecord);
return listenableFutureToCompletableFuture(recordResult)
.thenApply(UserRecordResponse::new);
Expand Down Expand Up @@ -403,7 +431,8 @@ private PutRecordRequest buildPutRecordRequest(Message<?> message) {
if (!StringUtils.hasText(partitionKey) && this.partitionKeyExpression != null) {
partitionKey = this.partitionKeyExpression.getValue(getEvaluationContext(), message, String.class);
}
Assert.state(partitionKey != null, "'partitionKey' must not be null for sending a Kinesis record. "
Assert.state(partitionKey != null,
"'partitionKey' must not be null for sending a Kinesis record."
+ "Consider configuring this handler with a 'partitionKey'( or 'partitionKeyExpression') " +
"or supply an 'aws_partitionKey' message header.");

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/*
* Copyright 2025-2025 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.integration.aws.support;

import java.io.Serial;

import com.amazonaws.services.kinesis.producer.UserRecord;

/**
* An exception triggered from the {@link org.springframework.integration.aws.outbound.KplMessageHandler}
* while sending records to Kinesis when maximum number of records in flight exceeds the backpressure threshold.
*
* @author Siddharth Jain
* @author Artem Bilan
*
* @since 3.0.9
*/
public class KplBackpressureException extends RuntimeException {

@Serial
private static final long serialVersionUID = 1L;

private final UserRecord userRecord;

public KplBackpressureException(String message, UserRecord userRecord) {
super(message);
this.userRecord = userRecord;
}

/**
* Get the {@link UserRecord} when this exception has been thrown.
* @return the {@link UserRecord} when this exception has been thrown.
*/
public UserRecord getUserRecord() {
return this.userRecord;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,182 @@
/*
* Copyright 2019-2025 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.integration.aws.outbound;

import com.amazonaws.services.kinesis.producer.KinesisProducer;
import com.amazonaws.services.kinesis.producer.UserRecord;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;
import org.mockito.ArgumentCaptor;
import org.mockito.Mockito;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.aws.support.AwsHeaders;
import org.springframework.integration.aws.support.KplBackpressureException;
import org.springframework.integration.config.EnableIntegration;
import org.springframework.integration.handler.advice.RequestHandlerRetryAdvice;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.MessageHandlingException;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.retry.support.RetryTemplate;
import org.springframework.test.annotation.DirtiesContext;
import org.springframework.test.context.junit.jupiter.SpringJUnitConfig;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.BDDMockito.given;
import static org.mockito.Mockito.clearInvocations;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;

/** The class contains test cases for KplMessageHandler.
*
* @author Siddharth Jain
*
* @since 3.0.9
*/
@SpringJUnitConfig
@DirtiesContext
public class KplMessageHandlerTests {

@Autowired
protected KinesisProducer kinesisProducer;

@Autowired
protected MessageChannel kinesisSendChannel;

@Autowired
protected KplMessageHandler kplMessageHandler;

@Test
@SuppressWarnings("unchecked")
void kplMessageHandlerWithRawPayloadBackpressureDisabledSuccess() {
given(this.kinesisProducer.addUserRecord(any(UserRecord.class)))
.willReturn(mock());
final Message<?> message = MessageBuilder
.withPayload("someMessage")
.setHeader(AwsHeaders.PARTITION_KEY, "somePartitionKey")
.setHeader(AwsHeaders.SEQUENCE_NUMBER, "10")
.setHeader("someHeaderKey", "someHeaderValue")
.build();

ArgumentCaptor<UserRecord> userRecordRequestArgumentCaptor = ArgumentCaptor
.forClass(UserRecord.class);
this.kplMessageHandler.setBackPressureThreshold(0);
this.kinesisSendChannel.send(message);
verify(this.kinesisProducer).addUserRecord(userRecordRequestArgumentCaptor.capture());
verify(this.kinesisProducer, Mockito.never()).getOutstandingRecordsCount();
UserRecord userRecord = userRecordRequestArgumentCaptor.getValue();
assertThat(userRecord.getStreamName()).isEqualTo("someStream");
assertThat(userRecord.getPartitionKey()).isEqualTo("somePartitionKey");
assertThat(userRecord.getExplicitHashKey()).isNull();
}

@Test
@SuppressWarnings("unchecked")
void kplMessageHandlerWithRawPayloadBackpressureEnabledCapacityAvailable() {
given(this.kinesisProducer.addUserRecord(any(UserRecord.class)))
.willReturn(mock());
this.kplMessageHandler.setBackPressureThreshold(2);
given(this.kinesisProducer.getOutstandingRecordsCount())
.willReturn(1);
final Message<?> message = MessageBuilder
.withPayload("someMessage")
.setHeader(AwsHeaders.PARTITION_KEY, "somePartitionKey")
.setHeader(AwsHeaders.SEQUENCE_NUMBER, "10")
.setHeader("someHeaderKey", "someHeaderValue")
.build();

ArgumentCaptor<UserRecord> userRecordRequestArgumentCaptor = ArgumentCaptor
.forClass(UserRecord.class);

this.kinesisSendChannel.send(message);
verify(this.kinesisProducer).addUserRecord(userRecordRequestArgumentCaptor.capture());
verify(this.kinesisProducer).getOutstandingRecordsCount();
UserRecord userRecord = userRecordRequestArgumentCaptor.getValue();
assertThat(userRecord.getStreamName()).isEqualTo("someStream");
assertThat(userRecord.getPartitionKey()).isEqualTo("somePartitionKey");
assertThat(userRecord.getExplicitHashKey()).isNull();
}

@Test
@SuppressWarnings("unchecked")
void kplMessageHandlerWithRawPayloadBackpressureEnabledCapacityInsufficient() {
given(this.kinesisProducer.addUserRecord(any(UserRecord.class)))
.willReturn(mock());
this.kplMessageHandler.setBackPressureThreshold(2);
given(this.kinesisProducer.getOutstandingRecordsCount())
.willReturn(5);
final Message<?> message = MessageBuilder
.withPayload("someMessage")
.setHeader(AwsHeaders.PARTITION_KEY, "somePartitionKey")
.setHeader(AwsHeaders.SEQUENCE_NUMBER, "10")
.setHeader("someHeaderKey", "someHeaderValue")
.build();

assertThatExceptionOfType(RuntimeException.class)
.isThrownBy(() -> this.kinesisSendChannel.send(message))
.withCauseInstanceOf(MessageHandlingException.class)
.withRootCauseExactlyInstanceOf(KplBackpressureException.class)
.withStackTraceContaining("Cannot send record to Kinesis since buffer is at max capacity.");

verify(this.kinesisProducer, Mockito.never()).addUserRecord(any(UserRecord.class));
verify(this.kinesisProducer).getOutstandingRecordsCount();
}

@AfterEach
public void tearDown() {
clearInvocations(this.kinesisProducer);
}

@Configuration
@EnableIntegration
public static class ContextConfiguration {

@Bean
public KinesisProducer kinesisProducer() {
return mock();
}

@Bean
public RequestHandlerRetryAdvice retryAdvice() {
RequestHandlerRetryAdvice requestHandlerRetryAdvice = new RequestHandlerRetryAdvice();
requestHandlerRetryAdvice.setRetryTemplate(RetryTemplate.builder()
.retryOn(KplBackpressureException.class)
.exponentialBackoff(100, 2.0, 1000)
.maxAttempts(3)
.build());
return requestHandlerRetryAdvice;
}

@Bean
@ServiceActivator(inputChannel = "kinesisSendChannel", adviceChain = "retryAdvice")
public MessageHandler kplMessageHandler(KinesisProducer kinesisProducer) {
KplMessageHandler kplMessageHandler = new KplMessageHandler(kinesisProducer);
kplMessageHandler.setAsync(true);
kplMessageHandler.setStream("someStream");
return kplMessageHandler;
}

}

}

0 comments on commit b5f6c39

Please sign in to comment.