diff --git a/jdbc/src/test/java/org/apache/pekko/projection/jdbc/JdbcProjectionTest.java b/jdbc/src/test/java/org/apache/pekko/projection/jdbc/JdbcProjectionTest.java index c7f05324..2d05a2e7 100644 --- a/jdbc/src/test/java/org/apache/pekko/projection/jdbc/JdbcProjectionTest.java +++ b/jdbc/src/test/java/org/apache/pekko/projection/jdbc/JdbcProjectionTest.java @@ -54,11 +54,13 @@ import java.util.List; import java.util.Map; import java.util.UUID; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.function.Predicate; import java.util.function.Supplier; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; public class JdbcProjectionTest extends JUnitSuite { @@ -209,13 +211,14 @@ private void expectNextUntilErrorMessage(TestSubscriber.Probe probe, Strin } private JdbcHandler concatHandler(StringBuffer str) { - return concatHandler(str, __ -> false); + return concatHandler(str, new CountDownLatch(0), __ -> false); } private JdbcHandler concatHandler( - StringBuffer buffer, Predicate failPredicate) { + StringBuffer buffer, CountDownLatch latch, Predicate failPredicate) { return JdbcHandler.fromFunction( (PureJdbcSession session, Envelope envelope) -> { + latch.countDown(); if (failPredicate.test(envelope.offset)) { throw new RuntimeException(failMessage(envelope.offset)); } else { @@ -275,6 +278,7 @@ public void exactlyOnceShouldRestartFromPreviousOffset() { ProjectionId projectionId = genRandomProjectionId(); StringBuffer str = new StringBuffer(); + CountDownLatch latch = new CountDownLatch(3); Projection projection = JdbcProjection.exactlyOnce( @@ -282,7 +286,7 @@ public void exactlyOnceShouldRestartFromPreviousOffset() { sourceProvider(entityId), jdbcSessionCreator, // fail on fourth offset - () -> concatHandler(str, offset -> offset == 4), + () -> concatHandler(str, latch, offset -> offset == 4), testKit.system()); projectionTestKit.runWithTestSink( @@ -290,6 +294,7 @@ public void exactlyOnceShouldRestartFromPreviousOffset() { (probe) -> { probe.request(3); probe.expectNextN(3); + assertTrue(latch.await(3, TimeUnit.SECONDS)); assertEquals("abc|def|ghi|", str.toString()); expectNextUntilErrorMessage(probe, failMessage(4)); }); @@ -326,6 +331,7 @@ public void atLeastOnceShouldRestartFromPreviousOffset() { ProjectionId projectionId = genRandomProjectionId(); StringBuffer str = new StringBuffer(); + CountDownLatch latch = new CountDownLatch(3); Projection projection = JdbcProjection.atLeastOnce( @@ -333,7 +339,7 @@ public void atLeastOnceShouldRestartFromPreviousOffset() { sourceProvider(entityId), jdbcSessionCreator, // fail on fourth offset - () -> concatHandler(str, offset -> offset == 4), + () -> concatHandler(str, latch, offset -> offset == 4), testKit.system()) .withSaveOffset(1, Duration.ZERO); @@ -348,6 +354,7 @@ public void atLeastOnceShouldRestartFromPreviousOffset() { */ probe.request(2); probe.expectNextN(2); + assertTrue(latch.await(3, TimeUnit.SECONDS)); assertEquals("abc|def|ghi|", str.toString()); expectNextUntilErrorMessage(probe, failMessage(4)); });