Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Issue #9 - Dead Letter Exchange, Dead Letter Routing and Alternative Exchange Policies #131

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,12 @@
<version>${junit.jupiter.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please do not.

I have nothing but bad experiences with Lombok.
I know that some people may find it useful to write less code, however code that is written behave as expected (vs generation of all possible constructors for ex.).
At best it messes up source jar.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm happy to take it out, it shouldn't be packaged in the source jar though.

I've got to finish ttl and then I've done all the policies so I'll probably push it all back as one.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it shouldn't be packaged in the source jar though

That is not what I meant, rather that sources using Lombok are not valid raw Java.
When navigating sources of a third-party library, IDEs match / compile source code with no knowledge of Lombok (or other tools like it) generating stuff at compile time.

This ends up with broken navigation, (navigating to a constructor that does not exists for instance), line numbers not matching those in stacktraces and inaccurate breakpoints.

Copy link
Contributor

@ledoyen ledoyen Jul 22, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll probably push it all back as one

If you can split into feature commits (1 commit = prod+test) to ease review I would be very grateful !

<version>1.18.8</version>
<scope>provided</scope>
</dependency>
</dependencies>

<build>
Expand Down
44 changes: 14 additions & 30 deletions src/main/java/com/github/fridujo/rabbitmq/mock/AmqArguments.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,10 @@
package com.github.fridujo.rabbitmq.mock;

import static com.github.fridujo.rabbitmq.mock.tool.ParameterMarshaller.getParameterAsExchangePointer;
import static com.github.fridujo.rabbitmq.mock.tool.ParameterMarshaller.getParameterAsPositiveInteger;
import static com.github.fridujo.rabbitmq.mock.tool.ParameterMarshaller.getParameterAsPositiveLong;
import static com.github.fridujo.rabbitmq.mock.tool.ParameterMarshaller.getParameterAsPositiveShort;
import static com.github.fridujo.rabbitmq.mock.tool.ParameterMarshaller.getParameterAsString;
import static java.util.Collections.emptyMap;

import java.util.Arrays;
Expand All @@ -14,7 +19,7 @@ public class AmqArguments {
public static final String QUEUE_MAX_LENGTH_BYTES_KEY = "x-max-length-bytes";
public static final String OVERFLOW_KEY = "x-overflow";
public static final String MAX_PRIORITY_KEY = "x-max-priority";
private final String ALTERNATE_EXCHANGE_KEY = "alternate-exchange";
public static final String ALTERNATE_EXCHANGE_KEY = "alternate-exchange";
private final Map<String, Object> arguments;

public static AmqArguments empty() {
Expand All @@ -26,58 +31,37 @@ public AmqArguments(Map<String, Object> arguments) {
}

public Optional<ReceiverPointer> getAlternateExchange() {
return string(ALTERNATE_EXCHANGE_KEY)
.map(aeName -> new ReceiverPointer(ReceiverPointer.Type.EXCHANGE, aeName));
return getParameterAsExchangePointer.apply(ALTERNATE_EXCHANGE_KEY, arguments);
}

public Optional<ReceiverPointer> getDeadLetterExchange() {
return string(DEAD_LETTER_EXCHANGE_KEY)
.map(aeName -> new ReceiverPointer(ReceiverPointer.Type.EXCHANGE, aeName));
return getParameterAsExchangePointer.apply(DEAD_LETTER_EXCHANGE_KEY, arguments);
}

public Optional<String> getDeadLetterRoutingKey() {
return string(DEAD_LETTER_ROUTING_KEY_KEY);
return getParameterAsString.apply(DEAD_LETTER_ROUTING_KEY_KEY, arguments);
}

public Optional<Integer> queueLengthLimit() {
return positiveInteger(QUEUE_MAX_LENGTH_KEY);
return getParameterAsPositiveInteger.apply(QUEUE_MAX_LENGTH_KEY, arguments);
}

public Optional<Integer> queueLengthBytesLimit() {
return positiveInteger(QUEUE_MAX_LENGTH_BYTES_KEY);
return getParameterAsPositiveInteger.apply(QUEUE_MAX_LENGTH_BYTES_KEY, arguments);
}

public Overflow overflow() {
return string(OVERFLOW_KEY)
return getParameterAsString.apply(OVERFLOW_KEY, arguments)
.flatMap(Overflow::parse)
.orElse(Overflow.DROP_HEAD);
}

public Optional<Long> getMessageTtlOfQueue() {
return Optional.ofNullable(arguments.get(MESSAGE_TTL_KEY))
.filter(aeObject -> aeObject instanceof Number)
.map(Number.class::cast)
.map(number -> number.longValue());
return getParameterAsPositiveLong.apply(MESSAGE_TTL_KEY, arguments);
}

public Optional<Short> queueMaxPriority() {
return positiveInteger(MAX_PRIORITY_KEY)
.filter(i -> i < 256)
.map(Integer::shortValue);
}

private Optional<Integer> positiveInteger(String key) {
return Optional.ofNullable(arguments.get(key))
.filter(aeObject -> aeObject instanceof Number)
.map(Number.class::cast)
.map(num -> num.intValue())
.filter(i -> i > 0);
}

private Optional<String> string(String key) {
return Optional.ofNullable(arguments.get(key))
.filter(aeObject -> aeObject instanceof String)
.map(String.class::cast);
return getParameterAsPositiveShort.apply(MAX_PRIORITY_KEY, arguments);
}

public enum Overflow {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,22 @@

import com.github.fridujo.rabbitmq.mock.metrics.MetricsCollectorWrapper;
import com.rabbitmq.client.AddressResolver;
import com.rabbitmq.client.ConnectionFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.concurrent.ExecutorService;

import static java.lang.String.format;

public class MockConnectionFactory extends ConfigurableConnectionFactory<MockConnectionFactory> {

private static final Logger LOGGER = LoggerFactory.getLogger(MockConnectionFactory.class);

private HashMap<String, MockPolicy> policies = new HashMap<>();

public MockConnectionFactory() {
setAutomaticRecoveryEnabled(false);
}
Expand All @@ -22,4 +32,22 @@ public MockConnection newConnection() {
MockConnection mockConnection = new MockConnection(mockNode, metricsCollectorWrapper);
return mockConnection;
}

public void setPolicy(MockPolicy policy) {
policies.put(policy.getName(), policy);
mockNode.applyPolicies(new HashSet(policies.values()));
}

public void deletePolicy(String policyName) {
if(policies.remove(policyName) == null) {
LOGGER.error(format("Error deleting, policy with name %s was not found", policyName));
} else {
mockNode.applyPolicies(new HashSet(policies.values()));
}
}

public Collection<MockPolicy> listPolicies() {
return policies.values();
}

}
19 changes: 19 additions & 0 deletions src/main/java/com/github/fridujo/rabbitmq/mock/MockNode.java
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
package com.github.fridujo.rabbitmq.mock;

import java.util.Collection;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;
import java.util.function.Supplier;

import com.rabbitmq.client.AMQP;
Expand Down Expand Up @@ -189,4 +192,20 @@ public void close(MockConnection mockConnection) {
public Configuration getConfiguration() {
return configuration;
}

public void applyPolicies(Set<MockPolicy> policies) {
applyPolicyToReceivers(policies, exchanges.values());
applyPolicyToReceivers(policies, queues.values());
}

private <T extends Receiver> void applyPolicyToReceivers(Set<MockPolicy> policies, Collection<T> receivers) {
Function<T, Optional<MockPolicy>> calculateHighestPriorityPolicy = r -> policies.stream()
.sorted(MockPolicy.comparator)
.filter(p -> p.receiverMatchesPolicyPattern.test(r))
.findFirst();

receivers.stream()
.filter(r -> !MockDefaultExchange.class.isInstance(r))
.forEach(r -> r.setPolicy(calculateHighestPriorityPolicy.apply(r)));
}
}
77 changes: 77 additions & 0 deletions src/main/java/com/github/fridujo/rabbitmq/mock/MockPolicy.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
package com.github.fridujo.rabbitmq.mock;

import lombok.Builder;
import lombok.Getter;
import lombok.NonNull;
import lombok.Singular;
import lombok.ToString;

import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.Predicate;

import static com.github.fridujo.rabbitmq.mock.MockPolicy.ApplyTo.ALL;
import static com.github.fridujo.rabbitmq.mock.tool.ParameterMarshaller.getParameterAsExchangePointer;
import static com.github.fridujo.rabbitmq.mock.tool.ParameterMarshaller.getParameterAsString;
import static java.util.Arrays.asList;
import static java.util.Collections.singletonList;

@Getter
@ToString
public class MockPolicy {
public static final String ALTERNATE_EXCHANGE = "alternate-exchange";
public static final String DEAD_LETTER_EXCHANGE = "dead-letter-exchange";
public static final String DEAD_LETTER_ROUTING_KEY = "dead-letter-routing-key";

private String name;
private String pattern;
private Integer priority;
private Map<String, Object> definitions;
private ApplyTo applyTo;

static final Comparator<MockPolicy> comparator = Comparator.comparing(MockPolicy::getPriority).reversed();

final Predicate<Receiver> receiverMatchesPolicyPattern =
r -> r.pointer().name.matches(this.pattern) && this.applyTo.matches(r) ;

@Builder(toBuilder = true)
public MockPolicy(@NonNull String name, @NonNull String pattern, @NonNull @Singular Map<String, Object> definitions,
Integer priority, ApplyTo applyTo) {
this.name = name;
this.pattern = pattern;
this.definitions = definitions;
this.priority = priority == null ? 0 : priority;
this.applyTo = applyTo == null ? ALL : applyTo;
}

public Optional<ReceiverPointer> getAlternateExchange() {
return getParameterAsExchangePointer.apply(ALTERNATE_EXCHANGE, definitions);
}

public Optional<ReceiverPointer> getDeadLetterExchange() {
return getParameterAsExchangePointer.apply(DEAD_LETTER_EXCHANGE, definitions);
}

public Optional<String> getDeadLetterRoutingKey() {
return getParameterAsString.apply(DEAD_LETTER_ROUTING_KEY, definitions);
}

public enum ApplyTo {
ALL(asList(ReceiverPointer.Type.QUEUE, ReceiverPointer.Type.EXCHANGE)),
EXCHANGE(singletonList(ReceiverPointer.Type.EXCHANGE)),
QUEUE(singletonList(ReceiverPointer.Type.QUEUE));

private List<ReceiverPointer.Type> supportedTypes;

ApplyTo(List<ReceiverPointer.Type> supportTypes) {
this.supportedTypes = supportTypes;
}

public boolean matches(Receiver r) {
return supportedTypes.contains(r.pointer().type);
}
}
}

17 changes: 16 additions & 1 deletion src/main/java/com/github/fridujo/rabbitmq/mock/MockQueue.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import static com.github.fridujo.rabbitmq.mock.tool.Exceptions.runAndEatExceptions;
import static com.github.fridujo.rabbitmq.mock.tool.Exceptions.runAndTransformExceptions;
import static java.lang.String.format;

import java.io.IOException;
import java.time.Instant;
Expand Down Expand Up @@ -50,6 +51,7 @@ public class MockQueue implements Receiver {
private final Map<Long, Message> unackedMessagesByDeliveryTag = new LinkedHashMap<>();
private final AtomicBoolean running = new AtomicBoolean(true);
private final Map<String, Set<Long>> unackedDeliveryTagsByConsumerTag = new LinkedHashMap<>();
private Optional<MockPolicy> mockPolicy = Optional.empty();

public MockQueue(String name, AmqArguments arguments, ReceiverRegistry receiverRegistry, MockChannel mockChannel) {
this.name = name;
Expand Down Expand Up @@ -377,15 +379,23 @@ public String toString() {
}

private void deadLetterWithReason(Message message, DeadLettering.ReasonType reason) {

String routingKey = arguments.getDeadLetterRoutingKey()
.map(Optional::of)
.orElse(mockPolicy.flatMap(MockPolicy::getDeadLetterRoutingKey))
.orElse(message.routingKey);

arguments.getDeadLetterExchange()
.map(Optional::of)
.orElse(mockPolicy.flatMap(MockPolicy::getDeadLetterExchange))
.flatMap(receiverRegistry::getReceiver)
.ifPresent(deadLetterExchange -> {
LOGGER.debug(localized("dead-lettered to " + deadLetterExchange + ": " + message));
DeadLettering.Event event = new DeadLettering.Event(name, reason, message, 1);
BasicProperties props = event.prependOn(message.props);
deadLetterExchange.publish(
message.exchangeName,
arguments.getDeadLetterRoutingKey().orElse(message.routingKey),
routingKey,
props,
message.body);
}
Expand All @@ -400,6 +410,11 @@ public List<Message> getUnackedMessages() {
return new ArrayList<>(unackedMessagesByDeliveryTag.values());
}

public void setPolicy(Optional<MockPolicy> mockPolicy) {
mockPolicy.ifPresent(p -> LOGGER.info(localized(format("Applied policy %s", p))));
this.mockPolicy = mockPolicy;
}

static class ConsumerAndTag {

private final String tag;
Expand Down
4 changes: 4 additions & 0 deletions src/main/java/com/github/fridujo/rabbitmq/mock/Receiver.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

import com.rabbitmq.client.AMQP;

import java.util.Optional;

/**
* Leverage the receiving capability of both Queues and Exchanges.
*/
Expand All @@ -14,4 +16,6 @@ public interface Receiver {
boolean publish(String exchangeName, String routingKey, AMQP.BasicProperties props, byte[] body);

ReceiverPointer pointer();

void setPolicy(Optional<MockPolicy> policy);
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,14 @@
package com.github.fridujo.rabbitmq.mock.exchange;

import com.github.fridujo.rabbitmq.mock.AmqArguments;
import com.github.fridujo.rabbitmq.mock.MockPolicy;
import com.github.fridujo.rabbitmq.mock.Receiver;
import com.github.fridujo.rabbitmq.mock.ReceiverPointer;
import com.github.fridujo.rabbitmq.mock.ReceiverRegistry;
import com.rabbitmq.client.AMQP;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.Map;
Expand All @@ -9,25 +18,18 @@
import java.util.stream.Collectors;
import java.util.stream.Stream;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.github.fridujo.rabbitmq.mock.AmqArguments;
import com.github.fridujo.rabbitmq.mock.MockQueue;
import com.github.fridujo.rabbitmq.mock.Receiver;
import com.github.fridujo.rabbitmq.mock.ReceiverPointer;
import com.github.fridujo.rabbitmq.mock.ReceiverRegistry;
import com.rabbitmq.client.AMQP;
import static java.lang.String.format;

public abstract class BindableMockExchange implements MockExchange {
private static final Logger LOGGER = LoggerFactory.getLogger(MockQueue.class);
private static final Logger LOGGER = LoggerFactory.getLogger(BindableMockExchange.class);

protected final Set<BindConfiguration> bindConfigurations = new LinkedHashSet<>();
private final String name;
private final String type;
private final AmqArguments arguments;
private final ReceiverPointer pointer;
private final ReceiverRegistry receiverRegistry;
private Optional<MockPolicy> mockPolicy = Optional.empty();

protected BindableMockExchange(String name, String type, AmqArguments arguments, ReceiverRegistry receiverRegistry) {
this.name = name;
Expand Down Expand Up @@ -66,7 +68,12 @@ public boolean publish(String previousExchangeName, String routingKey, AMQP.Basi
}

private Optional<Receiver> getAlternateExchange() {
return arguments.getAlternateExchange().flatMap(receiverRegistry::getReceiver);
Optional<ReceiverPointer> policyAlternativeExchange = mockPolicy.flatMap(MockPolicy::getAlternateExchange);
Optional<ReceiverPointer> exchangeArgumentAlternativeExchange = arguments.getAlternateExchange();

return exchangeArgumentAlternativeExchange.map(Optional::of)
.orElse(policyAlternativeExchange)
.flatMap(receiverRegistry::getReceiver);
}

protected abstract Stream<ReceiverPointer> matchingReceivers(String routingKey, AMQP.BasicProperties props);
Expand All @@ -85,6 +92,12 @@ public void unbind(ReceiverPointer receiver, String routingKey) {
bindConfigurations.remove(new BindConfiguration(routingKey, receiver, Collections.emptyMap()));
}

@Override
public void setPolicy(Optional<MockPolicy> mockPolicy) {
mockPolicy.ifPresent(p -> LOGGER.info(localized(format("Applied policy %s", p))));
this.mockPolicy = mockPolicy;
}

@Override
public ReceiverPointer pointer() {
return pointer;
Expand Down
Loading