Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reproduce and fix the problem with suspended overrides being delivered simultaneously #94

Draft
wants to merge 4 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 0 additions & 51 deletions src/main/java/io/vlingo/xoom/actors/ResumingMailbox.java

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -10,27 +10,26 @@
import io.vlingo.xoom.actors.Dispatcher;
import io.vlingo.xoom.actors.Mailbox;
import io.vlingo.xoom.actors.Message;
import io.vlingo.xoom.actors.ResumingMailbox;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Queue;
import java.util.*;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.stream.Collectors;

public class ConcurrentQueueMailbox implements Mailbox, Runnable {
private AtomicBoolean delivering;
private final Dispatcher dispatcher;
private AtomicReference<SuspendedDeliveryOverrides> suspendedDeliveryOverrides;
private final AtomicReference<SuspendedDeliveryOverrides> suspendedDeliveryOverrides;
private final AtomicReference<SuspendedDeliveryQueue> suspendedDeliveryQueue;
private final Queue<Message> queue;
private final byte throttlingCount;

@Override
public void close() {
queue.clear();
suspendedDeliveryQueue.get().clear();
}

@Override
Expand All @@ -46,26 +45,16 @@ public int concurrencyCapacity() {
@Override
public void resume(final String name) {
if (suspendedDeliveryOverrides.get().pop(name)) {
suspendedDeliveryQueue.get().putBack(this::queue);
dispatcher.execute(this);
}
}

@Override
public void send(final Message message) {
if (isSuspended()) {
if (suspendedDeliveryOverrides.get().matchesTop(message.protocol())) {
dispatcher.execute(new ResumingMailbox(message));
if (!queue.isEmpty()) {
dispatcher.execute(this);
}
return;
}
queue.add(message);
} else {
queue.add(message);
if (!isDelivering()) {
dispatcher.execute(this);
}
queue(message);
if (!isDelivering()) {
dispatcher.execute(this);
}
}

Expand All @@ -85,9 +74,17 @@ public boolean isSuspendedFor(String name) {
.find(name).isEmpty();
}

private boolean isSuspendedExceptFor(final Message override) {
return isSuspended() && suspendedDeliveryOverrides.get().matchesTop(override.protocol());
}

@Override
public Message receive() {
return queue.poll();
if (!isSuspended()) {
return queue.poll();
} else {
return suspendedDeliveryQueue.get().poll();
}
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I changed receive() to also look at the suspended delivery queue. This is a public method so I'm not 100% sure I should've done it...

}

@Override
Expand All @@ -100,9 +97,6 @@ public void run() {
if (delivering.compareAndSet(false, true)) {
final int total = throttlingCount;
for (int count = 0; count < total; ++count) {
if (isSuspended()) {
break;
}
final Message message = receive();
if (message != null) {
message.deliver();
Expand All @@ -111,7 +105,7 @@ public void run() {
}
}
delivering.set(false);
if (!queue.isEmpty()) {
if (!isQueueEmpty()) {
dispatcher.execute(this);
}
}
Expand All @@ -120,13 +114,26 @@ public void run() {
/* @see io.vlingo.xoom.actors.Mailbox#pendingMessages() */
@Override
public int pendingMessages() {
return queue.size();
return queue.size() + suspendedDeliveryQueue.get().size();
}

private void queue(final Message message) {
if (isSuspendedExceptFor(message)) {
suspendedDeliveryQueue.get().add(message);
} else {
queue.add(message);
}
}

private boolean isQueueEmpty() {
return queue.isEmpty() && suspendedDeliveryQueue.get().isEmpty();
}

protected ConcurrentQueueMailbox(final Dispatcher dispatcher, final int throttlingCount) {
this.dispatcher = dispatcher;
this.delivering = new AtomicBoolean(false);
this.suspendedDeliveryOverrides = new AtomicReference<>(new SuspendedDeliveryOverrides());
this.suspendedDeliveryQueue = new AtomicReference<>(new SuspendedDeliveryQueue());
this.queue = new ConcurrentLinkedQueue<Message>();
this.throttlingCount = (byte) throttlingCount;
}
Expand Down Expand Up @@ -261,4 +268,60 @@ private static class Overrides {
this.obsolete = false;
}
}

private static class SuspendedDeliveryQueue {
private final AtomicBoolean accessible = new AtomicBoolean(false);
private final LinkedList<Message> queue = new LinkedList<>();

public void add(final Message message) {
while(true) {
if (accessible.compareAndSet(false, true)) {
queue.add(message);
accessible.set(false);
break;
}
}
}

public Message poll() {
while(true) {
if (accessible.compareAndSet(false, true)) {
Message message = null;
if (!queue.isEmpty()) {
message = queue.pop();
}
accessible.set(false);
return message;
}
}
}

public void putBack(final Consumer<Message> consumer) {
while(true) {
if (accessible.compareAndSet(false, true)) {
queue.forEach(consumer);
queue.clear();
accessible.set(false);
break;
}
}
}

public void clear() {
while(true) {
if (accessible.compareAndSet(false, true)) {
queue.clear();
break;
}
}
}

public boolean isEmpty() {
return queue.isEmpty();
}

public int size() {
return queue.size();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
import org.junit.Test;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;

import static org.junit.Assert.assertEquals;
Expand Down Expand Up @@ -63,6 +65,81 @@ public void testThatSuspendResumes(){
assertFalse(mailbox.isSuspended());
}

@Test
public void testThatMessagesAreDeliveredInOrderTheyArrived() {

final Dispatcher dispatcher = new ExecutorDispatcher(2, 0, 1.0f);
final Mailbox mailbox = new ConcurrentQueueMailbox(dispatcher, 1);

final TestResults testResults = new TestResults(3);
final CountTakerActor actor = new CountTakerActor(testResults);

for (int count = 0; count < 3; ++count) {
final int countParam = count;
final SerializableConsumer<CountTaker> consumer = (consumerActor) -> {
// Give longer Delay to messages that come first
delay(20 - (countParam * 10));
consumerActor.take(countParam);
};
final LocalMessage<CountTaker> message = new LocalMessage<CountTaker>(actor, CountTaker.class, consumer, "take(int)");
mailbox.send(message);
}

assertEquals(Arrays.asList(0, 1, 2), actor.testResults.getCounts());
}

@Test
public void testThatSuspendedOverrideMessagesAreDeliveredInOrderTheyArrived() {

final Dispatcher dispatcher = new ExecutorDispatcher(2, 0, 1.0f);
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Notice that the number of threads needs to be at least 2 for this test to fail.

final Mailbox mailbox = new ConcurrentQueueMailbox(dispatcher, 1);

final TestResults testResults = new TestResults(3);
final CountTakerActor actor = new CountTakerActor(testResults);

mailbox.suspendExceptFor("paused#", CountTaker.class);

for (int count = 0; count < 3; ++count) {
final int countParam = count;
final SerializableConsumer<CountTaker> consumer = (consumerActor) -> {
// Give longer Delay to messages that come first
delay(20 - (countParam * 10));
consumerActor.take(countParam);
};
final LocalMessage<CountTaker> message = new LocalMessage<CountTaker>(actor, CountTaker.class, consumer, "take(int)");
mailbox.send(message);
}

assertEquals(Arrays.asList(0, 1, 2), actor.testResults.getCounts());
Copy link
Member Author

@jakzal jakzal Dec 14, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is failing with: expected:<[0, 1, 2]> but was:<[1, 2, 0]>

Error:  Tests run: 4, Failures: 1, Errors: 0, Skipped: 0, Time elapsed: 0.455 s <<< FAILURE! - in io.vlingo.xoom.actors.plugin.mailbox.concurrentqueue.ConcurrentQueueMailboxTest
Error:  testThatSuspendedOverrideMessagesAreDeliveredInOrderTheyArrived(io.vlingo.xoom.actors.plugin.mailbox.concurrentqueue.ConcurrentQueueMailboxTest)  Time elapsed: 0.117 s  <<< FAILURE!
java.lang.AssertionError: expected:<[0, 1, 2]> but was:<[1, 2, 0]>
	at io.vlingo.xoom.actors.plugin.mailbox.concurrentqueue.ConcurrentQueueMailboxTest.testThatSuspendedOverrideMessagesAreDeliveredInOrderTheyArrived(ConcurrentQueueMailboxTest.java:113)

}

@Test
public void testThatSuspendedButNotHandledMessagesAreQueued() {

final Dispatcher dispatcher = new ExecutorDispatcher(2, 0, 1.0f);
final Mailbox mailbox = new ConcurrentQueueMailbox(dispatcher, 1);

final TestResults testResults = new TestResults(3);
final CountTakerActor actor = new CountTakerActor(testResults);

mailbox.suspendExceptFor("paused#", CountTaker.class);

for (int count = 0; count < 3; ++count) {
final int countParam = count;
final SerializableConsumer<CountTaker> consumer = (consumerActor) -> {
// Give longer Delay to messages that come first
delay(20 - (countParam * 10));
consumerActor.take(countParam);
};
final LocalMessage<CountTaker> message = new LocalMessage<CountTaker>(actor, CountTaker.class, consumer, "take(int)");
mailbox.send(message);
}

mailbox.resume("paused#");

assertEquals(Arrays.asList(0, 1, 2), actor.testResults.getCounts());
}

@Before
@Override
public void setUp() throws Exception {
Expand All @@ -81,6 +158,13 @@ public void tearDown() throws Exception {
dispatcher.close();
}

private void delay(final int millis) {
try {
Thread.sleep(millis);
} catch (InterruptedException e) {
}
}

public static interface CountTaker {
void take(final int count);
}
Expand All @@ -106,7 +190,8 @@ private TestResults(final int happenings) {
this.accessSafely = AccessSafely
.afterCompleting(happenings)
.writingWith("counts", (Consumer<Integer>) list::add)
.readingWith("counts", (Integer index)-> list.get(index));
.readingWith("counts", (Integer index)-> list.get(index))
.readingWith("counts", () -> list);
}

void addCount(Integer i){
Expand All @@ -116,5 +201,9 @@ void addCount(Integer i){
Integer getCount(int index){
return this.accessSafely.readFrom("counts", index);
}

List<Integer> getCounts() {
return this.accessSafely.readFrom("counts");
}
}
}