diff --git a/pom.xml b/pom.xml
index 1458d959..d3c62957 100644
--- a/pom.xml
+++ b/pom.xml
@@ -107,6 +107,12 @@
${junit.jupiter.version}
test
+
+ org.projectlombok
+ lombok
+ 1.18.8
+ provided
+
diff --git a/src/main/java/com/github/fridujo/rabbitmq/mock/AmqArguments.java b/src/main/java/com/github/fridujo/rabbitmq/mock/AmqArguments.java
index cf59b028..ff775f01 100644
--- a/src/main/java/com/github/fridujo/rabbitmq/mock/AmqArguments.java
+++ b/src/main/java/com/github/fridujo/rabbitmq/mock/AmqArguments.java
@@ -1,5 +1,10 @@
package com.github.fridujo.rabbitmq.mock;
+import static com.github.fridujo.rabbitmq.mock.tool.ParameterMarshaller.getParameterAsExchangePointer;
+import static com.github.fridujo.rabbitmq.mock.tool.ParameterMarshaller.getParameterAsPositiveInteger;
+import static com.github.fridujo.rabbitmq.mock.tool.ParameterMarshaller.getParameterAsPositiveLong;
+import static com.github.fridujo.rabbitmq.mock.tool.ParameterMarshaller.getParameterAsPositiveShort;
+import static com.github.fridujo.rabbitmq.mock.tool.ParameterMarshaller.getParameterAsString;
import static java.util.Collections.emptyMap;
import java.util.Arrays;
@@ -14,7 +19,7 @@ public class AmqArguments {
public static final String QUEUE_MAX_LENGTH_BYTES_KEY = "x-max-length-bytes";
public static final String OVERFLOW_KEY = "x-overflow";
public static final String MAX_PRIORITY_KEY = "x-max-priority";
- private final String ALTERNATE_EXCHANGE_KEY = "alternate-exchange";
+ public static final String ALTERNATE_EXCHANGE_KEY = "alternate-exchange";
private final Map arguments;
public static AmqArguments empty() {
@@ -26,58 +31,37 @@ public AmqArguments(Map arguments) {
}
public Optional getAlternateExchange() {
- return string(ALTERNATE_EXCHANGE_KEY)
- .map(aeName -> new ReceiverPointer(ReceiverPointer.Type.EXCHANGE, aeName));
+ return getParameterAsExchangePointer.apply(ALTERNATE_EXCHANGE_KEY, arguments);
}
public Optional getDeadLetterExchange() {
- return string(DEAD_LETTER_EXCHANGE_KEY)
- .map(aeName -> new ReceiverPointer(ReceiverPointer.Type.EXCHANGE, aeName));
+ return getParameterAsExchangePointer.apply(DEAD_LETTER_EXCHANGE_KEY, arguments);
}
public Optional getDeadLetterRoutingKey() {
- return string(DEAD_LETTER_ROUTING_KEY_KEY);
+ return getParameterAsString.apply(DEAD_LETTER_ROUTING_KEY_KEY, arguments);
}
public Optional queueLengthLimit() {
- return positiveInteger(QUEUE_MAX_LENGTH_KEY);
+ return getParameterAsPositiveInteger.apply(QUEUE_MAX_LENGTH_KEY, arguments);
}
public Optional queueLengthBytesLimit() {
- return positiveInteger(QUEUE_MAX_LENGTH_BYTES_KEY);
+ return getParameterAsPositiveInteger.apply(QUEUE_MAX_LENGTH_BYTES_KEY, arguments);
}
public Overflow overflow() {
- return string(OVERFLOW_KEY)
+ return getParameterAsString.apply(OVERFLOW_KEY, arguments)
.flatMap(Overflow::parse)
.orElse(Overflow.DROP_HEAD);
}
public Optional getMessageTtlOfQueue() {
- return Optional.ofNullable(arguments.get(MESSAGE_TTL_KEY))
- .filter(aeObject -> aeObject instanceof Number)
- .map(Number.class::cast)
- .map(number -> number.longValue());
+ return getParameterAsPositiveLong.apply(MESSAGE_TTL_KEY, arguments);
}
public Optional queueMaxPriority() {
- return positiveInteger(MAX_PRIORITY_KEY)
- .filter(i -> i < 256)
- .map(Integer::shortValue);
- }
-
- private Optional positiveInteger(String key) {
- return Optional.ofNullable(arguments.get(key))
- .filter(aeObject -> aeObject instanceof Number)
- .map(Number.class::cast)
- .map(num -> num.intValue())
- .filter(i -> i > 0);
- }
-
- private Optional string(String key) {
- return Optional.ofNullable(arguments.get(key))
- .filter(aeObject -> aeObject instanceof String)
- .map(String.class::cast);
+ return getParameterAsPositiveShort.apply(MAX_PRIORITY_KEY, arguments);
}
public enum Overflow {
diff --git a/src/main/java/com/github/fridujo/rabbitmq/mock/MockConnectionFactory.java b/src/main/java/com/github/fridujo/rabbitmq/mock/MockConnectionFactory.java
index 27cf6969..d009f7a1 100644
--- a/src/main/java/com/github/fridujo/rabbitmq/mock/MockConnectionFactory.java
+++ b/src/main/java/com/github/fridujo/rabbitmq/mock/MockConnectionFactory.java
@@ -2,12 +2,22 @@
import com.github.fridujo.rabbitmq.mock.metrics.MetricsCollectorWrapper;
import com.rabbitmq.client.AddressResolver;
-import com.rabbitmq.client.ConnectionFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
import java.util.concurrent.ExecutorService;
+import static java.lang.String.format;
+
public class MockConnectionFactory extends ConfigurableConnectionFactory {
+ private static final Logger LOGGER = LoggerFactory.getLogger(MockConnectionFactory.class);
+
+ private HashMap policies = new HashMap<>();
+
public MockConnectionFactory() {
setAutomaticRecoveryEnabled(false);
}
@@ -22,4 +32,22 @@ public MockConnection newConnection() {
MockConnection mockConnection = new MockConnection(mockNode, metricsCollectorWrapper);
return mockConnection;
}
+
+ public void setPolicy(MockPolicy policy) {
+ policies.put(policy.getName(), policy);
+ mockNode.applyPolicies(new HashSet(policies.values()));
+ }
+
+ public void deletePolicy(String policyName) {
+ if(policies.remove(policyName) == null) {
+ LOGGER.error(format("Error deleting, policy with name %s was not found", policyName));
+ } else {
+ mockNode.applyPolicies(new HashSet(policies.values()));
+ }
+ }
+
+ public Collection listPolicies() {
+ return policies.values();
+ }
+
}
diff --git a/src/main/java/com/github/fridujo/rabbitmq/mock/MockNode.java b/src/main/java/com/github/fridujo/rabbitmq/mock/MockNode.java
index fce675de..3b3921fc 100644
--- a/src/main/java/com/github/fridujo/rabbitmq/mock/MockNode.java
+++ b/src/main/java/com/github/fridujo/rabbitmq/mock/MockNode.java
@@ -1,8 +1,11 @@
package com.github.fridujo.rabbitmq.mock;
+import java.util.Collection;
import java.util.Map;
import java.util.Optional;
+import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.Function;
import java.util.function.Supplier;
import com.rabbitmq.client.AMQP;
@@ -189,4 +192,20 @@ public void close(MockConnection mockConnection) {
public Configuration getConfiguration() {
return configuration;
}
+
+ public void applyPolicies(Set policies) {
+ applyPolicyToReceivers(policies, exchanges.values());
+ applyPolicyToReceivers(policies, queues.values());
+ }
+
+ private void applyPolicyToReceivers(Set policies, Collection receivers) {
+ Function> calculateHighestPriorityPolicy = r -> policies.stream()
+ .sorted(MockPolicy.comparator)
+ .filter(p -> p.receiverMatchesPolicyPattern.test(r))
+ .findFirst();
+
+ receivers.stream()
+ .filter(r -> !MockDefaultExchange.class.isInstance(r))
+ .forEach(r -> r.setPolicy(calculateHighestPriorityPolicy.apply(r)));
+ }
}
diff --git a/src/main/java/com/github/fridujo/rabbitmq/mock/MockPolicy.java b/src/main/java/com/github/fridujo/rabbitmq/mock/MockPolicy.java
new file mode 100644
index 00000000..d80eb633
--- /dev/null
+++ b/src/main/java/com/github/fridujo/rabbitmq/mock/MockPolicy.java
@@ -0,0 +1,77 @@
+package com.github.fridujo.rabbitmq.mock;
+
+import lombok.Builder;
+import lombok.Getter;
+import lombok.NonNull;
+import lombok.Singular;
+import lombok.ToString;
+
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.function.Predicate;
+
+import static com.github.fridujo.rabbitmq.mock.MockPolicy.ApplyTo.ALL;
+import static com.github.fridujo.rabbitmq.mock.tool.ParameterMarshaller.getParameterAsExchangePointer;
+import static com.github.fridujo.rabbitmq.mock.tool.ParameterMarshaller.getParameterAsString;
+import static java.util.Arrays.asList;
+import static java.util.Collections.singletonList;
+
+@Getter
+@ToString
+public class MockPolicy {
+ public static final String ALTERNATE_EXCHANGE = "alternate-exchange";
+ public static final String DEAD_LETTER_EXCHANGE = "dead-letter-exchange";
+ public static final String DEAD_LETTER_ROUTING_KEY = "dead-letter-routing-key";
+
+ private String name;
+ private String pattern;
+ private Integer priority;
+ private Map definitions;
+ private ApplyTo applyTo;
+
+ static final Comparator comparator = Comparator.comparing(MockPolicy::getPriority).reversed();
+
+ final Predicate receiverMatchesPolicyPattern =
+ r -> r.pointer().name.matches(this.pattern) && this.applyTo.matches(r) ;
+
+ @Builder(toBuilder = true)
+ public MockPolicy(@NonNull String name, @NonNull String pattern, @NonNull @Singular Map definitions,
+ Integer priority, ApplyTo applyTo) {
+ this.name = name;
+ this.pattern = pattern;
+ this.definitions = definitions;
+ this.priority = priority == null ? 0 : priority;
+ this.applyTo = applyTo == null ? ALL : applyTo;
+ }
+
+ public Optional getAlternateExchange() {
+ return getParameterAsExchangePointer.apply(ALTERNATE_EXCHANGE, definitions);
+ }
+
+ public Optional getDeadLetterExchange() {
+ return getParameterAsExchangePointer.apply(DEAD_LETTER_EXCHANGE, definitions);
+ }
+
+ public Optional getDeadLetterRoutingKey() {
+ return getParameterAsString.apply(DEAD_LETTER_ROUTING_KEY, definitions);
+ }
+
+ public enum ApplyTo {
+ ALL(asList(ReceiverPointer.Type.QUEUE, ReceiverPointer.Type.EXCHANGE)),
+ EXCHANGE(singletonList(ReceiverPointer.Type.EXCHANGE)),
+ QUEUE(singletonList(ReceiverPointer.Type.QUEUE));
+
+ private List supportedTypes;
+
+ ApplyTo(List supportTypes) {
+ this.supportedTypes = supportTypes;
+ }
+
+ public boolean matches(Receiver r) {
+ return supportedTypes.contains(r.pointer().type);
+ }
+ }
+}
+
diff --git a/src/main/java/com/github/fridujo/rabbitmq/mock/MockQueue.java b/src/main/java/com/github/fridujo/rabbitmq/mock/MockQueue.java
index acb7f035..5312a5e3 100644
--- a/src/main/java/com/github/fridujo/rabbitmq/mock/MockQueue.java
+++ b/src/main/java/com/github/fridujo/rabbitmq/mock/MockQueue.java
@@ -2,6 +2,7 @@
import static com.github.fridujo.rabbitmq.mock.tool.Exceptions.runAndEatExceptions;
import static com.github.fridujo.rabbitmq.mock.tool.Exceptions.runAndTransformExceptions;
+import static java.lang.String.format;
import java.io.IOException;
import java.time.Instant;
@@ -50,6 +51,7 @@ public class MockQueue implements Receiver {
private final Map unackedMessagesByDeliveryTag = new LinkedHashMap<>();
private final AtomicBoolean running = new AtomicBoolean(true);
private final Map> unackedDeliveryTagsByConsumerTag = new LinkedHashMap<>();
+ private Optional mockPolicy = Optional.empty();
public MockQueue(String name, AmqArguments arguments, ReceiverRegistry receiverRegistry, MockChannel mockChannel) {
this.name = name;
@@ -377,7 +379,15 @@ public String toString() {
}
private void deadLetterWithReason(Message message, DeadLettering.ReasonType reason) {
+
+ String routingKey = arguments.getDeadLetterRoutingKey()
+ .map(Optional::of)
+ .orElse(mockPolicy.flatMap(MockPolicy::getDeadLetterRoutingKey))
+ .orElse(message.routingKey);
+
arguments.getDeadLetterExchange()
+ .map(Optional::of)
+ .orElse(mockPolicy.flatMap(MockPolicy::getDeadLetterExchange))
.flatMap(receiverRegistry::getReceiver)
.ifPresent(deadLetterExchange -> {
LOGGER.debug(localized("dead-lettered to " + deadLetterExchange + ": " + message));
@@ -385,7 +395,7 @@ private void deadLetterWithReason(Message message, DeadLettering.ReasonType reas
BasicProperties props = event.prependOn(message.props);
deadLetterExchange.publish(
message.exchangeName,
- arguments.getDeadLetterRoutingKey().orElse(message.routingKey),
+ routingKey,
props,
message.body);
}
@@ -400,6 +410,11 @@ public List getUnackedMessages() {
return new ArrayList<>(unackedMessagesByDeliveryTag.values());
}
+ public void setPolicy(Optional mockPolicy) {
+ mockPolicy.ifPresent(p -> LOGGER.info(localized(format("Applied policy %s", p))));
+ this.mockPolicy = mockPolicy;
+ }
+
static class ConsumerAndTag {
private final String tag;
diff --git a/src/main/java/com/github/fridujo/rabbitmq/mock/Receiver.java b/src/main/java/com/github/fridujo/rabbitmq/mock/Receiver.java
index a6423fc9..f32b157c 100644
--- a/src/main/java/com/github/fridujo/rabbitmq/mock/Receiver.java
+++ b/src/main/java/com/github/fridujo/rabbitmq/mock/Receiver.java
@@ -2,6 +2,8 @@
import com.rabbitmq.client.AMQP;
+import java.util.Optional;
+
/**
* Leverage the receiving capability of both Queues and Exchanges.
*/
@@ -14,4 +16,6 @@ public interface Receiver {
boolean publish(String exchangeName, String routingKey, AMQP.BasicProperties props, byte[] body);
ReceiverPointer pointer();
+
+ void setPolicy(Optional policy);
}
diff --git a/src/main/java/com/github/fridujo/rabbitmq/mock/exchange/BindableMockExchange.java b/src/main/java/com/github/fridujo/rabbitmq/mock/exchange/BindableMockExchange.java
index 8810fc70..f0228338 100644
--- a/src/main/java/com/github/fridujo/rabbitmq/mock/exchange/BindableMockExchange.java
+++ b/src/main/java/com/github/fridujo/rabbitmq/mock/exchange/BindableMockExchange.java
@@ -1,5 +1,14 @@
package com.github.fridujo.rabbitmq.mock.exchange;
+import com.github.fridujo.rabbitmq.mock.AmqArguments;
+import com.github.fridujo.rabbitmq.mock.MockPolicy;
+import com.github.fridujo.rabbitmq.mock.Receiver;
+import com.github.fridujo.rabbitmq.mock.ReceiverPointer;
+import com.github.fridujo.rabbitmq.mock.ReceiverRegistry;
+import com.rabbitmq.client.AMQP;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.Map;
@@ -9,18 +18,10 @@
import java.util.stream.Collectors;
import java.util.stream.Stream;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.github.fridujo.rabbitmq.mock.AmqArguments;
-import com.github.fridujo.rabbitmq.mock.MockQueue;
-import com.github.fridujo.rabbitmq.mock.Receiver;
-import com.github.fridujo.rabbitmq.mock.ReceiverPointer;
-import com.github.fridujo.rabbitmq.mock.ReceiverRegistry;
-import com.rabbitmq.client.AMQP;
+import static java.lang.String.format;
public abstract class BindableMockExchange implements MockExchange {
- private static final Logger LOGGER = LoggerFactory.getLogger(MockQueue.class);
+ private static final Logger LOGGER = LoggerFactory.getLogger(BindableMockExchange.class);
protected final Set bindConfigurations = new LinkedHashSet<>();
private final String name;
@@ -28,6 +29,7 @@ public abstract class BindableMockExchange implements MockExchange {
private final AmqArguments arguments;
private final ReceiverPointer pointer;
private final ReceiverRegistry receiverRegistry;
+ private Optional mockPolicy = Optional.empty();
protected BindableMockExchange(String name, String type, AmqArguments arguments, ReceiverRegistry receiverRegistry) {
this.name = name;
@@ -66,7 +68,12 @@ public boolean publish(String previousExchangeName, String routingKey, AMQP.Basi
}
private Optional getAlternateExchange() {
- return arguments.getAlternateExchange().flatMap(receiverRegistry::getReceiver);
+ Optional policyAlternativeExchange = mockPolicy.flatMap(MockPolicy::getAlternateExchange);
+ Optional exchangeArgumentAlternativeExchange = arguments.getAlternateExchange();
+
+ return exchangeArgumentAlternativeExchange.map(Optional::of)
+ .orElse(policyAlternativeExchange)
+ .flatMap(receiverRegistry::getReceiver);
}
protected abstract Stream matchingReceivers(String routingKey, AMQP.BasicProperties props);
@@ -85,6 +92,12 @@ public void unbind(ReceiverPointer receiver, String routingKey) {
bindConfigurations.remove(new BindConfiguration(routingKey, receiver, Collections.emptyMap()));
}
+ @Override
+ public void setPolicy(Optional mockPolicy) {
+ mockPolicy.ifPresent(p -> LOGGER.info(localized(format("Applied policy %s", p))));
+ this.mockPolicy = mockPolicy;
+ }
+
@Override
public ReceiverPointer pointer() {
return pointer;
diff --git a/src/main/java/com/github/fridujo/rabbitmq/mock/exchange/MockDefaultExchange.java b/src/main/java/com/github/fridujo/rabbitmq/mock/exchange/MockDefaultExchange.java
index 5dacb77b..49598fc6 100644
--- a/src/main/java/com/github/fridujo/rabbitmq/mock/exchange/MockDefaultExchange.java
+++ b/src/main/java/com/github/fridujo/rabbitmq/mock/exchange/MockDefaultExchange.java
@@ -1,7 +1,9 @@
package com.github.fridujo.rabbitmq.mock.exchange;
import java.util.Map;
+import java.util.Optional;
+import com.github.fridujo.rabbitmq.mock.MockPolicy;
import com.rabbitmq.client.AMQP;
import com.github.fridujo.rabbitmq.mock.MockNode;
@@ -38,6 +40,11 @@ public void unbind(ReceiverPointer pointer, String routingKey) {
// nothing needed
}
+ @Override
+ public void setPolicy(Optional mockPolicy) {
+ throw new IllegalStateException("No policy should be applied for the default exchange");
+ }
+
@Override
public ReceiverPointer pointer() {
throw new IllegalStateException("No ReceiverPointer (internal use) should be needed for the default exchange");
diff --git a/src/main/java/com/github/fridujo/rabbitmq/mock/tool/ParameterMarshaller.java b/src/main/java/com/github/fridujo/rabbitmq/mock/tool/ParameterMarshaller.java
new file mode 100644
index 00000000..82261195
--- /dev/null
+++ b/src/main/java/com/github/fridujo/rabbitmq/mock/tool/ParameterMarshaller.java
@@ -0,0 +1,45 @@
+package com.github.fridujo.rabbitmq.mock.tool;
+
+import com.github.fridujo.rabbitmq.mock.ReceiverPointer;
+
+import java.util.Map;
+import java.util.Optional;
+import java.util.function.BiFunction;
+
+public class ParameterMarshaller {
+
+ public static BiFunction, Optional> getParameterAsString =
+ (k, m) -> Optional.ofNullable(m.get(k))
+ .filter(aeObject -> aeObject instanceof String)
+ .map(String.class::cast);
+
+ public static BiFunction, Optional> getParameterAsPositiveInteger =
+ (k, m) -> Optional.ofNullable(m.get(k))
+ .filter(aeObject -> aeObject instanceof Number)
+ .map(Number.class::cast)
+ .map(num -> num.intValue())
+ .filter(i -> i > 0);
+
+ public static BiFunction, Optional> getParameterAsPositiveShort =
+ (k, m) -> Optional.ofNullable(m.get(k))
+ .filter(aeObject -> aeObject instanceof Number)
+ .map(Number.class::cast)
+ .map(num -> num.intValue())
+ .filter(i -> i > 0)
+ .filter(i -> i < 256)
+ .map(Integer::shortValue);
+
+ public static BiFunction, Optional> getParameterAsPositiveLong =
+ (k, m) -> Optional.ofNullable(m.get(k))
+ .filter(aeObject -> aeObject instanceof Number)
+ .map(Number.class::cast)
+ .map(number -> number.longValue());
+
+ public static BiFunction, Optional> getParameterAsExchangePointer =
+ (k, m) -> Optional.ofNullable(m.get(k))
+ .filter(aeObject -> aeObject instanceof String)
+ .map(String.class::cast)
+ .map(aeName -> new ReceiverPointer(ReceiverPointer.Type.EXCHANGE, aeName));
+
+}
+
diff --git a/src/test/java/com/github/fridujo/rabbitmq/mock/MockPolicyUseCaseTests.java b/src/test/java/com/github/fridujo/rabbitmq/mock/MockPolicyUseCaseTests.java
new file mode 100644
index 00000000..e53ba472
--- /dev/null
+++ b/src/test/java/com/github/fridujo/rabbitmq/mock/MockPolicyUseCaseTests.java
@@ -0,0 +1,584 @@
+package com.github.fridujo.rabbitmq.mock;
+
+import com.rabbitmq.client.AMQP;
+import com.rabbitmq.client.BuiltinExchangeType;
+import com.rabbitmq.client.Channel;
+import com.rabbitmq.client.Connection;
+import com.rabbitmq.client.DefaultConsumer;
+import com.rabbitmq.client.Envelope;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.concurrent.TimeoutException;
+
+import static com.github.fridujo.rabbitmq.mock.MockPolicy.ApplyTo.EXCHANGE;
+import static com.github.fridujo.rabbitmq.mock.MockPolicy.ApplyTo.QUEUE;
+import static java.lang.Thread.sleep;
+import static org.assertj.core.api.Assertions.assertThat;
+
+class MockPolicyUseCaseTests {
+
+ public static final long TIMEOUT_INTERVAL = 200L;
+ public static final long RETRY_INTERVAL = 10L;
+
+ @Test
+ void canApplyDeadLetterExchangeAsPolicy() throws IOException, TimeoutException {
+ MockConnectionFactory connectionFactory = new MockConnectionFactory();
+ try (Connection conn = connectionFactory.newConnection()) {
+ try (Channel channel = conn.createChannel()) {
+ channel.exchangeDeclare("rejected-ex", BuiltinExchangeType.FANOUT);
+ channel.queueDeclare("rejected", true, false, false, null);
+ channel.queueBind("rejected", "rejected-ex", "", null);
+ channel.queueDeclare("data", true, false, false, null);
+
+ MockPolicy deadLetterExchangePolicy = MockPolicy.builder()
+ .name("dead letter exchange policy")
+ .pattern("data")
+ .definition(MockPolicy.DEAD_LETTER_EXCHANGE, "rejected-ex")
+ .build();
+
+ connectionFactory.setPolicy(deadLetterExchangePolicy);
+
+ List messages = new ArrayList<>();
+
+ configureRejectOnQueue(channel, "data");
+
+ configureMessageCapture(channel, "rejected", messages);
+
+ channel.basicPublish("", "data", null, "bytes".getBytes());
+
+ waitForMessageDelivered(messages);
+
+ assertThat(messages.get(0)).isEqualTo("bytes".getBytes());
+ }
+ }
+ }
+
+ @Test
+ void applyingDeadExchangeAsPolicyDoesNotSupersedeQueueArgument() throws IOException, TimeoutException {
+ MockConnectionFactory connectionFactory = new MockConnectionFactory();
+ try (Connection conn = connectionFactory.newConnection()) {
+ try (Channel channel = conn.createChannel()) {
+ channel.exchangeDeclare("rejected-ex", BuiltinExchangeType.FANOUT);
+ channel.queueDeclare("rejected", true, false, false, null);
+ channel.queueBind("rejected", "rejected-ex", "", null);
+
+ channel.exchangeDeclare("rejected-ex", BuiltinExchangeType.FANOUT);
+ channel.queueDeclare("rejected", true, false, false, null);
+ channel.queueBind("rejected", "rejected-ex", "", null);
+
+ channel.queueDeclare("data", true, false, false, new HashMap() {{ put(AmqArguments.DEAD_LETTER_EXCHANGE_KEY, "rejected-ex"); }});
+
+ MockPolicy deadLetterExchangePolicy = MockPolicy.builder()
+ .name("dead letter exchange policy that does not override queue arguments")
+ .pattern("data")
+ .definition(MockPolicy.DEAD_LETTER_EXCHANGE, "rejected-ex-policy")
+ .build();
+
+ connectionFactory.setPolicy(deadLetterExchangePolicy);
+
+ List messages = new ArrayList<>();
+
+ configureRejectOnQueue(channel, "data");
+
+ configureMessageCapture(channel, "rejected", messages);
+
+ channel.basicPublish("", "data", null, "bytes".getBytes());
+
+ waitForMessageDelivered(messages);
+
+ assertThat(messages.get(0)).isEqualTo("bytes".getBytes());
+ }
+ }
+ }
+
+ @Test
+ void canApplyDeadLetterRoutingKeyAsPolicy() throws IOException, TimeoutException {
+ MockConnectionFactory connectionFactory = new MockConnectionFactory();
+ try (Connection conn = connectionFactory.newConnection()) {
+ try (Channel channel = conn.createChannel()) {
+ channel.exchangeDeclare("rejected-ex", BuiltinExchangeType.DIRECT);
+ channel.queueDeclare("rejected", true, false, false, null);
+ channel.queueBind("rejected", "rejected-ex", "routed-by-policy", null);
+ channel.queueDeclare("data", true, false, false, new HashMap() {{ put(AmqArguments.DEAD_LETTER_EXCHANGE_KEY, "rejected-ex"); }});
+
+ MockPolicy deadLetterRoutingKeyPolicy = MockPolicy.builder()
+ .name("dead letter routing key policy")
+ .pattern("data")
+ .definition(MockPolicy.DEAD_LETTER_ROUTING_KEY, "routed-by-policy")
+ .build();
+
+ connectionFactory.setPolicy(deadLetterRoutingKeyPolicy);
+
+ List messages = new ArrayList<>();
+
+ configureRejectOnQueue(channel, "data");
+
+ configureMessageCapture(channel, "rejected", messages);
+
+ channel.basicPublish("", "data", null, "bytes".getBytes());
+
+ waitForMessageDelivered(messages);
+
+ assertThat(messages.get(0)).isEqualTo("bytes".getBytes());
+ }
+ }
+ }
+
+ @Test
+ void applyingDeadLetterRoutingKeyAsPolicyDoesNotSupersedeQueueArguments() throws IOException, TimeoutException {
+ MockConnectionFactory connectionFactory = new MockConnectionFactory();
+ try (Connection conn = connectionFactory.newConnection()) {
+ try (Channel channel = conn.createChannel()) {
+ channel.exchangeDeclare("rejected-ex", BuiltinExchangeType.DIRECT);
+ channel.queueDeclare("rejected", true, false, false, null);
+ channel.queueBind("rejected", "rejected-ex", "routed-by-queue-args", null);
+ channel.queueDeclare("data", true, false, false,
+ new HashMap() {{
+ put(AmqArguments.DEAD_LETTER_EXCHANGE_KEY, "rejected-ex");
+ put(AmqArguments.DEAD_LETTER_ROUTING_KEY_KEY, "routed-by-queue-args");
+ }});
+
+ MockPolicy deadLetterRoutingKeyPolicy = MockPolicy.builder()
+ .name("dead letter routing key policy that does not override queue arguments")
+ .pattern("data")
+ .definition(MockPolicy.DEAD_LETTER_ROUTING_KEY, "routed-by-policy")
+ .build();
+
+ connectionFactory.setPolicy(deadLetterRoutingKeyPolicy);
+
+ List messages = new ArrayList<>();
+
+ configureRejectOnQueue(channel, "data");
+
+ configureMessageCapture(channel, "rejected", messages);
+
+ channel.basicPublish("", "data", null, "bytes".getBytes());
+
+ waitForMessageDelivered(messages);
+
+ assertThat(messages.get(0)).isEqualTo("bytes".getBytes());
+ }
+ }
+ }
+
+ @Test
+ void canApplyAlternativeExchangeAsPolicy() throws IOException, TimeoutException {
+ MockConnectionFactory connectionFactory = new MockConnectionFactory();
+ try (Connection conn = connectionFactory.newConnection()) {
+ try (Channel channel = conn.createChannel()) {
+ channel.exchangeDeclare("non-routable-ex", BuiltinExchangeType.FANOUT);
+ channel.exchangeDeclare("alternative-ex", BuiltinExchangeType.FANOUT);
+ channel.queueDeclare("alternative", true, false, false, null);
+ channel.queueBind("alternative", "alternative-ex", "", null);
+
+ MockPolicy alternativeExchangePolicy = MockPolicy.builder()
+ .name("alternative exchange policy")
+ .pattern("non-routable-ex")
+ .definition(MockPolicy.ALTERNATE_EXCHANGE, "alternative-ex")
+ .build();
+
+ connectionFactory.setPolicy(alternativeExchangePolicy);
+
+ List messages = new ArrayList<>();
+
+ configureMessageCapture(channel, "alternative", messages);
+
+ channel.basicPublish("non-routable-ex", "", null, "bytes".getBytes());
+
+ waitForMessageDelivered(messages);
+
+ assertThat(messages.get(0)).isEqualTo("bytes".getBytes());
+ }
+ }
+ }
+
+ @Test
+ void applyingAlternativeExchangeAsPolicyDoesNotSupersedeExchangeArguments() throws IOException, TimeoutException {
+ MockConnectionFactory connectionFactory = new MockConnectionFactory();
+ try (Connection conn = connectionFactory.newConnection()) {
+ try (Channel channel = conn.createChannel()) {
+ channel.exchangeDeclare("non-routable-ex", BuiltinExchangeType.FANOUT, true, false,
+ new HashMap() {{
+ put(AmqArguments.ALTERNATE_EXCHANGE_KEY, "alternative-ex");
+ }});
+ channel.exchangeDeclare("alternative-ex", BuiltinExchangeType.FANOUT);
+ channel.queueDeclare("alternative", true, false, false, null);
+ channel.queueBind("alternative", "alternative-ex", "", null);
+
+ MockPolicy alternativeExchangePolicy = MockPolicy.builder()
+ .name("alternative exchange policy")
+ .pattern("non-routable-ex")
+ .definition(MockPolicy.ALTERNATE_EXCHANGE, "alternative-ex-policy")
+ .build();
+
+ connectionFactory.setPolicy(alternativeExchangePolicy);
+
+ List messages = new ArrayList<>();
+
+ configureMessageCapture(channel, "alternative", messages);
+
+ channel.basicPublish("non-routable-ex", "", null, "bytes".getBytes());
+
+ waitForMessageDelivered(messages);
+
+ assertThat(messages.get(0)).isEqualTo("bytes".getBytes());
+ }
+ }
+ }
+
+ @Test
+ void doesNotApplyPolicyWhenPatternDoesNotMatch() throws IOException, TimeoutException, InterruptedException {
+ MockConnectionFactory connectionFactory = new MockConnectionFactory();
+ try (Connection conn = connectionFactory.newConnection()) {
+ try (Channel channel = conn.createChannel()) {
+ channel.exchangeDeclare("non-routable-ex", BuiltinExchangeType.FANOUT);
+ channel.exchangeDeclare("alternative-ex", BuiltinExchangeType.FANOUT);
+ channel.queueDeclare("alternative", true, false, false, null);
+ channel.queueBind("alternative", "alternative-ex", "", null);
+
+ MockPolicy alternativeExchangePolicy = MockPolicy.builder()
+ .name("alternative exchange policy")
+ .pattern("does-not-match-non-routable-ex")
+ .definition(MockPolicy.ALTERNATE_EXCHANGE, "alternative-ex")
+ .build();
+
+ connectionFactory.setPolicy(alternativeExchangePolicy);
+
+ List messages = new ArrayList<>();
+
+ configureMessageCapture(channel, "alternative", messages);
+
+ channel.basicPublish("non-routable-ex", "", null, "bytes".getBytes());
+
+ sleep(TIMEOUT_INTERVAL);
+
+ assertThat(messages).isEmpty();
+ }
+ }
+ }
+
+ @Test
+ void canUseRegExAsPatternToApplyPolicy() throws IOException, TimeoutException {
+ MockConnectionFactory connectionFactory = new MockConnectionFactory();
+ try (Connection conn = connectionFactory.newConnection()) {
+ try (Channel channel = conn.createChannel()) {
+ channel.exchangeDeclare("non-routable-ex", BuiltinExchangeType.FANOUT);
+ channel.exchangeDeclare("alternative-ex", BuiltinExchangeType.FANOUT);
+ channel.queueDeclare("alternative", true, false, false, null);
+ channel.queueBind("alternative", "alternative-ex", "", null);
+
+ MockPolicy alternativeExchangePolicy = MockPolicy.builder()
+ .name("alternative exchange policy")
+ .pattern(".*routable-ex$")
+ .definition(MockPolicy.ALTERNATE_EXCHANGE, "alternative-ex")
+ .build();
+
+ connectionFactory.setPolicy(alternativeExchangePolicy);
+
+ List messages = new ArrayList<>();
+
+ configureMessageCapture(channel, "alternative", messages);
+
+ channel.basicPublish("non-routable-ex", "", null, "bytes".getBytes());
+
+ waitForMessageDelivered(messages);
+
+ assertThat(messages.get(0)).isEqualTo("bytes".getBytes());
+ }
+ }
+ }
+
+ @Test
+ void highestPriorityPolicyIsApplied() throws IOException, TimeoutException {
+ MockConnectionFactory connectionFactory = new MockConnectionFactory();
+ try (Connection conn = connectionFactory.newConnection()) {
+ try (Channel channel = conn.createChannel()) {
+ channel.exchangeDeclare("non-routable-ex", BuiltinExchangeType.FANOUT);
+ channel.exchangeDeclare("alternative-ex", BuiltinExchangeType.FANOUT);
+ channel.exchangeDeclare("unroutable-alternative-ex", BuiltinExchangeType.FANOUT);
+ channel.queueDeclare("alternative", true, false, false, null);
+ channel.queueBind("alternative", "alternative-ex", "", null);
+
+ MockPolicy lowPriorityAlternativeExchangePolicy = MockPolicy.builder()
+ .name(" low priority alternative exchange policy")
+ .pattern("non-routable-ex")
+ .definition(MockPolicy.ALTERNATE_EXCHANGE, "unroutable-alternative-ex")
+ .build();
+
+ MockPolicy highPriorityAlternativeExchangePolicy = MockPolicy.builder()
+ .name("high priority alternative exchange policy")
+ .pattern("non-routable-ex")
+ .definition(MockPolicy.ALTERNATE_EXCHANGE, "alternative-ex")
+ .priority(1)
+ .build();
+
+ connectionFactory.setPolicy(lowPriorityAlternativeExchangePolicy);
+ connectionFactory.setPolicy(highPriorityAlternativeExchangePolicy);
+
+ List messages = new ArrayList<>();
+
+ configureMessageCapture(channel, "alternative", messages);
+
+ channel.basicPublish("non-routable-ex", "", null, "bytes".getBytes());
+
+ waitForMessageDelivered(messages);
+
+ assertThat(messages.get(0)).isEqualTo("bytes".getBytes());
+ }
+ }
+ }
+
+ @Test
+ void appliesSinglePolicyWhenMultiplePatternsMatchWithSamePriority() throws IOException, TimeoutException {
+ MockConnectionFactory connectionFactory = new MockConnectionFactory();
+ try (Connection conn = connectionFactory.newConnection()) {
+ try (Channel channel = conn.createChannel()) {
+ channel.exchangeDeclare("non-routable-ex", BuiltinExchangeType.FANOUT);
+ channel.exchangeDeclare("alternative-ex", BuiltinExchangeType.FANOUT);
+ channel.exchangeDeclare("unroutable-alternative-ex", BuiltinExchangeType.FANOUT);
+ channel.queueDeclare("alternative", true, false, false, null);
+ channel.queueBind("alternative", "alternative-ex", "", null);
+
+ MockPolicy policy = MockPolicy.builder()
+ .name("one")
+ .pattern("non-routable-ex")
+ .definition(MockPolicy.ALTERNATE_EXCHANGE, "alternative-ex")
+ .build();
+
+ connectionFactory.setPolicy(policy);
+ connectionFactory.setPolicy(policy.toBuilder().name("three").build());
+ connectionFactory.setPolicy(policy.toBuilder().name("two").build());
+
+ List messages = new ArrayList<>();
+
+ configureMessageCapture(channel, "alternative", messages);
+
+ channel.basicPublish("non-routable-ex", "", null, "bytes".getBytes());
+
+ waitForMessageDelivered(messages);
+
+ assertThat(messages.get(0)).isEqualTo("bytes".getBytes());
+ }
+ }
+ }
+
+ @Test
+ void queuePolicyIsNotAppliedWhenAppliesToSetToExchanges() throws IOException, TimeoutException, InterruptedException {
+ MockConnectionFactory connectionFactory = new MockConnectionFactory();
+ try (Connection conn = connectionFactory.newConnection()) {
+ try (Channel channel = conn.createChannel()) {
+ channel.exchangeDeclare("rejected-ex", BuiltinExchangeType.FANOUT);
+ channel.queueDeclare("rejected", true, false, false, null);
+ channel.queueBind("rejected", "rejected-ex", "", null);
+ channel.queueDeclare("data", true, false, false, null);
+
+ MockPolicy deadLetterExchangePolicy = MockPolicy.builder()
+ .name("dead letter exchange policy")
+ .pattern("data")
+ .definition(MockPolicy.DEAD_LETTER_EXCHANGE, "rejected-ex")
+ .applyTo(EXCHANGE)
+ .build();
+
+ connectionFactory.setPolicy(deadLetterExchangePolicy);
+
+ List messages = new ArrayList<>();
+
+ configureRejectOnQueue(channel, "data");
+
+ configureMessageCapture(channel, "rejected", messages);
+
+ channel.basicPublish("", "data", null, "bytes".getBytes());
+
+ sleep(TIMEOUT_INTERVAL);
+
+ assertThat(messages).isEmpty();
+ }
+ }
+ }
+
+ @Test
+ void cannotApplyPolicyToDefaultExchange() throws IOException, TimeoutException, InterruptedException {
+ MockConnectionFactory connectionFactory = new MockConnectionFactory();
+ try (Connection conn = connectionFactory.newConnection()) {
+ try (Channel channel = conn.createChannel()) {
+ channel.exchangeDeclare("non-routable-ex", BuiltinExchangeType.FANOUT);
+ channel.exchangeDeclare("alternative-ex", BuiltinExchangeType.FANOUT);
+ channel.queueDeclare("alternative", true, false, false, null);
+ channel.queueBind("alternative", "alternative-ex", "", null);
+
+ MockPolicy alternativeExchangePolicy = MockPolicy.builder()
+ .name("alternative exchange policy")
+ .pattern("")
+ .definition(MockPolicy.ALTERNATE_EXCHANGE, "alternative-ex")
+ .build();
+
+ connectionFactory.setPolicy(alternativeExchangePolicy);
+
+ List messages = new ArrayList<>();
+
+ configureMessageCapture(channel, "alternative", messages);
+
+ channel.basicPublish("", "non-routable-ex", null, "bytes".getBytes());
+
+ sleep(TIMEOUT_INTERVAL);
+
+ assertThat(messages).isEmpty();
+ }
+ }
+ }
+
+ @Test
+ void exchangePolicyIsNotAppliedWhenAppliesToSetToQueues() throws IOException, TimeoutException, InterruptedException {
+ MockConnectionFactory connectionFactory = new MockConnectionFactory();
+ try (Connection conn = connectionFactory.newConnection()) {
+ try (Channel channel = conn.createChannel()) {
+ channel.exchangeDeclare("non-routable-ex", BuiltinExchangeType.FANOUT);
+ channel.exchangeDeclare("alternative-ex", BuiltinExchangeType.FANOUT);
+ channel.queueDeclare("alternative", true, false, false, null);
+ channel.queueBind("alternative", "alternative-ex", "", null);
+
+ MockPolicy alternativeExchangePolicy = MockPolicy.builder()
+ .name("alternative exchange policy")
+ .pattern("non-routable-ex")
+ .definition(MockPolicy.ALTERNATE_EXCHANGE, "alternative-ex")
+ .applyTo(QUEUE)
+ .build();
+
+ connectionFactory.setPolicy(alternativeExchangePolicy);
+
+ List messages = new ArrayList<>();
+
+ configureMessageCapture(channel, "alternative", messages);
+
+ channel.basicPublish("non-routable-ex", "", null, "bytes".getBytes());
+
+ sleep(TIMEOUT_INTERVAL);
+
+ assertThat(messages).isEmpty();
+ }
+ }
+ }
+
+ @Test
+ void policiesCanBeChangedDynamicallyWithSetAndDelete() throws IOException, TimeoutException {
+ MockConnectionFactory connectionFactory = new MockConnectionFactory();
+ try (Connection conn = connectionFactory.newConnection()) {
+ try (Channel channel = conn.createChannel()) {
+ channel.exchangeDeclare("rejected-ex", BuiltinExchangeType.FANOUT);
+ channel.exchangeDeclare("rejected-ex-2", BuiltinExchangeType.FANOUT);
+ channel.queueDeclare("rejected", true, false, false, null);
+ channel.queueDeclare("rejected-2", true, false, false, null);
+ channel.queueBind("rejected", "rejected-ex", "", null);
+ channel.queueBind("rejected-2", "rejected-ex-2", "", null);
+ channel.queueDeclare("data", true, false, false, null);
+
+ List messagesFromPolicyA = new ArrayList<>();
+ List messagesFromPolicyB = new ArrayList<>();
+
+ MockPolicy policyA = MockPolicy.builder()
+ .name("dead letter exchange policy")
+ .pattern("data")
+ .definition(MockPolicy.DEAD_LETTER_EXCHANGE, "rejected-ex")
+ .build();
+
+ connectionFactory.setPolicy(policyA);
+ assertThat(connectionFactory.listPolicies()).containsExactly(policyA);
+
+ configureRejectOnQueue(channel, "data");
+ configureMessageCapture(channel, "rejected", messagesFromPolicyA);
+ configureMessageCapture(channel, "rejected-2", messagesFromPolicyB);
+
+ channel.basicPublish("", "data", null, "bytes".getBytes());
+
+ waitForMessageDelivered(messagesFromPolicyA);
+
+ assertThat(messagesFromPolicyA.get(0)).isEqualTo("bytes".getBytes());
+ assertThat(messagesFromPolicyB).isEmpty();
+
+ messagesFromPolicyA.clear();
+
+ MockPolicy policyB = MockPolicy.builder()
+ .name("higher priority dead letter exchange policy")
+ .pattern("data")
+ .definition(MockPolicy.DEAD_LETTER_EXCHANGE, "rejected-ex-2")
+ .priority(1)
+ .build();
+
+ connectionFactory.setPolicy(policyB);
+ assertThat(connectionFactory.listPolicies()).containsExactly(policyA, policyB);
+
+ channel.basicPublish("", "data", null, "bytes".getBytes());
+
+ waitForMessageDelivered(messagesFromPolicyB);
+
+ assertThat(messagesFromPolicyB.get(0)).isEqualTo("bytes".getBytes());
+ assertThat(messagesFromPolicyA).isEmpty();
+
+ messagesFromPolicyB.clear();
+
+
+ MockPolicy higherPriorityPolicyA = policyA.toBuilder().priority(2).build();
+ connectionFactory.setPolicy(higherPriorityPolicyA);
+ assertThat(connectionFactory.listPolicies()).containsExactly(higherPriorityPolicyA, policyB);
+
+
+ channel.basicPublish("", "data", null, "bytes".getBytes());
+
+ waitForMessageDelivered(messagesFromPolicyA);
+
+ assertThat(messagesFromPolicyA.get(0)).isEqualTo("bytes".getBytes());
+ assertThat(messagesFromPolicyB).isEmpty();
+
+ messagesFromPolicyA.clear();
+
+ connectionFactory.deletePolicy(policyA.getName());
+ assertThat(connectionFactory.listPolicies()).containsExactly(policyB);
+
+ channel.basicPublish("", "data", null, "bytes".getBytes());
+
+ waitForMessageDelivered(messagesFromPolicyB);
+
+ assertThat(messagesFromPolicyB.get(0)).isEqualTo("bytes".getBytes());
+ assertThat(messagesFromPolicyA).isEmpty();
+ }
+ }
+ }
+
+ @Test
+ public void deletingPolicyThatDoesNotExistThrowsNoExceptions() {
+ MockConnectionFactory connectionFactory = new MockConnectionFactory();
+ connectionFactory.deletePolicy("Policy that doesn't exist");
+ }
+
+ private void configureRejectOnQueue(Channel channel, String queueName) throws IOException {
+ channel.basicConsume(queueName, new DefaultConsumer(channel) {
+ @Override
+ public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
+ channel.basicReject(envelope.getDeliveryTag(), false);
+ }
+ });
+ }
+
+ private void configureMessageCapture(Channel channel, String queueName, List messages) throws IOException {
+ channel.basicConsume(queueName, new DefaultConsumer(channel) {
+ @Override
+ public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) {
+ messages.add(body);
+ }
+ });
+ }
+
+ private void waitForMessageDelivered(List messages) {
+ Assertions.assertTimeoutPreemptively(Duration.ofMillis(TIMEOUT_INTERVAL), () -> {
+ while(messages.isEmpty()) {
+ sleep(RETRY_INTERVAL);
+ }
+ });
+ }
+}
diff --git a/src/test/java/com/github/fridujo/rabbitmq/mock/exchange/ExchangeTest.java b/src/test/java/com/github/fridujo/rabbitmq/mock/exchange/ExchangeTest.java
index 5fad1993..9aab02ea 100644
--- a/src/test/java/com/github/fridujo/rabbitmq/mock/exchange/ExchangeTest.java
+++ b/src/test/java/com/github/fridujo/rabbitmq/mock/exchange/ExchangeTest.java
@@ -5,6 +5,7 @@
import static java.util.Collections.emptyMap;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
@@ -74,6 +75,12 @@ void binding_key_does_not_match_routing_key(String bindingKey, String routingKey
assertThat(directExchange.match(bindConfiguration, routingKey, emptyMap())).isFalse();
}
+
+ @Test
+ void applying_policy_throws_exception() {
+ MockDefaultExchange mockDefaultExchange = new MockDefaultExchange(null);
+ assertThatThrownBy(() -> mockDefaultExchange.setPolicy(Optional.empty()), "No policy should be applied for the default exchange");
+ }
}
@Nested