-
Notifications
You must be signed in to change notification settings - Fork 28
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Reproduce and fix the problem with suspended overrides being delivered simultaneously #94
base: master
Are you sure you want to change the base?
Changes from all commits
9a90b32
6b7439d
bc71e9b
3ebaae3
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
This file was deleted.
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -15,6 +15,8 @@ | |
import org.junit.Test; | ||
|
||
import java.util.ArrayList; | ||
import java.util.Arrays; | ||
import java.util.List; | ||
import java.util.function.Consumer; | ||
|
||
import static org.junit.Assert.assertEquals; | ||
|
@@ -63,6 +65,81 @@ public void testThatSuspendResumes(){ | |
assertFalse(mailbox.isSuspended()); | ||
} | ||
|
||
@Test | ||
public void testThatMessagesAreDeliveredInOrderTheyArrived() { | ||
|
||
final Dispatcher dispatcher = new ExecutorDispatcher(2, 0, 1.0f); | ||
final Mailbox mailbox = new ConcurrentQueueMailbox(dispatcher, 1); | ||
|
||
final TestResults testResults = new TestResults(3); | ||
final CountTakerActor actor = new CountTakerActor(testResults); | ||
|
||
for (int count = 0; count < 3; ++count) { | ||
final int countParam = count; | ||
final SerializableConsumer<CountTaker> consumer = (consumerActor) -> { | ||
// Give longer Delay to messages that come first | ||
delay(20 - (countParam * 10)); | ||
consumerActor.take(countParam); | ||
}; | ||
final LocalMessage<CountTaker> message = new LocalMessage<CountTaker>(actor, CountTaker.class, consumer, "take(int)"); | ||
mailbox.send(message); | ||
} | ||
|
||
assertEquals(Arrays.asList(0, 1, 2), actor.testResults.getCounts()); | ||
} | ||
|
||
@Test | ||
public void testThatSuspendedOverrideMessagesAreDeliveredInOrderTheyArrived() { | ||
|
||
final Dispatcher dispatcher = new ExecutorDispatcher(2, 0, 1.0f); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Notice that the number of threads needs to be at least 2 for this test to fail. |
||
final Mailbox mailbox = new ConcurrentQueueMailbox(dispatcher, 1); | ||
|
||
final TestResults testResults = new TestResults(3); | ||
final CountTakerActor actor = new CountTakerActor(testResults); | ||
|
||
mailbox.suspendExceptFor("paused#", CountTaker.class); | ||
|
||
for (int count = 0; count < 3; ++count) { | ||
final int countParam = count; | ||
final SerializableConsumer<CountTaker> consumer = (consumerActor) -> { | ||
// Give longer Delay to messages that come first | ||
delay(20 - (countParam * 10)); | ||
consumerActor.take(countParam); | ||
}; | ||
final LocalMessage<CountTaker> message = new LocalMessage<CountTaker>(actor, CountTaker.class, consumer, "take(int)"); | ||
mailbox.send(message); | ||
} | ||
|
||
assertEquals(Arrays.asList(0, 1, 2), actor.testResults.getCounts()); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is failing with:
|
||
} | ||
|
||
@Test | ||
public void testThatSuspendedButNotHandledMessagesAreQueued() { | ||
|
||
final Dispatcher dispatcher = new ExecutorDispatcher(2, 0, 1.0f); | ||
final Mailbox mailbox = new ConcurrentQueueMailbox(dispatcher, 1); | ||
|
||
final TestResults testResults = new TestResults(3); | ||
final CountTakerActor actor = new CountTakerActor(testResults); | ||
|
||
mailbox.suspendExceptFor("paused#", CountTaker.class); | ||
|
||
for (int count = 0; count < 3; ++count) { | ||
final int countParam = count; | ||
final SerializableConsumer<CountTaker> consumer = (consumerActor) -> { | ||
// Give longer Delay to messages that come first | ||
delay(20 - (countParam * 10)); | ||
consumerActor.take(countParam); | ||
}; | ||
final LocalMessage<CountTaker> message = new LocalMessage<CountTaker>(actor, CountTaker.class, consumer, "take(int)"); | ||
mailbox.send(message); | ||
} | ||
|
||
mailbox.resume("paused#"); | ||
|
||
assertEquals(Arrays.asList(0, 1, 2), actor.testResults.getCounts()); | ||
} | ||
|
||
@Before | ||
@Override | ||
public void setUp() throws Exception { | ||
|
@@ -81,6 +158,13 @@ public void tearDown() throws Exception { | |
dispatcher.close(); | ||
} | ||
|
||
private void delay(final int millis) { | ||
try { | ||
Thread.sleep(millis); | ||
} catch (InterruptedException e) { | ||
} | ||
} | ||
|
||
public static interface CountTaker { | ||
void take(final int count); | ||
} | ||
|
@@ -106,7 +190,8 @@ private TestResults(final int happenings) { | |
this.accessSafely = AccessSafely | ||
.afterCompleting(happenings) | ||
.writingWith("counts", (Consumer<Integer>) list::add) | ||
.readingWith("counts", (Integer index)-> list.get(index)); | ||
.readingWith("counts", (Integer index)-> list.get(index)) | ||
.readingWith("counts", () -> list); | ||
} | ||
|
||
void addCount(Integer i){ | ||
|
@@ -116,5 +201,9 @@ void addCount(Integer i){ | |
Integer getCount(int index){ | ||
return this.accessSafely.readFrom("counts", index); | ||
} | ||
|
||
List<Integer> getCounts() { | ||
return this.accessSafely.readFrom("counts"); | ||
} | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I changed
receive()
to also look at the suspended delivery queue. This is a public method so I'm not 100% sure I should've done it...