From f4e1fc682ab91870050f698b1a47853128272af4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=EB=AC=B8=EB=AF=BC=EA=B7=9C?= Date: Fri, 15 Mar 2024 17:34:08 +0900 Subject: [PATCH 1/5] GH-241: Support changing metrics-level for KclMessageDrivenChannelAdapter --- .../KclMessageDrivenChannelAdapter.java | 22 +++++++++++++++++-- 1 file changed, 20 insertions(+), 2 deletions(-) diff --git a/src/main/java/org/springframework/integration/aws/inbound/kinesis/KclMessageDrivenChannelAdapter.java b/src/main/java/org/springframework/integration/aws/inbound/kinesis/KclMessageDrivenChannelAdapter.java index a1479cdf..f7bbff9c 100644 --- a/src/main/java/org/springframework/integration/aws/inbound/kinesis/KclMessageDrivenChannelAdapter.java +++ b/src/main/java/org/springframework/integration/aws/inbound/kinesis/KclMessageDrivenChannelAdapter.java @@ -1,5 +1,5 @@ /* - * Copyright 2019-2023 the original author or authors. + * Copyright 2019-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -50,6 +50,8 @@ import software.amazon.kinesis.lifecycle.events.ProcessRecordsInput; import software.amazon.kinesis.lifecycle.events.ShardEndedInput; import software.amazon.kinesis.lifecycle.events.ShutdownRequestedInput; +import software.amazon.kinesis.metrics.MetricsConfig; +import software.amazon.kinesis.metrics.MetricsLevel; import software.amazon.kinesis.processor.FormerStreamsLeasesDeletionStrategy; import software.amazon.kinesis.processor.MultiStreamTracker; import software.amazon.kinesis.processor.RecordProcessorCheckpointer; @@ -92,6 +94,7 @@ * @author Artem Bilan * @author Dirk Bonhomme * @author Siddharth Jain + * @author Minkyu Moon * * @since 2.2.0 */ @@ -145,6 +148,8 @@ public class KclMessageDrivenChannelAdapter extends MessageProducerSupport private volatile Scheduler scheduler; + private MetricsLevel metricsLevel = MetricsLevel.DETAILED; + public KclMessageDrivenChannelAdapter(String... streams) { this(KinesisAsyncClient.create(), CloudWatchAsyncClient.create(), DynamoDbAsyncClient.create(), streams); } @@ -267,6 +272,16 @@ public void setFanOut(boolean fanOut) { this.fanOut = fanOut; } + /** + * Specify a metrics level to emit. + * Defaults to {@link MetricsLevel#DETAILED}. + * @param metricsLevel the {@link MetricsLevel} for emitting (or not) metrics into Cloud Watch. + */ + public void setMetricsLevel(MetricsLevel metricsLevel) { + Assert.notNull(metricsLevel, "'metricsLevel' must not be null"); + this.metricsLevel = metricsLevel; + } + @Override protected void onInit() { super.onInit(); @@ -321,13 +336,16 @@ protected void doStart() { .glueSchemaRegistryDeserializer(this.glueSchemaRegistryDeserializer) .retrievalSpecificConfig(retrievalSpecificConfig); + MetricsConfig metricsConfig = this.config.metricsConfig(); + metricsConfig.metricsLevel(this.metricsLevel); + this.scheduler = new Scheduler( this.config.checkpointConfig(), this.config.coordinatorConfig(), this.config.leaseManagementConfig(), lifecycleConfig, - this.config.metricsConfig(), + metricsConfig, this.config.processorConfig(), retrievalConfig); From a78fc6c9f71e8aae70ba808f4a3cb3f38e526894 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=EB=AC=B8=EB=AF=BC=EA=B7=9C?= Date: Sat, 16 Mar 2024 00:13:29 +0900 Subject: [PATCH 2/5] Add test that metricsLevel is set correctly --- .../KclMessageDrivenChannelAdapterTests.java | 23 ++++++++++++++++++- 1 file changed, 22 insertions(+), 1 deletion(-) diff --git a/src/test/java/org/springframework/integration/aws/kinesis/KclMessageDrivenChannelAdapterTests.java b/src/test/java/org/springframework/integration/aws/kinesis/KclMessageDrivenChannelAdapterTests.java index 65f498c7..3e1064bd 100644 --- a/src/test/java/org/springframework/integration/aws/kinesis/KclMessageDrivenChannelAdapterTests.java +++ b/src/test/java/org/springframework/integration/aws/kinesis/KclMessageDrivenChannelAdapterTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2023 the original author or authors. + * Copyright 2023-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -21,6 +21,7 @@ import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; +import org.springframework.integration.test.util.TestUtils; import software.amazon.awssdk.core.SdkBytes; import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient; import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient; @@ -42,6 +43,7 @@ import org.springframework.messaging.PollableChannel; import org.springframework.test.annotation.DirtiesContext; import org.springframework.test.context.junit.jupiter.SpringJUnitConfig; +import software.amazon.kinesis.metrics.MetricsLevel; import static org.assertj.core.api.Assertions.assertThat; @@ -66,6 +68,9 @@ public class KclMessageDrivenChannelAdapterTests implements LocalstackContainerT @Autowired private PollableChannel kinesisReceiveChannel; + @Autowired + private KclMessageDrivenChannelAdapter kclMessageDrivenChannelAdapter; + @BeforeAll static void setup() { AMAZON_KINESIS = LocalstackContainerTest.kinesisClient(); @@ -116,6 +121,21 @@ void kclChannelAdapterReceivesRecords() { assertThat(streamConsumers).hasSize(0); } + @Test + public void metricsLevelOfMetricsFactoryShouldBeSetToMetricsLevelOfAdapter() { + MetricsLevel metricsLevel = TestUtils.getPropertyValue( + kclMessageDrivenChannelAdapter, + "scheduler.metricsFactory.metricsLevel", + MetricsLevel.class + ); + MetricsLevel expectedMetricsLevel = TestUtils.getPropertyValue( + kclMessageDrivenChannelAdapter, + "metricsLevel", + MetricsLevel.class + ); + assertThat(metricsLevel).isEqualTo(expectedMetricsLevel); + } + @Configuration @EnableIntegration public static class TestConfiguration { @@ -130,6 +150,7 @@ public KclMessageDrivenChannelAdapter kclMessageDrivenChannelAdapter() { adapter.setConverter(String::new); adapter.setConsumerGroup("single_stream_group"); adapter.setFanOut(false); + adapter.setMetricsLevel(MetricsLevel.fromName("NONE")); adapter.setBindSourceRecord(true); return adapter; } From 4403ad0f6c6bb94f6e6ea772617d5ece05cdfe96 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=EB=AC=B8=EB=AF=BC=EA=B7=9C?= Date: Sat, 16 Mar 2024 00:50:46 +0900 Subject: [PATCH 3/5] Modify test body and add author --- .../kinesis/KclMessageDrivenChannelAdapterTests.java | 10 +++------- 1 file changed, 3 insertions(+), 7 deletions(-) diff --git a/src/test/java/org/springframework/integration/aws/kinesis/KclMessageDrivenChannelAdapterTests.java b/src/test/java/org/springframework/integration/aws/kinesis/KclMessageDrivenChannelAdapterTests.java index 3e1064bd..3b98d9d8 100644 --- a/src/test/java/org/springframework/integration/aws/kinesis/KclMessageDrivenChannelAdapterTests.java +++ b/src/test/java/org/springframework/integration/aws/kinesis/KclMessageDrivenChannelAdapterTests.java @@ -50,6 +50,7 @@ /** * @author Artem Bilan * @author Siddharth Jain + * @author Minkyu Moon * * @since 3.0 */ @@ -128,12 +129,7 @@ public void metricsLevelOfMetricsFactoryShouldBeSetToMetricsLevelOfAdapter() { "scheduler.metricsFactory.metricsLevel", MetricsLevel.class ); - MetricsLevel expectedMetricsLevel = TestUtils.getPropertyValue( - kclMessageDrivenChannelAdapter, - "metricsLevel", - MetricsLevel.class - ); - assertThat(metricsLevel).isEqualTo(expectedMetricsLevel); + assertThat(metricsLevel).isEqualTo(MetricsLevel.NONE); } @Configuration @@ -150,7 +146,7 @@ public KclMessageDrivenChannelAdapter kclMessageDrivenChannelAdapter() { adapter.setConverter(String::new); adapter.setConsumerGroup("single_stream_group"); adapter.setFanOut(false); - adapter.setMetricsLevel(MetricsLevel.fromName("NONE")); + adapter.setMetricsLevel(MetricsLevel.NONE); adapter.setBindSourceRecord(true); return adapter; } From 88b2484798034be6fb8212080a9cf6e13e563088 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=EB=AC=B8=EB=AF=BC=EA=B7=9C?= Date: Sat, 16 Mar 2024 00:55:24 +0900 Subject: [PATCH 4/5] Fix import order --- .../aws/kinesis/KclMessageDrivenChannelAdapterTests.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/test/java/org/springframework/integration/aws/kinesis/KclMessageDrivenChannelAdapterTests.java b/src/test/java/org/springframework/integration/aws/kinesis/KclMessageDrivenChannelAdapterTests.java index 3b98d9d8..3a725454 100644 --- a/src/test/java/org/springframework/integration/aws/kinesis/KclMessageDrivenChannelAdapterTests.java +++ b/src/test/java/org/springframework/integration/aws/kinesis/KclMessageDrivenChannelAdapterTests.java @@ -21,7 +21,6 @@ import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; -import org.springframework.integration.test.util.TestUtils; import software.amazon.awssdk.core.SdkBytes; import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient; import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient; @@ -29,6 +28,7 @@ import software.amazon.awssdk.services.kinesis.model.Consumer; import software.amazon.kinesis.common.InitialPositionInStream; import software.amazon.kinesis.common.InitialPositionInStreamExtended; +import software.amazon.kinesis.metrics.MetricsLevel; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Bean; @@ -39,11 +39,11 @@ import org.springframework.integration.aws.support.AwsHeaders; import org.springframework.integration.channel.QueueChannel; import org.springframework.integration.config.EnableIntegration; +import org.springframework.integration.test.util.TestUtils; import org.springframework.messaging.Message; import org.springframework.messaging.PollableChannel; import org.springframework.test.annotation.DirtiesContext; import org.springframework.test.context.junit.jupiter.SpringJUnitConfig; -import software.amazon.kinesis.metrics.MetricsLevel; import static org.assertj.core.api.Assertions.assertThat; From b38b1464cea43f55565ef22871bf72e56f01cb73 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=EB=AC=B8=EB=AF=BC=EA=B7=9C?= Date: Sat, 16 Mar 2024 00:56:45 +0900 Subject: [PATCH 5/5] Add reference to instance variable --- .../aws/kinesis/KclMessageDrivenChannelAdapterTests.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/test/java/org/springframework/integration/aws/kinesis/KclMessageDrivenChannelAdapterTests.java b/src/test/java/org/springframework/integration/aws/kinesis/KclMessageDrivenChannelAdapterTests.java index 3a725454..6f13c0aa 100644 --- a/src/test/java/org/springframework/integration/aws/kinesis/KclMessageDrivenChannelAdapterTests.java +++ b/src/test/java/org/springframework/integration/aws/kinesis/KclMessageDrivenChannelAdapterTests.java @@ -125,7 +125,7 @@ void kclChannelAdapterReceivesRecords() { @Test public void metricsLevelOfMetricsFactoryShouldBeSetToMetricsLevelOfAdapter() { MetricsLevel metricsLevel = TestUtils.getPropertyValue( - kclMessageDrivenChannelAdapter, + this.kclMessageDrivenChannelAdapter, "scheduler.metricsFactory.metricsLevel", MetricsLevel.class );