Skip to content

Commit

Permalink
Issue #9 - Dead Letter Exchange, Dead Letter Routing and Alternative …
Browse files Browse the repository at this point in the history
…Exchange Policies
  • Loading branch information
RedMu committed Jul 7, 2020
1 parent 8930b81 commit bdc641f
Show file tree
Hide file tree
Showing 12 changed files with 832 additions and 43 deletions.
6 changes: 6 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,12 @@
<version>${junit.jupiter.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.8</version>
<scope>provided</scope>
</dependency>
</dependencies>

<build>
Expand Down
44 changes: 14 additions & 30 deletions src/main/java/com/github/fridujo/rabbitmq/mock/AmqArguments.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,10 @@
package com.github.fridujo.rabbitmq.mock;

import static com.github.fridujo.rabbitmq.mock.tool.ParameterMarshaller.getParameterAsExchangePointer;
import static com.github.fridujo.rabbitmq.mock.tool.ParameterMarshaller.getParameterAsPositiveInteger;
import static com.github.fridujo.rabbitmq.mock.tool.ParameterMarshaller.getParameterAsPositiveLong;
import static com.github.fridujo.rabbitmq.mock.tool.ParameterMarshaller.getParameterAsPositiveShort;
import static com.github.fridujo.rabbitmq.mock.tool.ParameterMarshaller.getParameterAsString;
import static java.util.Collections.emptyMap;

import java.util.Arrays;
Expand All @@ -14,7 +19,7 @@ public class AmqArguments {
public static final String QUEUE_MAX_LENGTH_BYTES_KEY = "x-max-length-bytes";
public static final String OVERFLOW_KEY = "x-overflow";
public static final String MAX_PRIORITY_KEY = "x-max-priority";
private final String ALTERNATE_EXCHANGE_KEY = "alternate-exchange";
public static final String ALTERNATE_EXCHANGE_KEY = "alternate-exchange";
private final Map<String, Object> arguments;

public static AmqArguments empty() {
Expand All @@ -26,58 +31,37 @@ public AmqArguments(Map<String, Object> arguments) {
}

public Optional<ReceiverPointer> getAlternateExchange() {
return string(ALTERNATE_EXCHANGE_KEY)
.map(aeName -> new ReceiverPointer(ReceiverPointer.Type.EXCHANGE, aeName));
return getParameterAsExchangePointer.apply(ALTERNATE_EXCHANGE_KEY, arguments);
}

public Optional<ReceiverPointer> getDeadLetterExchange() {
return string(DEAD_LETTER_EXCHANGE_KEY)
.map(aeName -> new ReceiverPointer(ReceiverPointer.Type.EXCHANGE, aeName));
return getParameterAsExchangePointer.apply(DEAD_LETTER_EXCHANGE_KEY, arguments);
}

public Optional<String> getDeadLetterRoutingKey() {
return string(DEAD_LETTER_ROUTING_KEY_KEY);
return getParameterAsString.apply(DEAD_LETTER_ROUTING_KEY_KEY, arguments);
}

public Optional<Integer> queueLengthLimit() {
return positiveInteger(QUEUE_MAX_LENGTH_KEY);
return getParameterAsPositiveInteger.apply(QUEUE_MAX_LENGTH_KEY, arguments);
}

public Optional<Integer> queueLengthBytesLimit() {
return positiveInteger(QUEUE_MAX_LENGTH_BYTES_KEY);
return getParameterAsPositiveInteger.apply(QUEUE_MAX_LENGTH_BYTES_KEY, arguments);
}

public Overflow overflow() {
return string(OVERFLOW_KEY)
return getParameterAsString.apply(OVERFLOW_KEY, arguments)
.flatMap(Overflow::parse)
.orElse(Overflow.DROP_HEAD);
}

public Optional<Long> getMessageTtlOfQueue() {
return Optional.ofNullable(arguments.get(MESSAGE_TTL_KEY))
.filter(aeObject -> aeObject instanceof Number)
.map(Number.class::cast)
.map(number -> number.longValue());
return getParameterAsPositiveLong.apply(MESSAGE_TTL_KEY, arguments);
}

public Optional<Short> queueMaxPriority() {
return positiveInteger(MAX_PRIORITY_KEY)
.filter(i -> i < 256)
.map(Integer::shortValue);
}

private Optional<Integer> positiveInteger(String key) {
return Optional.ofNullable(arguments.get(key))
.filter(aeObject -> aeObject instanceof Number)
.map(Number.class::cast)
.map(num -> num.intValue())
.filter(i -> i > 0);
}

private Optional<String> string(String key) {
return Optional.ofNullable(arguments.get(key))
.filter(aeObject -> aeObject instanceof String)
.map(String.class::cast);
return getParameterAsPositiveShort.apply(MAX_PRIORITY_KEY, arguments);
}

public enum Overflow {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,22 @@

import com.github.fridujo.rabbitmq.mock.metrics.MetricsCollectorWrapper;
import com.rabbitmq.client.AddressResolver;
import com.rabbitmq.client.ConnectionFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.concurrent.ExecutorService;

import static java.lang.String.format;

public class MockConnectionFactory extends ConfigurableConnectionFactory<MockConnectionFactory> {

private static final Logger LOGGER = LoggerFactory.getLogger(MockConnectionFactory.class);

private HashMap<String, MockPolicy> policies = new HashMap<>();

public MockConnectionFactory() {
setAutomaticRecoveryEnabled(false);
}
Expand All @@ -22,4 +32,22 @@ public MockConnection newConnection() {
MockConnection mockConnection = new MockConnection(mockNode, metricsCollectorWrapper);
return mockConnection;
}

public void setPolicy(MockPolicy policy) {
policies.put(policy.getName(), policy);
mockNode.applyPolicies(new HashSet(policies.values()));
}

public void deletePolicy(String policyName) {
if(policies.remove(policyName) == null) {
LOGGER.error(format("Error deleting, policy with name %s was not found", policyName));
} else {
mockNode.applyPolicies(new HashSet(policies.values()));
}
}

public Collection<MockPolicy> listPolicies() {
return policies.values();
}

}
19 changes: 19 additions & 0 deletions src/main/java/com/github/fridujo/rabbitmq/mock/MockNode.java
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
package com.github.fridujo.rabbitmq.mock;

import java.util.Collection;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;
import java.util.function.Supplier;

import com.rabbitmq.client.AMQP;
Expand Down Expand Up @@ -189,4 +192,20 @@ public void close(MockConnection mockConnection) {
public Configuration getConfiguration() {
return configuration;
}

public void applyPolicies(Set<MockPolicy> policies) {
applyPolicyToReceivers(policies, exchanges.values());
applyPolicyToReceivers(policies, queues.values());
}

private <T extends Receiver> void applyPolicyToReceivers(Set<MockPolicy> policies, Collection<T> receivers) {
Function<T, Optional<MockPolicy>> calculateHighestPriorityPolicy = r -> policies.stream()
.sorted(MockPolicy.comparator)
.filter(p -> p.receiverMatchesPolicyPattern.test(r))
.findFirst();

receivers.stream()
.filter(r -> !MockDefaultExchange.class.isInstance(r))
.forEach(r -> r.setPolicy(calculateHighestPriorityPolicy.apply(r)));
}
}
77 changes: 77 additions & 0 deletions src/main/java/com/github/fridujo/rabbitmq/mock/MockPolicy.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
package com.github.fridujo.rabbitmq.mock;

import lombok.Builder;
import lombok.Getter;
import lombok.NonNull;
import lombok.Singular;
import lombok.ToString;

import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.Predicate;

import static com.github.fridujo.rabbitmq.mock.MockPolicy.ApplyTo.ALL;
import static com.github.fridujo.rabbitmq.mock.tool.ParameterMarshaller.getParameterAsExchangePointer;
import static com.github.fridujo.rabbitmq.mock.tool.ParameterMarshaller.getParameterAsString;
import static java.util.Arrays.asList;
import static java.util.Collections.singletonList;

@Getter
@ToString
public class MockPolicy {
public static final String ALTERNATE_EXCHANGE = "alternate-exchange";
public static final String DEAD_LETTER_EXCHANGE = "dead-letter-exchange";
public static final String DEAD_LETTER_ROUTING_KEY = "dead-letter-routing-key";

private String name;
private String pattern;
private Integer priority;
private Map<String, Object> definitions;
private ApplyTo applyTo;

static final Comparator<MockPolicy> comparator = Comparator.comparing(MockPolicy::getPriority).reversed();

final Predicate<Receiver> receiverMatchesPolicyPattern =
r -> r.pointer().name.matches(this.pattern) && this.applyTo.matches(r) ;

@Builder(toBuilder = true)
public MockPolicy(@NonNull String name, @NonNull String pattern, @NonNull @Singular Map<String, Object> definitions,
Integer priority, ApplyTo applyTo) {
this.name = name;
this.pattern = pattern;
this.definitions = definitions;
this.priority = priority == null ? 0 : priority;
this.applyTo = applyTo == null ? ALL : applyTo;
}

public Optional<ReceiverPointer> getAlternateExchange() {
return getParameterAsExchangePointer.apply(ALTERNATE_EXCHANGE, definitions);
}

public Optional<ReceiverPointer> getDeadLetterExchange() {
return getParameterAsExchangePointer.apply(DEAD_LETTER_EXCHANGE, definitions);
}

public Optional<String> getDeadLetterRoutingKey() {
return getParameterAsString.apply(DEAD_LETTER_ROUTING_KEY, definitions);
}

public enum ApplyTo {
ALL(asList(ReceiverPointer.Type.QUEUE, ReceiverPointer.Type.EXCHANGE)),
EXCHANGE(singletonList(ReceiverPointer.Type.EXCHANGE)),
QUEUE(singletonList(ReceiverPointer.Type.QUEUE));

private List<ReceiverPointer.Type> supportedTypes;

ApplyTo(List<ReceiverPointer.Type> supportTypes) {
this.supportedTypes = supportTypes;
}

public boolean matches(Receiver r) {
return supportedTypes.contains(r.pointer().type);
}
}
}

17 changes: 16 additions & 1 deletion src/main/java/com/github/fridujo/rabbitmq/mock/MockQueue.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import static com.github.fridujo.rabbitmq.mock.tool.Exceptions.runAndEatExceptions;
import static com.github.fridujo.rabbitmq.mock.tool.Exceptions.runAndTransformExceptions;
import static java.lang.String.format;

import java.io.IOException;
import java.time.Instant;
Expand Down Expand Up @@ -50,6 +51,7 @@ public class MockQueue implements Receiver {
private final Map<Long, Message> unackedMessagesByDeliveryTag = new LinkedHashMap<>();
private final AtomicBoolean running = new AtomicBoolean(true);
private final Map<String, Set<Long>> unackedDeliveryTagsByConsumerTag = new LinkedHashMap<>();
private Optional<MockPolicy> mockPolicy = Optional.empty();

public MockQueue(String name, AmqArguments arguments, ReceiverRegistry receiverRegistry, MockChannel mockChannel) {
this.name = name;
Expand Down Expand Up @@ -377,15 +379,23 @@ public String toString() {
}

private void deadLetterWithReason(Message message, DeadLettering.ReasonType reason) {

String routingKey = arguments.getDeadLetterRoutingKey()
.map(Optional::of)
.orElse(mockPolicy.flatMap(MockPolicy::getDeadLetterRoutingKey))
.orElse(message.routingKey);

arguments.getDeadLetterExchange()
.map(Optional::of)
.orElse(mockPolicy.flatMap(MockPolicy::getDeadLetterExchange))
.flatMap(receiverRegistry::getReceiver)
.ifPresent(deadLetterExchange -> {
LOGGER.debug(localized("dead-lettered to " + deadLetterExchange + ": " + message));
DeadLettering.Event event = new DeadLettering.Event(name, reason, message, 1);
BasicProperties props = event.prependOn(message.props);
deadLetterExchange.publish(
message.exchangeName,
arguments.getDeadLetterRoutingKey().orElse(message.routingKey),
routingKey,
props,
message.body);
}
Expand All @@ -400,6 +410,11 @@ public List<Message> getUnackedMessages() {
return new ArrayList<>(unackedMessagesByDeliveryTag.values());
}

public void setPolicy(Optional<MockPolicy> mockPolicy) {
mockPolicy.ifPresent(p -> LOGGER.info(localized(format("Applied policy %s", p))));
this.mockPolicy = mockPolicy;
}

static class ConsumerAndTag {

private final String tag;
Expand Down
4 changes: 4 additions & 0 deletions src/main/java/com/github/fridujo/rabbitmq/mock/Receiver.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

import com.rabbitmq.client.AMQP;

import java.util.Optional;

/**
* Leverage the receiving capability of both Queues and Exchanges.
*/
Expand All @@ -14,4 +16,6 @@ public interface Receiver {
boolean publish(String exchangeName, String routingKey, AMQP.BasicProperties props, byte[] body);

ReceiverPointer pointer();

void setPolicy(Optional<MockPolicy> policy);
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,14 @@
package com.github.fridujo.rabbitmq.mock.exchange;

import com.github.fridujo.rabbitmq.mock.AmqArguments;
import com.github.fridujo.rabbitmq.mock.MockPolicy;
import com.github.fridujo.rabbitmq.mock.Receiver;
import com.github.fridujo.rabbitmq.mock.ReceiverPointer;
import com.github.fridujo.rabbitmq.mock.ReceiverRegistry;
import com.rabbitmq.client.AMQP;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.Map;
Expand All @@ -9,25 +18,18 @@
import java.util.stream.Collectors;
import java.util.stream.Stream;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.github.fridujo.rabbitmq.mock.AmqArguments;
import com.github.fridujo.rabbitmq.mock.MockQueue;
import com.github.fridujo.rabbitmq.mock.Receiver;
import com.github.fridujo.rabbitmq.mock.ReceiverPointer;
import com.github.fridujo.rabbitmq.mock.ReceiverRegistry;
import com.rabbitmq.client.AMQP;
import static java.lang.String.format;

public abstract class BindableMockExchange implements MockExchange {
private static final Logger LOGGER = LoggerFactory.getLogger(MockQueue.class);
private static final Logger LOGGER = LoggerFactory.getLogger(BindableMockExchange.class);

protected final Set<BindConfiguration> bindConfigurations = new LinkedHashSet<>();
private final String name;
private final String type;
private final AmqArguments arguments;
private final ReceiverPointer pointer;
private final ReceiverRegistry receiverRegistry;
private Optional<MockPolicy> mockPolicy = Optional.empty();

protected BindableMockExchange(String name, String type, AmqArguments arguments, ReceiverRegistry receiverRegistry) {
this.name = name;
Expand Down Expand Up @@ -66,7 +68,12 @@ public boolean publish(String previousExchangeName, String routingKey, AMQP.Basi
}

private Optional<Receiver> getAlternateExchange() {
return arguments.getAlternateExchange().flatMap(receiverRegistry::getReceiver);
Optional<ReceiverPointer> policyAlternativeExchange = mockPolicy.flatMap(MockPolicy::getAlternateExchange);
Optional<ReceiverPointer> exchangeArgumentAlternativeExchange = arguments.getAlternateExchange();

return exchangeArgumentAlternativeExchange.map(Optional::of)
.orElse(policyAlternativeExchange)
.flatMap(receiverRegistry::getReceiver);
}

protected abstract Stream<ReceiverPointer> matchingReceivers(String routingKey, AMQP.BasicProperties props);
Expand All @@ -85,6 +92,12 @@ public void unbind(ReceiverPointer receiver, String routingKey) {
bindConfigurations.remove(new BindConfiguration(routingKey, receiver, Collections.emptyMap()));
}

@Override
public void setPolicy(Optional<MockPolicy> mockPolicy) {
mockPolicy.ifPresent(p -> LOGGER.info(localized(format("Applied policy %s", p))));
this.mockPolicy = mockPolicy;
}

@Override
public ReceiverPointer pointer() {
return pointer;
Expand Down
Loading

0 comments on commit bdc641f

Please sign in to comment.