diff --git a/src/main/java/io/vlingo/xoom/actors/ResumingMailbox.java b/src/main/java/io/vlingo/xoom/actors/ResumingMailbox.java deleted file mode 100644 index 294c69d0..00000000 --- a/src/main/java/io/vlingo/xoom/actors/ResumingMailbox.java +++ /dev/null @@ -1,51 +0,0 @@ -// Copyright © 2012-2021 VLINGO LABS. All rights reserved. -// -// This Source Code Form is subject to the terms of the -// Mozilla Public License, v. 2.0. If a copy of the MPL -// was not distributed with this file, You can obtain -// one at https://mozilla.org/MPL/2.0/. - -package io.vlingo.xoom.actors; - -public class ResumingMailbox implements Mailbox { - private final Message message; - - public ResumingMailbox(final Message message) { - this.message = message; - } - - @Override - public void run() { - message.deliver(); - } - - @Override - public void close() { } - - @Override - public boolean isClosed() { return false; } - - @Override - public boolean isDelivering() { return true; } - - @Override - public int concurrencyCapacity() { return 0; } - - @Override - public void resume(final String name) { } - - @Override - public void send(final Message message) { } - - @Override - public void suspendExceptFor(String name, Class... overrides) { } - - @Override - public boolean isSuspended() { return false; } - - @Override - public Message receive() { return null; } - - @Override - public int pendingMessages() { return 1; } -} diff --git a/src/main/java/io/vlingo/xoom/actors/plugin/mailbox/concurrentqueue/ConcurrentQueueMailbox.java b/src/main/java/io/vlingo/xoom/actors/plugin/mailbox/concurrentqueue/ConcurrentQueueMailbox.java index 1679004c..e7731aeb 100644 --- a/src/main/java/io/vlingo/xoom/actors/plugin/mailbox/concurrentqueue/ConcurrentQueueMailbox.java +++ b/src/main/java/io/vlingo/xoom/actors/plugin/mailbox/concurrentqueue/ConcurrentQueueMailbox.java @@ -10,27 +10,26 @@ import io.vlingo.xoom.actors.Dispatcher; import io.vlingo.xoom.actors.Mailbox; import io.vlingo.xoom.actors.Message; -import io.vlingo.xoom.actors.ResumingMailbox; -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; -import java.util.Queue; +import java.util.*; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Consumer; import java.util.stream.Collectors; public class ConcurrentQueueMailbox implements Mailbox, Runnable { private AtomicBoolean delivering; private final Dispatcher dispatcher; - private AtomicReference suspendedDeliveryOverrides; + private final AtomicReference suspendedDeliveryOverrides; + private final AtomicReference suspendedDeliveryQueue; private final Queue queue; private final byte throttlingCount; @Override public void close() { queue.clear(); + suspendedDeliveryQueue.get().clear(); } @Override @@ -46,26 +45,16 @@ public int concurrencyCapacity() { @Override public void resume(final String name) { if (suspendedDeliveryOverrides.get().pop(name)) { + suspendedDeliveryQueue.get().putBack(this::queue); dispatcher.execute(this); } } @Override public void send(final Message message) { - if (isSuspended()) { - if (suspendedDeliveryOverrides.get().matchesTop(message.protocol())) { - dispatcher.execute(new ResumingMailbox(message)); - if (!queue.isEmpty()) { - dispatcher.execute(this); - } - return; - } - queue.add(message); - } else { - queue.add(message); - if (!isDelivering()) { - dispatcher.execute(this); - } + queue(message); + if (!isDelivering()) { + dispatcher.execute(this); } } @@ -85,9 +74,17 @@ public boolean isSuspendedFor(String name) { .find(name).isEmpty(); } + private boolean isSuspendedExceptFor(final Message override) { + return isSuspended() && suspendedDeliveryOverrides.get().matchesTop(override.protocol()); + } + @Override public Message receive() { - return queue.poll(); + if (!isSuspended()) { + return queue.poll(); + } else { + return suspendedDeliveryQueue.get().poll(); + } } @Override @@ -100,9 +97,6 @@ public void run() { if (delivering.compareAndSet(false, true)) { final int total = throttlingCount; for (int count = 0; count < total; ++count) { - if (isSuspended()) { - break; - } final Message message = receive(); if (message != null) { message.deliver(); @@ -111,7 +105,7 @@ public void run() { } } delivering.set(false); - if (!queue.isEmpty()) { + if (!isQueueEmpty()) { dispatcher.execute(this); } } @@ -120,13 +114,26 @@ public void run() { /* @see io.vlingo.xoom.actors.Mailbox#pendingMessages() */ @Override public int pendingMessages() { - return queue.size(); + return queue.size() + suspendedDeliveryQueue.get().size(); + } + + private void queue(final Message message) { + if (isSuspendedExceptFor(message)) { + suspendedDeliveryQueue.get().add(message); + } else { + queue.add(message); + } + } + + private boolean isQueueEmpty() { + return queue.isEmpty() && suspendedDeliveryQueue.get().isEmpty(); } protected ConcurrentQueueMailbox(final Dispatcher dispatcher, final int throttlingCount) { this.dispatcher = dispatcher; this.delivering = new AtomicBoolean(false); this.suspendedDeliveryOverrides = new AtomicReference<>(new SuspendedDeliveryOverrides()); + this.suspendedDeliveryQueue = new AtomicReference<>(new SuspendedDeliveryQueue()); this.queue = new ConcurrentLinkedQueue(); this.throttlingCount = (byte) throttlingCount; } @@ -261,4 +268,60 @@ private static class Overrides { this.obsolete = false; } } + + private static class SuspendedDeliveryQueue { + private final AtomicBoolean accessible = new AtomicBoolean(false); + private final LinkedList queue = new LinkedList<>(); + + public void add(final Message message) { + while(true) { + if (accessible.compareAndSet(false, true)) { + queue.add(message); + accessible.set(false); + break; + } + } + } + + public Message poll() { + while(true) { + if (accessible.compareAndSet(false, true)) { + Message message = null; + if (!queue.isEmpty()) { + message = queue.pop(); + } + accessible.set(false); + return message; + } + } + } + + public void putBack(final Consumer consumer) { + while(true) { + if (accessible.compareAndSet(false, true)) { + queue.forEach(consumer); + queue.clear(); + accessible.set(false); + break; + } + } + } + + public void clear() { + while(true) { + if (accessible.compareAndSet(false, true)) { + queue.clear(); + break; + } + } + } + + public boolean isEmpty() { + return queue.isEmpty(); + } + + public int size() { + return queue.size(); + } + } } diff --git a/src/test/java/io/vlingo/xoom/actors/plugin/mailbox/concurrentqueue/ConcurrentQueueMailboxTest.java b/src/test/java/io/vlingo/xoom/actors/plugin/mailbox/concurrentqueue/ConcurrentQueueMailboxTest.java index 9094c2ef..037cca06 100644 --- a/src/test/java/io/vlingo/xoom/actors/plugin/mailbox/concurrentqueue/ConcurrentQueueMailboxTest.java +++ b/src/test/java/io/vlingo/xoom/actors/plugin/mailbox/concurrentqueue/ConcurrentQueueMailboxTest.java @@ -15,6 +15,8 @@ import org.junit.Test; import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; import java.util.function.Consumer; import static org.junit.Assert.assertEquals; @@ -63,6 +65,81 @@ public void testThatSuspendResumes(){ assertFalse(mailbox.isSuspended()); } + @Test + public void testThatMessagesAreDeliveredInOrderTheyArrived() { + + final Dispatcher dispatcher = new ExecutorDispatcher(2, 0, 1.0f); + final Mailbox mailbox = new ConcurrentQueueMailbox(dispatcher, 1); + + final TestResults testResults = new TestResults(3); + final CountTakerActor actor = new CountTakerActor(testResults); + + for (int count = 0; count < 3; ++count) { + final int countParam = count; + final SerializableConsumer consumer = (consumerActor) -> { + // Give longer Delay to messages that come first + delay(20 - (countParam * 10)); + consumerActor.take(countParam); + }; + final LocalMessage message = new LocalMessage(actor, CountTaker.class, consumer, "take(int)"); + mailbox.send(message); + } + + assertEquals(Arrays.asList(0, 1, 2), actor.testResults.getCounts()); + } + + @Test + public void testThatSuspendedOverrideMessagesAreDeliveredInOrderTheyArrived() { + + final Dispatcher dispatcher = new ExecutorDispatcher(2, 0, 1.0f); + final Mailbox mailbox = new ConcurrentQueueMailbox(dispatcher, 1); + + final TestResults testResults = new TestResults(3); + final CountTakerActor actor = new CountTakerActor(testResults); + + mailbox.suspendExceptFor("paused#", CountTaker.class); + + for (int count = 0; count < 3; ++count) { + final int countParam = count; + final SerializableConsumer consumer = (consumerActor) -> { + // Give longer Delay to messages that come first + delay(20 - (countParam * 10)); + consumerActor.take(countParam); + }; + final LocalMessage message = new LocalMessage(actor, CountTaker.class, consumer, "take(int)"); + mailbox.send(message); + } + + assertEquals(Arrays.asList(0, 1, 2), actor.testResults.getCounts()); + } + + @Test + public void testThatSuspendedButNotHandledMessagesAreQueued() { + + final Dispatcher dispatcher = new ExecutorDispatcher(2, 0, 1.0f); + final Mailbox mailbox = new ConcurrentQueueMailbox(dispatcher, 1); + + final TestResults testResults = new TestResults(3); + final CountTakerActor actor = new CountTakerActor(testResults); + + mailbox.suspendExceptFor("paused#", CountTaker.class); + + for (int count = 0; count < 3; ++count) { + final int countParam = count; + final SerializableConsumer consumer = (consumerActor) -> { + // Give longer Delay to messages that come first + delay(20 - (countParam * 10)); + consumerActor.take(countParam); + }; + final LocalMessage message = new LocalMessage(actor, CountTaker.class, consumer, "take(int)"); + mailbox.send(message); + } + + mailbox.resume("paused#"); + + assertEquals(Arrays.asList(0, 1, 2), actor.testResults.getCounts()); + } + @Before @Override public void setUp() throws Exception { @@ -81,6 +158,13 @@ public void tearDown() throws Exception { dispatcher.close(); } + private void delay(final int millis) { + try { + Thread.sleep(millis); + } catch (InterruptedException e) { + } + } + public static interface CountTaker { void take(final int count); } @@ -106,7 +190,8 @@ private TestResults(final int happenings) { this.accessSafely = AccessSafely .afterCompleting(happenings) .writingWith("counts", (Consumer) list::add) - .readingWith("counts", (Integer index)-> list.get(index)); + .readingWith("counts", (Integer index)-> list.get(index)) + .readingWith("counts", () -> list); } void addCount(Integer i){ @@ -116,5 +201,9 @@ void addCount(Integer i){ Integer getCount(int index){ return this.accessSafely.readFrom("counts", index); } + + List getCounts() { + return this.accessSafely.readFrom("counts"); + } } }