From fc30006314798820bc75f06ba545925e320e80ae Mon Sep 17 00:00:00 2001 From: Jakub Zalas Date: Tue, 14 Dec 2021 21:33:29 +0000 Subject: [PATCH] Reproduce the problem with suspended overrides being delivered simultaneously Signed-off-by: Jakub Zalas --- .../ConcurrentQueueMailboxTest.java | 64 ++++++++++++++++++- 1 file changed, 63 insertions(+), 1 deletion(-) diff --git a/src/test/java/io/vlingo/xoom/actors/plugin/mailbox/concurrentqueue/ConcurrentQueueMailboxTest.java b/src/test/java/io/vlingo/xoom/actors/plugin/mailbox/concurrentqueue/ConcurrentQueueMailboxTest.java index 9094c2ef..d73997fb 100644 --- a/src/test/java/io/vlingo/xoom/actors/plugin/mailbox/concurrentqueue/ConcurrentQueueMailboxTest.java +++ b/src/test/java/io/vlingo/xoom/actors/plugin/mailbox/concurrentqueue/ConcurrentQueueMailboxTest.java @@ -15,6 +15,8 @@ import org.junit.Test; import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; import java.util.function.Consumer; import static org.junit.Assert.assertEquals; @@ -63,6 +65,54 @@ public void testThatSuspendResumes(){ assertFalse(mailbox.isSuspended()); } + @Test + public void testThatMessagesAreHandledInOrderTheyArrived() { + + final Dispatcher dispatcher = new ExecutorDispatcher(2, 0, 1.0f); + final Mailbox mailbox = new ConcurrentQueueMailbox(dispatcher, 1); + + final TestResults testResults = new TestResults(3); + final CountTakerActor actor = new CountTakerActor(testResults); + + for (int count = 0; count < 3; ++count) { + final int countParam = count; + final SerializableConsumer consumer = (consumerActor) -> { + // Give longer Delay to messages that come first + delay(20 - (countParam * 10)); + consumerActor.take(countParam); + }; + final LocalMessage message = new LocalMessage(actor, CountTaker.class, consumer, "take(int)"); + mailbox.send(message); + } + + assertEquals(Arrays.asList(0, 1, 2), actor.testResults.getCounts()); + } + + @Test + public void testThatSuspendedOverrideMessagesAreHandledInOrderTheyArrived() { + + final Dispatcher dispatcher = new ExecutorDispatcher(2, 0, 1.0f); + final Mailbox mailbox = new ConcurrentQueueMailbox(dispatcher, 1); + + final TestResults testResults = new TestResults(3); + final CountTakerActor actor = new CountTakerActor(testResults); + + mailbox.suspendExceptFor("paused#", CountTaker.class); + + for (int count = 0; count < 3; ++count) { + final int countParam = count; + final SerializableConsumer consumer = (consumerActor) -> { + // Give longer Delay to messages that come first + delay(20 - (countParam * 10)); + consumerActor.take(countParam); + }; + final LocalMessage message = new LocalMessage(actor, CountTaker.class, consumer, "take(int)"); + mailbox.send(message); + } + + assertEquals(Arrays.asList(0, 1, 2), actor.testResults.getCounts()); + } + @Before @Override public void setUp() throws Exception { @@ -81,6 +131,13 @@ public void tearDown() throws Exception { dispatcher.close(); } + private void delay(final int millis) { + try { + Thread.sleep(millis); + } catch (InterruptedException e) { + } + } + public static interface CountTaker { void take(final int count); } @@ -106,7 +163,8 @@ private TestResults(final int happenings) { this.accessSafely = AccessSafely .afterCompleting(happenings) .writingWith("counts", (Consumer) list::add) - .readingWith("counts", (Integer index)-> list.get(index)); + .readingWith("counts", (Integer index)-> list.get(index)) + .readingWith("counts", () -> list); } void addCount(Integer i){ @@ -116,5 +174,9 @@ void addCount(Integer i){ Integer getCount(int index){ return this.accessSafely.readFrom("counts", index); } + + List getCounts() { + return this.accessSafely.readFrom("counts"); + } } }