From 12605c24a8a491130f82faa053d669eeaf2e8676 Mon Sep 17 00:00:00 2001 From: Dan Hall Date: Wed, 1 Jul 2020 20:06:57 +0100 Subject: [PATCH] Issue #9 - Dead Letter Exchange, Dead Letter Routing and Alternative Exchange Policies --- pom.xml | 6 + .../fridujo/rabbitmq/mock/AmqArguments.java | 44 +- .../rabbitmq/mock/MockConnectionFactory.java | 30 +- .../fridujo/rabbitmq/mock/MockNode.java | 19 + .../fridujo/rabbitmq/mock/MockPolicy.java | 77 +++ .../fridujo/rabbitmq/mock/MockQueue.java | 17 +- .../fridujo/rabbitmq/mock/Receiver.java | 4 + .../mock/exchange/BindableMockExchange.java | 35 +- .../mock/exchange/MockDefaultExchange.java | 7 + .../mock/tool/ParameterMarshaller.java | 45 ++ .../rabbitmq/mock/MockPolicyUseCaseTests.java | 584 ++++++++++++++++++ .../rabbitmq/mock/exchange/ExchangeTest.java | 8 + 12 files changed, 833 insertions(+), 43 deletions(-) create mode 100644 src/main/java/com/github/fridujo/rabbitmq/mock/MockPolicy.java create mode 100644 src/main/java/com/github/fridujo/rabbitmq/mock/tool/ParameterMarshaller.java create mode 100644 src/test/java/com/github/fridujo/rabbitmq/mock/MockPolicyUseCaseTests.java diff --git a/pom.xml b/pom.xml index 1458d959..d3c62957 100644 --- a/pom.xml +++ b/pom.xml @@ -107,6 +107,12 @@ ${junit.jupiter.version} test + + org.projectlombok + lombok + 1.18.8 + provided + diff --git a/src/main/java/com/github/fridujo/rabbitmq/mock/AmqArguments.java b/src/main/java/com/github/fridujo/rabbitmq/mock/AmqArguments.java index cf59b028..ff775f01 100644 --- a/src/main/java/com/github/fridujo/rabbitmq/mock/AmqArguments.java +++ b/src/main/java/com/github/fridujo/rabbitmq/mock/AmqArguments.java @@ -1,5 +1,10 @@ package com.github.fridujo.rabbitmq.mock; +import static com.github.fridujo.rabbitmq.mock.tool.ParameterMarshaller.getParameterAsExchangePointer; +import static com.github.fridujo.rabbitmq.mock.tool.ParameterMarshaller.getParameterAsPositiveInteger; +import static com.github.fridujo.rabbitmq.mock.tool.ParameterMarshaller.getParameterAsPositiveLong; +import static com.github.fridujo.rabbitmq.mock.tool.ParameterMarshaller.getParameterAsPositiveShort; +import static com.github.fridujo.rabbitmq.mock.tool.ParameterMarshaller.getParameterAsString; import static java.util.Collections.emptyMap; import java.util.Arrays; @@ -14,7 +19,7 @@ public class AmqArguments { public static final String QUEUE_MAX_LENGTH_BYTES_KEY = "x-max-length-bytes"; public static final String OVERFLOW_KEY = "x-overflow"; public static final String MAX_PRIORITY_KEY = "x-max-priority"; - private final String ALTERNATE_EXCHANGE_KEY = "alternate-exchange"; + public static final String ALTERNATE_EXCHANGE_KEY = "alternate-exchange"; private final Map arguments; public static AmqArguments empty() { @@ -26,58 +31,37 @@ public AmqArguments(Map arguments) { } public Optional getAlternateExchange() { - return string(ALTERNATE_EXCHANGE_KEY) - .map(aeName -> new ReceiverPointer(ReceiverPointer.Type.EXCHANGE, aeName)); + return getParameterAsExchangePointer.apply(ALTERNATE_EXCHANGE_KEY, arguments); } public Optional getDeadLetterExchange() { - return string(DEAD_LETTER_EXCHANGE_KEY) - .map(aeName -> new ReceiverPointer(ReceiverPointer.Type.EXCHANGE, aeName)); + return getParameterAsExchangePointer.apply(DEAD_LETTER_EXCHANGE_KEY, arguments); } public Optional getDeadLetterRoutingKey() { - return string(DEAD_LETTER_ROUTING_KEY_KEY); + return getParameterAsString.apply(DEAD_LETTER_ROUTING_KEY_KEY, arguments); } public Optional queueLengthLimit() { - return positiveInteger(QUEUE_MAX_LENGTH_KEY); + return getParameterAsPositiveInteger.apply(QUEUE_MAX_LENGTH_KEY, arguments); } public Optional queueLengthBytesLimit() { - return positiveInteger(QUEUE_MAX_LENGTH_BYTES_KEY); + return getParameterAsPositiveInteger.apply(QUEUE_MAX_LENGTH_BYTES_KEY, arguments); } public Overflow overflow() { - return string(OVERFLOW_KEY) + return getParameterAsString.apply(OVERFLOW_KEY, arguments) .flatMap(Overflow::parse) .orElse(Overflow.DROP_HEAD); } public Optional getMessageTtlOfQueue() { - return Optional.ofNullable(arguments.get(MESSAGE_TTL_KEY)) - .filter(aeObject -> aeObject instanceof Number) - .map(Number.class::cast) - .map(number -> number.longValue()); + return getParameterAsPositiveLong.apply(MESSAGE_TTL_KEY, arguments); } public Optional queueMaxPriority() { - return positiveInteger(MAX_PRIORITY_KEY) - .filter(i -> i < 256) - .map(Integer::shortValue); - } - - private Optional positiveInteger(String key) { - return Optional.ofNullable(arguments.get(key)) - .filter(aeObject -> aeObject instanceof Number) - .map(Number.class::cast) - .map(num -> num.intValue()) - .filter(i -> i > 0); - } - - private Optional string(String key) { - return Optional.ofNullable(arguments.get(key)) - .filter(aeObject -> aeObject instanceof String) - .map(String.class::cast); + return getParameterAsPositiveShort.apply(MAX_PRIORITY_KEY, arguments); } public enum Overflow { diff --git a/src/main/java/com/github/fridujo/rabbitmq/mock/MockConnectionFactory.java b/src/main/java/com/github/fridujo/rabbitmq/mock/MockConnectionFactory.java index 27cf6969..d009f7a1 100644 --- a/src/main/java/com/github/fridujo/rabbitmq/mock/MockConnectionFactory.java +++ b/src/main/java/com/github/fridujo/rabbitmq/mock/MockConnectionFactory.java @@ -2,12 +2,22 @@ import com.github.fridujo.rabbitmq.mock.metrics.MetricsCollectorWrapper; import com.rabbitmq.client.AddressResolver; -import com.rabbitmq.client.ConnectionFactory; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; import java.util.concurrent.ExecutorService; +import static java.lang.String.format; + public class MockConnectionFactory extends ConfigurableConnectionFactory { + private static final Logger LOGGER = LoggerFactory.getLogger(MockConnectionFactory.class); + + private HashMap policies = new HashMap<>(); + public MockConnectionFactory() { setAutomaticRecoveryEnabled(false); } @@ -22,4 +32,22 @@ public MockConnection newConnection() { MockConnection mockConnection = new MockConnection(mockNode, metricsCollectorWrapper); return mockConnection; } + + public void setPolicy(MockPolicy policy) { + policies.put(policy.getName(), policy); + mockNode.applyPolicies(new HashSet(policies.values())); + } + + public void deletePolicy(String policyName) { + if(policies.remove(policyName) == null) { + LOGGER.error(format("Error deleting, policy with name %s was not found", policyName)); + } else { + mockNode.applyPolicies(new HashSet(policies.values())); + } + } + + public Collection listPolicies() { + return policies.values(); + } + } diff --git a/src/main/java/com/github/fridujo/rabbitmq/mock/MockNode.java b/src/main/java/com/github/fridujo/rabbitmq/mock/MockNode.java index fce675de..3b3921fc 100644 --- a/src/main/java/com/github/fridujo/rabbitmq/mock/MockNode.java +++ b/src/main/java/com/github/fridujo/rabbitmq/mock/MockNode.java @@ -1,8 +1,11 @@ package com.github.fridujo.rabbitmq.mock; +import java.util.Collection; import java.util.Map; import java.util.Optional; +import java.util.Set; import java.util.concurrent.ConcurrentHashMap; +import java.util.function.Function; import java.util.function.Supplier; import com.rabbitmq.client.AMQP; @@ -189,4 +192,20 @@ public void close(MockConnection mockConnection) { public Configuration getConfiguration() { return configuration; } + + public void applyPolicies(Set policies) { + applyPolicyToReceivers(policies, exchanges.values()); + applyPolicyToReceivers(policies, queues.values()); + } + + private void applyPolicyToReceivers(Set policies, Collection receivers) { + Function> calculateHighestPriorityPolicy = r -> policies.stream() + .sorted(MockPolicy.comparator) + .filter(p -> p.receiverMatchesPolicyPattern.test(r)) + .findFirst(); + + receivers.stream() + .filter(r -> !MockDefaultExchange.class.isInstance(r)) + .forEach(r -> r.setPolicy(calculateHighestPriorityPolicy.apply(r))); + } } diff --git a/src/main/java/com/github/fridujo/rabbitmq/mock/MockPolicy.java b/src/main/java/com/github/fridujo/rabbitmq/mock/MockPolicy.java new file mode 100644 index 00000000..d80eb633 --- /dev/null +++ b/src/main/java/com/github/fridujo/rabbitmq/mock/MockPolicy.java @@ -0,0 +1,77 @@ +package com.github.fridujo.rabbitmq.mock; + +import lombok.Builder; +import lombok.Getter; +import lombok.NonNull; +import lombok.Singular; +import lombok.ToString; + +import java.util.Comparator; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.function.Predicate; + +import static com.github.fridujo.rabbitmq.mock.MockPolicy.ApplyTo.ALL; +import static com.github.fridujo.rabbitmq.mock.tool.ParameterMarshaller.getParameterAsExchangePointer; +import static com.github.fridujo.rabbitmq.mock.tool.ParameterMarshaller.getParameterAsString; +import static java.util.Arrays.asList; +import static java.util.Collections.singletonList; + +@Getter +@ToString +public class MockPolicy { + public static final String ALTERNATE_EXCHANGE = "alternate-exchange"; + public static final String DEAD_LETTER_EXCHANGE = "dead-letter-exchange"; + public static final String DEAD_LETTER_ROUTING_KEY = "dead-letter-routing-key"; + + private String name; + private String pattern; + private Integer priority; + private Map definitions; + private ApplyTo applyTo; + + static final Comparator comparator = Comparator.comparing(MockPolicy::getPriority).reversed(); + + final Predicate receiverMatchesPolicyPattern = + r -> r.pointer().name.matches(this.pattern) && this.applyTo.matches(r) ; + + @Builder(toBuilder = true) + public MockPolicy(@NonNull String name, @NonNull String pattern, @NonNull @Singular Map definitions, + Integer priority, ApplyTo applyTo) { + this.name = name; + this.pattern = pattern; + this.definitions = definitions; + this.priority = priority == null ? 0 : priority; + this.applyTo = applyTo == null ? ALL : applyTo; + } + + public Optional getAlternateExchange() { + return getParameterAsExchangePointer.apply(ALTERNATE_EXCHANGE, definitions); + } + + public Optional getDeadLetterExchange() { + return getParameterAsExchangePointer.apply(DEAD_LETTER_EXCHANGE, definitions); + } + + public Optional getDeadLetterRoutingKey() { + return getParameterAsString.apply(DEAD_LETTER_ROUTING_KEY, definitions); + } + + public enum ApplyTo { + ALL(asList(ReceiverPointer.Type.QUEUE, ReceiverPointer.Type.EXCHANGE)), + EXCHANGE(singletonList(ReceiverPointer.Type.EXCHANGE)), + QUEUE(singletonList(ReceiverPointer.Type.QUEUE)); + + private List supportedTypes; + + ApplyTo(List supportTypes) { + this.supportedTypes = supportTypes; + } + + public boolean matches(Receiver r) { + return supportedTypes.contains(r.pointer().type); + } + } +} + diff --git a/src/main/java/com/github/fridujo/rabbitmq/mock/MockQueue.java b/src/main/java/com/github/fridujo/rabbitmq/mock/MockQueue.java index acb7f035..5312a5e3 100644 --- a/src/main/java/com/github/fridujo/rabbitmq/mock/MockQueue.java +++ b/src/main/java/com/github/fridujo/rabbitmq/mock/MockQueue.java @@ -2,6 +2,7 @@ import static com.github.fridujo.rabbitmq.mock.tool.Exceptions.runAndEatExceptions; import static com.github.fridujo.rabbitmq.mock.tool.Exceptions.runAndTransformExceptions; +import static java.lang.String.format; import java.io.IOException; import java.time.Instant; @@ -50,6 +51,7 @@ public class MockQueue implements Receiver { private final Map unackedMessagesByDeliveryTag = new LinkedHashMap<>(); private final AtomicBoolean running = new AtomicBoolean(true); private final Map> unackedDeliveryTagsByConsumerTag = new LinkedHashMap<>(); + private Optional mockPolicy = Optional.empty(); public MockQueue(String name, AmqArguments arguments, ReceiverRegistry receiverRegistry, MockChannel mockChannel) { this.name = name; @@ -377,7 +379,15 @@ public String toString() { } private void deadLetterWithReason(Message message, DeadLettering.ReasonType reason) { + + String routingKey = arguments.getDeadLetterRoutingKey() + .map(Optional::of) + .orElse(mockPolicy.flatMap(MockPolicy::getDeadLetterRoutingKey)) + .orElse(message.routingKey); + arguments.getDeadLetterExchange() + .map(Optional::of) + .orElse(mockPolicy.flatMap(MockPolicy::getDeadLetterExchange)) .flatMap(receiverRegistry::getReceiver) .ifPresent(deadLetterExchange -> { LOGGER.debug(localized("dead-lettered to " + deadLetterExchange + ": " + message)); @@ -385,7 +395,7 @@ private void deadLetterWithReason(Message message, DeadLettering.ReasonType reas BasicProperties props = event.prependOn(message.props); deadLetterExchange.publish( message.exchangeName, - arguments.getDeadLetterRoutingKey().orElse(message.routingKey), + routingKey, props, message.body); } @@ -400,6 +410,11 @@ public List getUnackedMessages() { return new ArrayList<>(unackedMessagesByDeliveryTag.values()); } + public void setPolicy(Optional mockPolicy) { + mockPolicy.ifPresent(p -> LOGGER.info(localized(format("Applied policy %s", p)))); + this.mockPolicy = mockPolicy; + } + static class ConsumerAndTag { private final String tag; diff --git a/src/main/java/com/github/fridujo/rabbitmq/mock/Receiver.java b/src/main/java/com/github/fridujo/rabbitmq/mock/Receiver.java index a6423fc9..f32b157c 100644 --- a/src/main/java/com/github/fridujo/rabbitmq/mock/Receiver.java +++ b/src/main/java/com/github/fridujo/rabbitmq/mock/Receiver.java @@ -2,6 +2,8 @@ import com.rabbitmq.client.AMQP; +import java.util.Optional; + /** * Leverage the receiving capability of both Queues and Exchanges. */ @@ -14,4 +16,6 @@ public interface Receiver { boolean publish(String exchangeName, String routingKey, AMQP.BasicProperties props, byte[] body); ReceiverPointer pointer(); + + void setPolicy(Optional policy); } diff --git a/src/main/java/com/github/fridujo/rabbitmq/mock/exchange/BindableMockExchange.java b/src/main/java/com/github/fridujo/rabbitmq/mock/exchange/BindableMockExchange.java index 8810fc70..f0228338 100644 --- a/src/main/java/com/github/fridujo/rabbitmq/mock/exchange/BindableMockExchange.java +++ b/src/main/java/com/github/fridujo/rabbitmq/mock/exchange/BindableMockExchange.java @@ -1,5 +1,14 @@ package com.github.fridujo.rabbitmq.mock.exchange; +import com.github.fridujo.rabbitmq.mock.AmqArguments; +import com.github.fridujo.rabbitmq.mock.MockPolicy; +import com.github.fridujo.rabbitmq.mock.Receiver; +import com.github.fridujo.rabbitmq.mock.ReceiverPointer; +import com.github.fridujo.rabbitmq.mock.ReceiverRegistry; +import com.rabbitmq.client.AMQP; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import java.util.Collections; import java.util.LinkedHashSet; import java.util.Map; @@ -9,18 +18,10 @@ import java.util.stream.Collectors; import java.util.stream.Stream; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.github.fridujo.rabbitmq.mock.AmqArguments; -import com.github.fridujo.rabbitmq.mock.MockQueue; -import com.github.fridujo.rabbitmq.mock.Receiver; -import com.github.fridujo.rabbitmq.mock.ReceiverPointer; -import com.github.fridujo.rabbitmq.mock.ReceiverRegistry; -import com.rabbitmq.client.AMQP; +import static java.lang.String.format; public abstract class BindableMockExchange implements MockExchange { - private static final Logger LOGGER = LoggerFactory.getLogger(MockQueue.class); + private static final Logger LOGGER = LoggerFactory.getLogger(BindableMockExchange.class); protected final Set bindConfigurations = new LinkedHashSet<>(); private final String name; @@ -28,6 +29,7 @@ public abstract class BindableMockExchange implements MockExchange { private final AmqArguments arguments; private final ReceiverPointer pointer; private final ReceiverRegistry receiverRegistry; + private Optional mockPolicy = Optional.empty(); protected BindableMockExchange(String name, String type, AmqArguments arguments, ReceiverRegistry receiverRegistry) { this.name = name; @@ -66,7 +68,12 @@ public boolean publish(String previousExchangeName, String routingKey, AMQP.Basi } private Optional getAlternateExchange() { - return arguments.getAlternateExchange().flatMap(receiverRegistry::getReceiver); + Optional policyAlternativeExchange = mockPolicy.flatMap(MockPolicy::getAlternateExchange); + Optional exchangeArgumentAlternativeExchange = arguments.getAlternateExchange(); + + return exchangeArgumentAlternativeExchange.map(Optional::of) + .orElse(policyAlternativeExchange) + .flatMap(receiverRegistry::getReceiver); } protected abstract Stream matchingReceivers(String routingKey, AMQP.BasicProperties props); @@ -85,6 +92,12 @@ public void unbind(ReceiverPointer receiver, String routingKey) { bindConfigurations.remove(new BindConfiguration(routingKey, receiver, Collections.emptyMap())); } + @Override + public void setPolicy(Optional mockPolicy) { + mockPolicy.ifPresent(p -> LOGGER.info(localized(format("Applied policy %s", p)))); + this.mockPolicy = mockPolicy; + } + @Override public ReceiverPointer pointer() { return pointer; diff --git a/src/main/java/com/github/fridujo/rabbitmq/mock/exchange/MockDefaultExchange.java b/src/main/java/com/github/fridujo/rabbitmq/mock/exchange/MockDefaultExchange.java index 5dacb77b..49598fc6 100644 --- a/src/main/java/com/github/fridujo/rabbitmq/mock/exchange/MockDefaultExchange.java +++ b/src/main/java/com/github/fridujo/rabbitmq/mock/exchange/MockDefaultExchange.java @@ -1,7 +1,9 @@ package com.github.fridujo.rabbitmq.mock.exchange; import java.util.Map; +import java.util.Optional; +import com.github.fridujo.rabbitmq.mock.MockPolicy; import com.rabbitmq.client.AMQP; import com.github.fridujo.rabbitmq.mock.MockNode; @@ -38,6 +40,11 @@ public void unbind(ReceiverPointer pointer, String routingKey) { // nothing needed } + @Override + public void setPolicy(Optional mockPolicy) { + throw new IllegalStateException("No policy should be applied for the default exchange"); + } + @Override public ReceiverPointer pointer() { throw new IllegalStateException("No ReceiverPointer (internal use) should be needed for the default exchange"); diff --git a/src/main/java/com/github/fridujo/rabbitmq/mock/tool/ParameterMarshaller.java b/src/main/java/com/github/fridujo/rabbitmq/mock/tool/ParameterMarshaller.java new file mode 100644 index 00000000..82261195 --- /dev/null +++ b/src/main/java/com/github/fridujo/rabbitmq/mock/tool/ParameterMarshaller.java @@ -0,0 +1,45 @@ +package com.github.fridujo.rabbitmq.mock.tool; + +import com.github.fridujo.rabbitmq.mock.ReceiverPointer; + +import java.util.Map; +import java.util.Optional; +import java.util.function.BiFunction; + +public class ParameterMarshaller { + + public static BiFunction, Optional> getParameterAsString = + (k, m) -> Optional.ofNullable(m.get(k)) + .filter(aeObject -> aeObject instanceof String) + .map(String.class::cast); + + public static BiFunction, Optional> getParameterAsPositiveInteger = + (k, m) -> Optional.ofNullable(m.get(k)) + .filter(aeObject -> aeObject instanceof Number) + .map(Number.class::cast) + .map(num -> num.intValue()) + .filter(i -> i > 0); + + public static BiFunction, Optional> getParameterAsPositiveShort = + (k, m) -> Optional.ofNullable(m.get(k)) + .filter(aeObject -> aeObject instanceof Number) + .map(Number.class::cast) + .map(num -> num.intValue()) + .filter(i -> i > 0) + .filter(i -> i < 256) + .map(Integer::shortValue); + + public static BiFunction, Optional> getParameterAsPositiveLong = + (k, m) -> Optional.ofNullable(m.get(k)) + .filter(aeObject -> aeObject instanceof Number) + .map(Number.class::cast) + .map(number -> number.longValue()); + + public static BiFunction, Optional> getParameterAsExchangePointer = + (k, m) -> Optional.ofNullable(m.get(k)) + .filter(aeObject -> aeObject instanceof String) + .map(String.class::cast) + .map(aeName -> new ReceiverPointer(ReceiverPointer.Type.EXCHANGE, aeName)); + +} + diff --git a/src/test/java/com/github/fridujo/rabbitmq/mock/MockPolicyUseCaseTests.java b/src/test/java/com/github/fridujo/rabbitmq/mock/MockPolicyUseCaseTests.java new file mode 100644 index 00000000..e53ba472 --- /dev/null +++ b/src/test/java/com/github/fridujo/rabbitmq/mock/MockPolicyUseCaseTests.java @@ -0,0 +1,584 @@ +package com.github.fridujo.rabbitmq.mock; + +import com.rabbitmq.client.AMQP; +import com.rabbitmq.client.BuiltinExchangeType; +import com.rabbitmq.client.Channel; +import com.rabbitmq.client.Connection; +import com.rabbitmq.client.DefaultConsumer; +import com.rabbitmq.client.Envelope; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +import java.io.IOException; +import java.time.Duration; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.concurrent.TimeoutException; + +import static com.github.fridujo.rabbitmq.mock.MockPolicy.ApplyTo.EXCHANGE; +import static com.github.fridujo.rabbitmq.mock.MockPolicy.ApplyTo.QUEUE; +import static java.lang.Thread.sleep; +import static org.assertj.core.api.Assertions.assertThat; + +class MockPolicyUseCaseTests { + + public static final long TIMEOUT_INTERVAL = 200L; + public static final long RETRY_INTERVAL = 10L; + + @Test + void canApplyDeadLetterExchangeAsPolicy() throws IOException, TimeoutException { + MockConnectionFactory connectionFactory = new MockConnectionFactory(); + try (Connection conn = connectionFactory.newConnection()) { + try (Channel channel = conn.createChannel()) { + channel.exchangeDeclare("rejected-ex", BuiltinExchangeType.FANOUT); + channel.queueDeclare("rejected", true, false, false, null); + channel.queueBind("rejected", "rejected-ex", "", null); + channel.queueDeclare("data", true, false, false, null); + + MockPolicy deadLetterExchangePolicy = MockPolicy.builder() + .name("dead letter exchange policy") + .pattern("data") + .definition(MockPolicy.DEAD_LETTER_EXCHANGE, "rejected-ex") + .build(); + + connectionFactory.setPolicy(deadLetterExchangePolicy); + + List messages = new ArrayList<>(); + + configureRejectOnQueue(channel, "data"); + + configureMessageCapture(channel, "rejected", messages); + + channel.basicPublish("", "data", null, "bytes".getBytes()); + + waitForMessageDelivered(messages); + + assertThat(messages.get(0)).isEqualTo("bytes".getBytes()); + } + } + } + + @Test + void applyingDeadExchangeAsPolicyDoesNotSupersedeQueueArgument() throws IOException, TimeoutException { + MockConnectionFactory connectionFactory = new MockConnectionFactory(); + try (Connection conn = connectionFactory.newConnection()) { + try (Channel channel = conn.createChannel()) { + channel.exchangeDeclare("rejected-ex", BuiltinExchangeType.FANOUT); + channel.queueDeclare("rejected", true, false, false, null); + channel.queueBind("rejected", "rejected-ex", "", null); + + channel.exchangeDeclare("rejected-ex", BuiltinExchangeType.FANOUT); + channel.queueDeclare("rejected", true, false, false, null); + channel.queueBind("rejected", "rejected-ex", "", null); + + channel.queueDeclare("data", true, false, false, new HashMap() {{ put(AmqArguments.DEAD_LETTER_EXCHANGE_KEY, "rejected-ex"); }}); + + MockPolicy deadLetterExchangePolicy = MockPolicy.builder() + .name("dead letter exchange policy that does not override queue arguments") + .pattern("data") + .definition(MockPolicy.DEAD_LETTER_EXCHANGE, "rejected-ex-policy") + .build(); + + connectionFactory.setPolicy(deadLetterExchangePolicy); + + List messages = new ArrayList<>(); + + configureRejectOnQueue(channel, "data"); + + configureMessageCapture(channel, "rejected", messages); + + channel.basicPublish("", "data", null, "bytes".getBytes()); + + waitForMessageDelivered(messages); + + assertThat(messages.get(0)).isEqualTo("bytes".getBytes()); + } + } + } + + @Test + void canApplyDeadLetterRoutingKeyAsPolicy() throws IOException, TimeoutException { + MockConnectionFactory connectionFactory = new MockConnectionFactory(); + try (Connection conn = connectionFactory.newConnection()) { + try (Channel channel = conn.createChannel()) { + channel.exchangeDeclare("rejected-ex", BuiltinExchangeType.DIRECT); + channel.queueDeclare("rejected", true, false, false, null); + channel.queueBind("rejected", "rejected-ex", "routed-by-policy", null); + channel.queueDeclare("data", true, false, false, new HashMap() {{ put(AmqArguments.DEAD_LETTER_EXCHANGE_KEY, "rejected-ex"); }}); + + MockPolicy deadLetterRoutingKeyPolicy = MockPolicy.builder() + .name("dead letter routing key policy") + .pattern("data") + .definition(MockPolicy.DEAD_LETTER_ROUTING_KEY, "routed-by-policy") + .build(); + + connectionFactory.setPolicy(deadLetterRoutingKeyPolicy); + + List messages = new ArrayList<>(); + + configureRejectOnQueue(channel, "data"); + + configureMessageCapture(channel, "rejected", messages); + + channel.basicPublish("", "data", null, "bytes".getBytes()); + + waitForMessageDelivered(messages); + + assertThat(messages.get(0)).isEqualTo("bytes".getBytes()); + } + } + } + + @Test + void applyingDeadLetterRoutingKeyAsPolicyDoesNotSupersedeQueueArguments() throws IOException, TimeoutException { + MockConnectionFactory connectionFactory = new MockConnectionFactory(); + try (Connection conn = connectionFactory.newConnection()) { + try (Channel channel = conn.createChannel()) { + channel.exchangeDeclare("rejected-ex", BuiltinExchangeType.DIRECT); + channel.queueDeclare("rejected", true, false, false, null); + channel.queueBind("rejected", "rejected-ex", "routed-by-queue-args", null); + channel.queueDeclare("data", true, false, false, + new HashMap() {{ + put(AmqArguments.DEAD_LETTER_EXCHANGE_KEY, "rejected-ex"); + put(AmqArguments.DEAD_LETTER_ROUTING_KEY_KEY, "routed-by-queue-args"); + }}); + + MockPolicy deadLetterRoutingKeyPolicy = MockPolicy.builder() + .name("dead letter routing key policy that does not override queue arguments") + .pattern("data") + .definition(MockPolicy.DEAD_LETTER_ROUTING_KEY, "routed-by-policy") + .build(); + + connectionFactory.setPolicy(deadLetterRoutingKeyPolicy); + + List messages = new ArrayList<>(); + + configureRejectOnQueue(channel, "data"); + + configureMessageCapture(channel, "rejected", messages); + + channel.basicPublish("", "data", null, "bytes".getBytes()); + + waitForMessageDelivered(messages); + + assertThat(messages.get(0)).isEqualTo("bytes".getBytes()); + } + } + } + + @Test + void canApplyAlternativeExchangeAsPolicy() throws IOException, TimeoutException { + MockConnectionFactory connectionFactory = new MockConnectionFactory(); + try (Connection conn = connectionFactory.newConnection()) { + try (Channel channel = conn.createChannel()) { + channel.exchangeDeclare("non-routable-ex", BuiltinExchangeType.FANOUT); + channel.exchangeDeclare("alternative-ex", BuiltinExchangeType.FANOUT); + channel.queueDeclare("alternative", true, false, false, null); + channel.queueBind("alternative", "alternative-ex", "", null); + + MockPolicy alternativeExchangePolicy = MockPolicy.builder() + .name("alternative exchange policy") + .pattern("non-routable-ex") + .definition(MockPolicy.ALTERNATE_EXCHANGE, "alternative-ex") + .build(); + + connectionFactory.setPolicy(alternativeExchangePolicy); + + List messages = new ArrayList<>(); + + configureMessageCapture(channel, "alternative", messages); + + channel.basicPublish("non-routable-ex", "", null, "bytes".getBytes()); + + waitForMessageDelivered(messages); + + assertThat(messages.get(0)).isEqualTo("bytes".getBytes()); + } + } + } + + @Test + void applyingAlternativeExchangeAsPolicyDoesNotSupersedeExchangeArguments() throws IOException, TimeoutException { + MockConnectionFactory connectionFactory = new MockConnectionFactory(); + try (Connection conn = connectionFactory.newConnection()) { + try (Channel channel = conn.createChannel()) { + channel.exchangeDeclare("non-routable-ex", BuiltinExchangeType.FANOUT, true, false, + new HashMap() {{ + put(AmqArguments.ALTERNATE_EXCHANGE_KEY, "alternative-ex"); + }}); + channel.exchangeDeclare("alternative-ex", BuiltinExchangeType.FANOUT); + channel.queueDeclare("alternative", true, false, false, null); + channel.queueBind("alternative", "alternative-ex", "", null); + + MockPolicy alternativeExchangePolicy = MockPolicy.builder() + .name("alternative exchange policy") + .pattern("non-routable-ex") + .definition(MockPolicy.ALTERNATE_EXCHANGE, "alternative-ex-policy") + .build(); + + connectionFactory.setPolicy(alternativeExchangePolicy); + + List messages = new ArrayList<>(); + + configureMessageCapture(channel, "alternative", messages); + + channel.basicPublish("non-routable-ex", "", null, "bytes".getBytes()); + + waitForMessageDelivered(messages); + + assertThat(messages.get(0)).isEqualTo("bytes".getBytes()); + } + } + } + + @Test + void doesNotApplyPolicyWhenPatternDoesNotMatch() throws IOException, TimeoutException, InterruptedException { + MockConnectionFactory connectionFactory = new MockConnectionFactory(); + try (Connection conn = connectionFactory.newConnection()) { + try (Channel channel = conn.createChannel()) { + channel.exchangeDeclare("non-routable-ex", BuiltinExchangeType.FANOUT); + channel.exchangeDeclare("alternative-ex", BuiltinExchangeType.FANOUT); + channel.queueDeclare("alternative", true, false, false, null); + channel.queueBind("alternative", "alternative-ex", "", null); + + MockPolicy alternativeExchangePolicy = MockPolicy.builder() + .name("alternative exchange policy") + .pattern("does-not-match-non-routable-ex") + .definition(MockPolicy.ALTERNATE_EXCHANGE, "alternative-ex") + .build(); + + connectionFactory.setPolicy(alternativeExchangePolicy); + + List messages = new ArrayList<>(); + + configureMessageCapture(channel, "alternative", messages); + + channel.basicPublish("non-routable-ex", "", null, "bytes".getBytes()); + + sleep(TIMEOUT_INTERVAL); + + assertThat(messages).isEmpty(); + } + } + } + + @Test + void canUseRegExAsPatternToApplyPolicy() throws IOException, TimeoutException { + MockConnectionFactory connectionFactory = new MockConnectionFactory(); + try (Connection conn = connectionFactory.newConnection()) { + try (Channel channel = conn.createChannel()) { + channel.exchangeDeclare("non-routable-ex", BuiltinExchangeType.FANOUT); + channel.exchangeDeclare("alternative-ex", BuiltinExchangeType.FANOUT); + channel.queueDeclare("alternative", true, false, false, null); + channel.queueBind("alternative", "alternative-ex", "", null); + + MockPolicy alternativeExchangePolicy = MockPolicy.builder() + .name("alternative exchange policy") + .pattern(".*routable-ex$") + .definition(MockPolicy.ALTERNATE_EXCHANGE, "alternative-ex") + .build(); + + connectionFactory.setPolicy(alternativeExchangePolicy); + + List messages = new ArrayList<>(); + + configureMessageCapture(channel, "alternative", messages); + + channel.basicPublish("non-routable-ex", "", null, "bytes".getBytes()); + + waitForMessageDelivered(messages); + + assertThat(messages.get(0)).isEqualTo("bytes".getBytes()); + } + } + } + + @Test + void highestPriorityPolicyIsApplied() throws IOException, TimeoutException { + MockConnectionFactory connectionFactory = new MockConnectionFactory(); + try (Connection conn = connectionFactory.newConnection()) { + try (Channel channel = conn.createChannel()) { + channel.exchangeDeclare("non-routable-ex", BuiltinExchangeType.FANOUT); + channel.exchangeDeclare("alternative-ex", BuiltinExchangeType.FANOUT); + channel.exchangeDeclare("unroutable-alternative-ex", BuiltinExchangeType.FANOUT); + channel.queueDeclare("alternative", true, false, false, null); + channel.queueBind("alternative", "alternative-ex", "", null); + + MockPolicy lowPriorityAlternativeExchangePolicy = MockPolicy.builder() + .name(" low priority alternative exchange policy") + .pattern("non-routable-ex") + .definition(MockPolicy.ALTERNATE_EXCHANGE, "unroutable-alternative-ex") + .build(); + + MockPolicy highPriorityAlternativeExchangePolicy = MockPolicy.builder() + .name("high priority alternative exchange policy") + .pattern("non-routable-ex") + .definition(MockPolicy.ALTERNATE_EXCHANGE, "alternative-ex") + .priority(1) + .build(); + + connectionFactory.setPolicy(lowPriorityAlternativeExchangePolicy); + connectionFactory.setPolicy(highPriorityAlternativeExchangePolicy); + + List messages = new ArrayList<>(); + + configureMessageCapture(channel, "alternative", messages); + + channel.basicPublish("non-routable-ex", "", null, "bytes".getBytes()); + + waitForMessageDelivered(messages); + + assertThat(messages.get(0)).isEqualTo("bytes".getBytes()); + } + } + } + + @Test + void appliesSinglePolicyWhenMultiplePatternsMatchWithSamePriority() throws IOException, TimeoutException { + MockConnectionFactory connectionFactory = new MockConnectionFactory(); + try (Connection conn = connectionFactory.newConnection()) { + try (Channel channel = conn.createChannel()) { + channel.exchangeDeclare("non-routable-ex", BuiltinExchangeType.FANOUT); + channel.exchangeDeclare("alternative-ex", BuiltinExchangeType.FANOUT); + channel.exchangeDeclare("unroutable-alternative-ex", BuiltinExchangeType.FANOUT); + channel.queueDeclare("alternative", true, false, false, null); + channel.queueBind("alternative", "alternative-ex", "", null); + + MockPolicy policy = MockPolicy.builder() + .name("one") + .pattern("non-routable-ex") + .definition(MockPolicy.ALTERNATE_EXCHANGE, "alternative-ex") + .build(); + + connectionFactory.setPolicy(policy); + connectionFactory.setPolicy(policy.toBuilder().name("three").build()); + connectionFactory.setPolicy(policy.toBuilder().name("two").build()); + + List messages = new ArrayList<>(); + + configureMessageCapture(channel, "alternative", messages); + + channel.basicPublish("non-routable-ex", "", null, "bytes".getBytes()); + + waitForMessageDelivered(messages); + + assertThat(messages.get(0)).isEqualTo("bytes".getBytes()); + } + } + } + + @Test + void queuePolicyIsNotAppliedWhenAppliesToSetToExchanges() throws IOException, TimeoutException, InterruptedException { + MockConnectionFactory connectionFactory = new MockConnectionFactory(); + try (Connection conn = connectionFactory.newConnection()) { + try (Channel channel = conn.createChannel()) { + channel.exchangeDeclare("rejected-ex", BuiltinExchangeType.FANOUT); + channel.queueDeclare("rejected", true, false, false, null); + channel.queueBind("rejected", "rejected-ex", "", null); + channel.queueDeclare("data", true, false, false, null); + + MockPolicy deadLetterExchangePolicy = MockPolicy.builder() + .name("dead letter exchange policy") + .pattern("data") + .definition(MockPolicy.DEAD_LETTER_EXCHANGE, "rejected-ex") + .applyTo(EXCHANGE) + .build(); + + connectionFactory.setPolicy(deadLetterExchangePolicy); + + List messages = new ArrayList<>(); + + configureRejectOnQueue(channel, "data"); + + configureMessageCapture(channel, "rejected", messages); + + channel.basicPublish("", "data", null, "bytes".getBytes()); + + sleep(TIMEOUT_INTERVAL); + + assertThat(messages).isEmpty(); + } + } + } + + @Test + void cannotApplyPolicyToDefaultExchange() throws IOException, TimeoutException, InterruptedException { + MockConnectionFactory connectionFactory = new MockConnectionFactory(); + try (Connection conn = connectionFactory.newConnection()) { + try (Channel channel = conn.createChannel()) { + channel.exchangeDeclare("non-routable-ex", BuiltinExchangeType.FANOUT); + channel.exchangeDeclare("alternative-ex", BuiltinExchangeType.FANOUT); + channel.queueDeclare("alternative", true, false, false, null); + channel.queueBind("alternative", "alternative-ex", "", null); + + MockPolicy alternativeExchangePolicy = MockPolicy.builder() + .name("alternative exchange policy") + .pattern("") + .definition(MockPolicy.ALTERNATE_EXCHANGE, "alternative-ex") + .build(); + + connectionFactory.setPolicy(alternativeExchangePolicy); + + List messages = new ArrayList<>(); + + configureMessageCapture(channel, "alternative", messages); + + channel.basicPublish("", "non-routable-ex", null, "bytes".getBytes()); + + sleep(TIMEOUT_INTERVAL); + + assertThat(messages).isEmpty(); + } + } + } + + @Test + void exchangePolicyIsNotAppliedWhenAppliesToSetToQueues() throws IOException, TimeoutException, InterruptedException { + MockConnectionFactory connectionFactory = new MockConnectionFactory(); + try (Connection conn = connectionFactory.newConnection()) { + try (Channel channel = conn.createChannel()) { + channel.exchangeDeclare("non-routable-ex", BuiltinExchangeType.FANOUT); + channel.exchangeDeclare("alternative-ex", BuiltinExchangeType.FANOUT); + channel.queueDeclare("alternative", true, false, false, null); + channel.queueBind("alternative", "alternative-ex", "", null); + + MockPolicy alternativeExchangePolicy = MockPolicy.builder() + .name("alternative exchange policy") + .pattern("non-routable-ex") + .definition(MockPolicy.ALTERNATE_EXCHANGE, "alternative-ex") + .applyTo(QUEUE) + .build(); + + connectionFactory.setPolicy(alternativeExchangePolicy); + + List messages = new ArrayList<>(); + + configureMessageCapture(channel, "alternative", messages); + + channel.basicPublish("non-routable-ex", "", null, "bytes".getBytes()); + + sleep(TIMEOUT_INTERVAL); + + assertThat(messages).isEmpty(); + } + } + } + + @Test + void policiesCanBeChangedDynamicallyWithSetAndDelete() throws IOException, TimeoutException { + MockConnectionFactory connectionFactory = new MockConnectionFactory(); + try (Connection conn = connectionFactory.newConnection()) { + try (Channel channel = conn.createChannel()) { + channel.exchangeDeclare("rejected-ex", BuiltinExchangeType.FANOUT); + channel.exchangeDeclare("rejected-ex-2", BuiltinExchangeType.FANOUT); + channel.queueDeclare("rejected", true, false, false, null); + channel.queueDeclare("rejected-2", true, false, false, null); + channel.queueBind("rejected", "rejected-ex", "", null); + channel.queueBind("rejected-2", "rejected-ex-2", "", null); + channel.queueDeclare("data", true, false, false, null); + + List messagesFromPolicyA = new ArrayList<>(); + List messagesFromPolicyB = new ArrayList<>(); + + MockPolicy policyA = MockPolicy.builder() + .name("dead letter exchange policy") + .pattern("data") + .definition(MockPolicy.DEAD_LETTER_EXCHANGE, "rejected-ex") + .build(); + + connectionFactory.setPolicy(policyA); + assertThat(connectionFactory.listPolicies()).containsExactly(policyA); + + configureRejectOnQueue(channel, "data"); + configureMessageCapture(channel, "rejected", messagesFromPolicyA); + configureMessageCapture(channel, "rejected-2", messagesFromPolicyB); + + channel.basicPublish("", "data", null, "bytes".getBytes()); + + waitForMessageDelivered(messagesFromPolicyA); + + assertThat(messagesFromPolicyA.get(0)).isEqualTo("bytes".getBytes()); + assertThat(messagesFromPolicyB).isEmpty(); + + messagesFromPolicyA.clear(); + + MockPolicy policyB = MockPolicy.builder() + .name("higher priority dead letter exchange policy") + .pattern("data") + .definition(MockPolicy.DEAD_LETTER_EXCHANGE, "rejected-ex-2") + .priority(1) + .build(); + + connectionFactory.setPolicy(policyB); + assertThat(connectionFactory.listPolicies()).containsExactly(policyA, policyB); + + channel.basicPublish("", "data", null, "bytes".getBytes()); + + waitForMessageDelivered(messagesFromPolicyB); + + assertThat(messagesFromPolicyB.get(0)).isEqualTo("bytes".getBytes()); + assertThat(messagesFromPolicyA).isEmpty(); + + messagesFromPolicyB.clear(); + + + MockPolicy higherPriorityPolicyA = policyA.toBuilder().priority(2).build(); + connectionFactory.setPolicy(higherPriorityPolicyA); + assertThat(connectionFactory.listPolicies()).containsExactly(higherPriorityPolicyA, policyB); + + + channel.basicPublish("", "data", null, "bytes".getBytes()); + + waitForMessageDelivered(messagesFromPolicyA); + + assertThat(messagesFromPolicyA.get(0)).isEqualTo("bytes".getBytes()); + assertThat(messagesFromPolicyB).isEmpty(); + + messagesFromPolicyA.clear(); + + connectionFactory.deletePolicy(policyA.getName()); + assertThat(connectionFactory.listPolicies()).containsExactly(policyB); + + channel.basicPublish("", "data", null, "bytes".getBytes()); + + waitForMessageDelivered(messagesFromPolicyB); + + assertThat(messagesFromPolicyB.get(0)).isEqualTo("bytes".getBytes()); + assertThat(messagesFromPolicyA).isEmpty(); + } + } + } + + @Test + public void deletingPolicyThatDoesNotExistThrowsNoExceptions() { + MockConnectionFactory connectionFactory = new MockConnectionFactory(); + connectionFactory.deletePolicy("Policy that doesn't exist"); + } + + private void configureRejectOnQueue(Channel channel, String queueName) throws IOException { + channel.basicConsume(queueName, new DefaultConsumer(channel) { + @Override + public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { + channel.basicReject(envelope.getDeliveryTag(), false); + } + }); + } + + private void configureMessageCapture(Channel channel, String queueName, List messages) throws IOException { + channel.basicConsume(queueName, new DefaultConsumer(channel) { + @Override + public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) { + messages.add(body); + } + }); + } + + private void waitForMessageDelivered(List messages) { + Assertions.assertTimeoutPreemptively(Duration.ofMillis(TIMEOUT_INTERVAL), () -> { + while(messages.isEmpty()) { + sleep(RETRY_INTERVAL); + } + }); + } +} diff --git a/src/test/java/com/github/fridujo/rabbitmq/mock/exchange/ExchangeTest.java b/src/test/java/com/github/fridujo/rabbitmq/mock/exchange/ExchangeTest.java index 5fad1993..ef3f735e 100644 --- a/src/test/java/com/github/fridujo/rabbitmq/mock/exchange/ExchangeTest.java +++ b/src/test/java/com/github/fridujo/rabbitmq/mock/exchange/ExchangeTest.java @@ -5,6 +5,7 @@ import static java.util.Collections.emptyMap; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatExceptionOfType; +import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; @@ -28,6 +29,7 @@ import com.github.fridujo.rabbitmq.mock.exchange.BindableMockExchange.BindConfiguration; import com.rabbitmq.client.AMQP; import com.rabbitmq.client.BuiltinExchangeType; +import org.springframework.amqp.core.DirectExchange; class ExchangeTest { @@ -74,6 +76,12 @@ void binding_key_does_not_match_routing_key(String bindingKey, String routingKey assertThat(directExchange.match(bindConfiguration, routingKey, emptyMap())).isFalse(); } + + @Test + void applying_policy_throws_exception() { + MockDefaultExchange mockDefaultExchange = new MockDefaultExchange(null); + assertThatThrownBy(() -> mockDefaultExchange.setPolicy(Optional.empty()), "No policy should be applied for the default exchange"); + } } @Nested