Skip to content

Commit

Permalink
Reproduce the problem with suspended overrides being delivered simult…
Browse files Browse the repository at this point in the history
…aneously

Signed-off-by: Jakub Zalas <[email protected]>
  • Loading branch information
jakzal committed Dec 14, 2021
1 parent 19cbb2f commit fc30006
Showing 1 changed file with 63 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
import org.junit.Test;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;

import static org.junit.Assert.assertEquals;
Expand Down Expand Up @@ -63,6 +65,54 @@ public void testThatSuspendResumes(){
assertFalse(mailbox.isSuspended());
}

@Test
public void testThatMessagesAreHandledInOrderTheyArrived() {

final Dispatcher dispatcher = new ExecutorDispatcher(2, 0, 1.0f);
final Mailbox mailbox = new ConcurrentQueueMailbox(dispatcher, 1);

final TestResults testResults = new TestResults(3);
final CountTakerActor actor = new CountTakerActor(testResults);

for (int count = 0; count < 3; ++count) {
final int countParam = count;
final SerializableConsumer<CountTaker> consumer = (consumerActor) -> {
// Give longer Delay to messages that come first
delay(20 - (countParam * 10));
consumerActor.take(countParam);
};
final LocalMessage<CountTaker> message = new LocalMessage<CountTaker>(actor, CountTaker.class, consumer, "take(int)");
mailbox.send(message);
}

assertEquals(Arrays.asList(0, 1, 2), actor.testResults.getCounts());
}

@Test
public void testThatSuspendedOverrideMessagesAreHandledInOrderTheyArrived() {

final Dispatcher dispatcher = new ExecutorDispatcher(2, 0, 1.0f);
final Mailbox mailbox = new ConcurrentQueueMailbox(dispatcher, 1);

final TestResults testResults = new TestResults(3);
final CountTakerActor actor = new CountTakerActor(testResults);

mailbox.suspendExceptFor("paused#", CountTaker.class);

for (int count = 0; count < 3; ++count) {
final int countParam = count;
final SerializableConsumer<CountTaker> consumer = (consumerActor) -> {
// Give longer Delay to messages that come first
delay(20 - (countParam * 10));
consumerActor.take(countParam);
};
final LocalMessage<CountTaker> message = new LocalMessage<CountTaker>(actor, CountTaker.class, consumer, "take(int)");
mailbox.send(message);
}

assertEquals(Arrays.asList(0, 1, 2), actor.testResults.getCounts());
}

@Before
@Override
public void setUp() throws Exception {
Expand All @@ -81,6 +131,13 @@ public void tearDown() throws Exception {
dispatcher.close();
}

private void delay(final int millis) {
try {
Thread.sleep(millis);
} catch (InterruptedException e) {
}
}

public static interface CountTaker {
void take(final int count);
}
Expand All @@ -106,7 +163,8 @@ private TestResults(final int happenings) {
this.accessSafely = AccessSafely
.afterCompleting(happenings)
.writingWith("counts", (Consumer<Integer>) list::add)
.readingWith("counts", (Integer index)-> list.get(index));
.readingWith("counts", (Integer index)-> list.get(index))
.readingWith("counts", () -> list);
}

void addCount(Integer i){
Expand All @@ -116,5 +174,9 @@ void addCount(Integer i){
Integer getCount(int index){
return this.accessSafely.readFrom("counts", index);
}

List<Integer> getCounts() {
return this.accessSafely.readFrom("counts");
}
}
}

0 comments on commit fc30006

Please sign in to comment.