From d9b0352a7b9e38f33b9d91c703147aabb1305b82 Mon Sep 17 00:00:00 2001 From: JingZhang Chen Date: Tue, 20 Feb 2024 14:26:00 +0800 Subject: [PATCH 1/4] fix flaky JdbcProjectionTest --- .../org/apache/pekko/projection/jdbc/JdbcProjectionTest.java | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/jdbc/src/test/java/org/apache/pekko/projection/jdbc/JdbcProjectionTest.java b/jdbc/src/test/java/org/apache/pekko/projection/jdbc/JdbcProjectionTest.java index c7f05324..15b35782 100644 --- a/jdbc/src/test/java/org/apache/pekko/projection/jdbc/JdbcProjectionTest.java +++ b/jdbc/src/test/java/org/apache/pekko/projection/jdbc/JdbcProjectionTest.java @@ -346,8 +346,9 @@ public void atLeastOnceShouldRestartFromPreviousOffset() { * * See https://github.com/akka/akka-projection/issues/462 for a possible solution. */ - probe.request(2); - probe.expectNextN(2); + // because concatHandler won't concat element that offset 4, so this is safe that request 3 + probe.request(3); + probe.expectNextN(3); assertEquals("abc|def|ghi|", str.toString()); expectNextUntilErrorMessage(probe, failMessage(4)); }); From ba1665670817fc2534a2d8d3047b3dfe25a57d7f Mon Sep 17 00:00:00 2001 From: JingZhang Chen Date: Tue, 20 Feb 2024 14:28:44 +0800 Subject: [PATCH 2/4] code fmt --- .../org/apache/pekko/projection/jdbc/JdbcProjectionTest.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/jdbc/src/test/java/org/apache/pekko/projection/jdbc/JdbcProjectionTest.java b/jdbc/src/test/java/org/apache/pekko/projection/jdbc/JdbcProjectionTest.java index 15b35782..0f2c2eaf 100644 --- a/jdbc/src/test/java/org/apache/pekko/projection/jdbc/JdbcProjectionTest.java +++ b/jdbc/src/test/java/org/apache/pekko/projection/jdbc/JdbcProjectionTest.java @@ -346,7 +346,8 @@ public void atLeastOnceShouldRestartFromPreviousOffset() { * * See https://github.com/akka/akka-projection/issues/462 for a possible solution. */ - // because concatHandler won't concat element that offset 4, so this is safe that request 3 + // because concatHandler won't concat element that offset 4, so this is safe that request + // 3 probe.request(3); probe.expectNextN(3); assertEquals("abc|def|ghi|", str.toString()); From 34f7d4335fb81057fca7d6980b8eadb25f382e19 Mon Sep 17 00:00:00 2001 From: JingZhang Chen Date: Tue, 20 Feb 2024 14:48:51 +0800 Subject: [PATCH 3/4] block until element arrived --- .../projection/jdbc/JdbcProjectionTest.java | 20 +++++++++++-------- 1 file changed, 12 insertions(+), 8 deletions(-) diff --git a/jdbc/src/test/java/org/apache/pekko/projection/jdbc/JdbcProjectionTest.java b/jdbc/src/test/java/org/apache/pekko/projection/jdbc/JdbcProjectionTest.java index 0f2c2eaf..0f714bd1 100644 --- a/jdbc/src/test/java/org/apache/pekko/projection/jdbc/JdbcProjectionTest.java +++ b/jdbc/src/test/java/org/apache/pekko/projection/jdbc/JdbcProjectionTest.java @@ -54,11 +54,13 @@ import java.util.List; import java.util.Map; import java.util.UUID; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.function.Predicate; import java.util.function.Supplier; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; public class JdbcProjectionTest extends JUnitSuite { @@ -209,13 +211,14 @@ private void expectNextUntilErrorMessage(TestSubscriber.Probe probe, Strin } private JdbcHandler concatHandler(StringBuffer str) { - return concatHandler(str, __ -> false); + return concatHandler(str, new CountDownLatch(0), __ -> false); } private JdbcHandler concatHandler( - StringBuffer buffer, Predicate failPredicate) { + StringBuffer buffer, CountDownLatch latch, Predicate failPredicate) { return JdbcHandler.fromFunction( (PureJdbcSession session, Envelope envelope) -> { + latch.countDown(); if (failPredicate.test(envelope.offset)) { throw new RuntimeException(failMessage(envelope.offset)); } else { @@ -275,6 +278,7 @@ public void exactlyOnceShouldRestartFromPreviousOffset() { ProjectionId projectionId = genRandomProjectionId(); StringBuffer str = new StringBuffer(); + CountDownLatch latch = new CountDownLatch(3); Projection projection = JdbcProjection.exactlyOnce( @@ -282,7 +286,7 @@ public void exactlyOnceShouldRestartFromPreviousOffset() { sourceProvider(entityId), jdbcSessionCreator, // fail on fourth offset - () -> concatHandler(str, offset -> offset == 4), + () -> concatHandler(str, latch, offset -> offset == 4), testKit.system()); projectionTestKit.runWithTestSink( @@ -326,6 +330,7 @@ public void atLeastOnceShouldRestartFromPreviousOffset() { ProjectionId projectionId = genRandomProjectionId(); StringBuffer str = new StringBuffer(); + CountDownLatch latch = new CountDownLatch(3); Projection projection = JdbcProjection.atLeastOnce( @@ -333,7 +338,7 @@ public void atLeastOnceShouldRestartFromPreviousOffset() { sourceProvider(entityId), jdbcSessionCreator, // fail on fourth offset - () -> concatHandler(str, offset -> offset == 4), + () -> concatHandler(str, latch, offset -> offset == 4), testKit.system()) .withSaveOffset(1, Duration.ZERO); @@ -346,10 +351,9 @@ public void atLeastOnceShouldRestartFromPreviousOffset() { * * See https://github.com/akka/akka-projection/issues/462 for a possible solution. */ - // because concatHandler won't concat element that offset 4, so this is safe that request - // 3 - probe.request(3); - probe.expectNextN(3); + probe.request(2); + probe.expectNextN(2); + assertTrue(latch.await(3, TimeUnit.SECONDS)); assertEquals("abc|def|ghi|", str.toString()); expectNextUntilErrorMessage(probe, failMessage(4)); }); From be89dfd58c0b3993491e2131e1b08dadc253e6cc Mon Sep 17 00:00:00 2001 From: JingZhang Chen Date: Tue, 20 Feb 2024 14:50:32 +0800 Subject: [PATCH 4/4] additional block for exactlyOnce --- .../org/apache/pekko/projection/jdbc/JdbcProjectionTest.java | 1 + 1 file changed, 1 insertion(+) diff --git a/jdbc/src/test/java/org/apache/pekko/projection/jdbc/JdbcProjectionTest.java b/jdbc/src/test/java/org/apache/pekko/projection/jdbc/JdbcProjectionTest.java index 0f714bd1..2d05a2e7 100644 --- a/jdbc/src/test/java/org/apache/pekko/projection/jdbc/JdbcProjectionTest.java +++ b/jdbc/src/test/java/org/apache/pekko/projection/jdbc/JdbcProjectionTest.java @@ -294,6 +294,7 @@ public void exactlyOnceShouldRestartFromPreviousOffset() { (probe) -> { probe.request(3); probe.expectNextN(3); + assertTrue(latch.await(3, TimeUnit.SECONDS)); assertEquals("abc|def|ghi|", str.toString()); expectNextUntilErrorMessage(probe, failMessage(4)); });