Skip to content

Commit

Permalink
[AMQ-8463] New Feature: Advanced Message Flow statistics
Browse files Browse the repository at this point in the history
Co-authored-by: Christopher L. Shannon <[email protected]>
  • Loading branch information
mattrpav and cshannon committed Jan 22, 2025
1 parent b7ce58a commit 5962286
Show file tree
Hide file tree
Showing 29 changed files with 859 additions and 152 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@
import jakarta.jms.InvalidSelectorException;
import jakarta.jms.MessageProducer;
import jakarta.jms.Session;
import java.util.function.Function;
import java.util.function.Supplier;
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;
import javax.management.openmbean.CompositeData;
Expand All @@ -52,6 +54,8 @@
import org.apache.activemq.command.Message;
import org.apache.activemq.filter.BooleanExpression;
import org.apache.activemq.filter.NonCachedMessageEvaluationContext;
import org.apache.activemq.management.MessageFlowStats;
import org.apache.activemq.management.UnsampledStatistic;
import org.apache.activemq.selector.SelectorParser;
import org.apache.activemq.store.MessageStore;
import org.apache.activemq.util.URISupport;
Expand Down Expand Up @@ -620,4 +624,63 @@ public long getNetworkDequeues() {
return destination.getDestinationStatistics().getNetworkDequeues().getCount();
}

@Override
public boolean isAdvancedMessageStatisticsEnabled() {
return destination.isAdvancedMessageStatisticsEnabled();
}

@Override
public void setAdvancedMessageStatisticsEnabled(boolean advancedMessageStatisticsEnabled) {
destination.setAdvancedMessageStatisticsEnabled(advancedMessageStatisticsEnabled);
}

@Override
public long getEnqueuedMessageBrokerInTime() {
return getMessageFlowStat(MessageFlowStats::getEnqueuedMessageBrokerInTime, 0L);
}

@Override
public String getEnqueuedMessageClientId() {
return getMessageFlowStat(MessageFlowStats::getEnqueuedMessageClientID, null);
}

@Override
public String getEnqueuedMessageId() {
return getMessageFlowStat(MessageFlowStats::getEnqueuedMessageID, null);
}

@Override
public long getEnqueuedMessageTimestamp() {
return getMessageFlowStat(MessageFlowStats::getEnqueuedMessageTimestamp, 0L);
}

@Override
public long getDequeuedMessageBrokerInTime() {
return getMessageFlowStat(MessageFlowStats::getDequeuedMessageBrokerInTime, 0L);
}

@Override
public long getDequeuedMessageBrokerOutTime() {
return getMessageFlowStat(MessageFlowStats::getDequeuedMessageBrokerOutTime, 0L);
}

@Override
public String getDequeuedMessageClientId() {
return getMessageFlowStat(MessageFlowStats::getDequeuedMessageClientID, null);
}

@Override
public String getDequeuedMessageId() {
return getMessageFlowStat(MessageFlowStats::getDequeuedMessageID, null);
}

@Override
public long getDequeuedMessageTimestamp() {
return getMessageFlowStat(MessageFlowStats::getDequeuedMessageTimestamp, 0L);
}

private <T> T getMessageFlowStat(Function<MessageFlowStats, UnsampledStatistic<T>> f, T defVal) {
final var stats = destination.getDestinationStatistics().getMessageFlowStats();
return stats != null ? f.apply(stats).getValue() : defVal;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -493,4 +493,37 @@ public String sendTextMessageWithProperties(@MBeanInfo("properties") String prop

@MBeanInfo("Number of messages acknowledged from the destination via network connection")
long getNetworkDequeues();

@MBeanInfo("Query Advanced Message Statistics flag")
boolean isAdvancedMessageStatisticsEnabled();

@MBeanInfo("Toggle Advanced Message Statistics flag")
void setAdvancedMessageStatisticsEnabled(boolean advancedMessageStatisticsEnabled);

@MBeanInfo("Broker in time (ms) of last enqueued message to the destination")
long getEnqueuedMessageBrokerInTime();

@MBeanInfo("ClientID of last enqueued message to the destination")
String getEnqueuedMessageClientId();

@MBeanInfo("MessageID of last enqueued message to the destination")
String getEnqueuedMessageId();

@MBeanInfo("Message timestamp in (ms) of last enqueued message to the destination")
long getEnqueuedMessageTimestamp();

@MBeanInfo("Broker in time (ms) of last dequeued message to the destination")
long getDequeuedMessageBrokerInTime();

@MBeanInfo("Broker out time (ms) of last dequeued message to the destination")
long getDequeuedMessageBrokerOutTime();

@MBeanInfo("ClientID of last dequeued message to the destination")
String getDequeuedMessageClientId();

@MBeanInfo("MessageID of last dequeued message to the destination")
String getDequeuedMessageId();

@MBeanInfo("Message timestamp in (ms) of last dequeued message to the destination")
long getDequeuedMessageTimestamp();
}
Original file line number Diff line number Diff line change
Expand Up @@ -827,7 +827,7 @@ public boolean isGcWithNetworkConsumers() {
@Override
public void markForGC(long timeStamp) {
if (isGcIfInactive() && this.lastActiveTime == 0 && isActive() == false
&& destinationStatistics.messages.getCount() == 0 && getInactiveTimeoutBeforeGC() > 0l) {
&& destinationStatistics.getMessages().getCount() == 0 && getInactiveTimeoutBeforeGC() > 0l) {
this.lastActiveTime = timeStamp;
}
}
Expand All @@ -836,7 +836,7 @@ public void markForGC(long timeStamp) {
public boolean canGC() {
boolean result = false;
final long currentLastActiveTime = this.lastActiveTime;
if (isGcIfInactive() && currentLastActiveTime != 0l && destinationStatistics.messages.getCount() == 0L ) {
if (isGcIfInactive() && currentLastActiveTime != 0l && destinationStatistics.getMessages().getCount() == 0L ) {
if ((System.currentTimeMillis() - currentLastActiveTime) >= getInactiveTimeoutBeforeGC()) {
result = true;
}
Expand Down Expand Up @@ -880,6 +880,16 @@ public void setAdvancedNetworkStatisticsEnabled(boolean advancedNetworkStatistic
this.advancedNetworkStatisticsEnabled = advancedNetworkStatisticsEnabled;
}

@Override
public boolean isAdvancedMessageStatisticsEnabled() {
return this.destinationStatistics.isAdvancedMessageStatisticsEnabled();
}

@Override
public void setAdvancedMessageStatisticsEnabled(boolean advancedMessageStatisticsEnabled) {
this.destinationStatistics.setAdvancedMessageStatisticsEnabled(advancedMessageStatisticsEnabled);
}

@Override
public abstract List<Subscription> getConsumers();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -259,9 +259,12 @@ public interface Destination extends Service, Task, Message.MessageDestination {

void setSendDuplicateFromStoreToDLQ(boolean sendDuplicateFromStoreToDLQ);

// [AMQ-9437]
boolean isAdvancedNetworkStatisticsEnabled();

void setAdvancedNetworkStatisticsEnabled(boolean advancedNetworkStatisticsEnabled);

boolean isAdvancedMessageStatisticsEnabled();

void setAdvancedMessageStatisticsEnabled(boolean advancedMessageStatisticsEnabled);

}
Original file line number Diff line number Diff line change
Expand Up @@ -419,6 +419,16 @@ public void setAdvancedNetworkStatisticsEnabled(boolean advancedNetworkStatistic
next.setAdvancedNetworkStatisticsEnabled(advancedNetworkStatisticsEnabled);
}

@Override
public boolean isAdvancedMessageStatisticsEnabled() {
return next.isAdvancedMessageStatisticsEnabled();
}

@Override
public void setAdvancedMessageStatisticsEnabled(boolean advancedMessageStatisticsEnabled) {
next.setAdvancedMessageStatisticsEnabled(advancedMessageStatisticsEnabled);
}

public void deleteSubscription(ConnectionContext context, SubscriptionKey key) throws Exception {
if (next instanceof DestinationFilter) {
DestinationFilter filter = (DestinationFilter) next;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,41 +14,43 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.activemq.broker.region;

import java.util.Optional;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.activemq.management.CountStatisticImpl;
import org.apache.activemq.management.PollCountStatisticImpl;
import org.apache.activemq.management.StatsImpl;
import org.apache.activemq.management.*;

/**
* The J2EE Statistics for the a Destination.
*
*
* The Statistics for a Destination.
*/
public class DestinationStatistics extends StatsImpl {

protected CountStatisticImpl enqueues;
protected CountStatisticImpl dequeues;
protected CountStatisticImpl forwards;
protected CountStatisticImpl consumers;
protected CountStatisticImpl producers;
protected CountStatisticImpl messages;
protected PollCountStatisticImpl messagesCached;
protected CountStatisticImpl dispatched;
protected CountStatisticImpl duplicateFromStore;
protected CountStatisticImpl inflight;
protected CountStatisticImpl expired;
protected TimeStatisticImpl processTime;
protected CountStatisticImpl blockedSends;
protected TimeStatisticImpl blockedTime;
protected SizeStatisticImpl messageSize;
protected CountStatisticImpl maxUncommittedExceededCount;

// [AMQ-9437] Advanced Statistics are optionally enabled
protected CountStatisticImpl networkEnqueues;
protected CountStatisticImpl networkDequeues;
private final CountStatisticImpl enqueues;
private final CountStatisticImpl dequeues;
private final CountStatisticImpl forwards;
private final CountStatisticImpl consumers;
private final CountStatisticImpl producers;
private final CountStatisticImpl messages;
private final PollCountStatisticImpl messagesCached;
private final CountStatisticImpl dispatched;
private final CountStatisticImpl duplicateFromStore;
private final CountStatisticImpl inflight;
private final CountStatisticImpl expired;
private final TimeStatisticImpl processTime;
private final CountStatisticImpl blockedSends;
private final TimeStatisticImpl blockedTime;
private final SizeStatisticImpl messageSize;
private final CountStatisticImpl maxUncommittedExceededCount;

// [AMQ-9437] Advanced Network Statistics
private final CountStatisticImpl networkEnqueues;
private final CountStatisticImpl networkDequeues;

// [AMQ-8463] Advanced Message Statistics are disabled by default
private final AtomicReference<MessageFlowStatsImpl> messageFlowStats = new AtomicReference<>();

public DestinationStatistics() {

Expand All @@ -75,25 +77,6 @@ public DestinationStatistics() {

networkEnqueues = new CountStatisticImpl("networkEnqueues", "The number of messages that have been sent to the destination via network connection");
networkDequeues = new CountStatisticImpl("networkDequeues", "The number of messages that have been acknowledged from the destination via network connection");

addStatistic("enqueues", enqueues);
addStatistic("dispatched", dispatched);
addStatistic("dequeues", dequeues);
addStatistic("duplicateFromStore", duplicateFromStore);
addStatistic("inflight", inflight);
addStatistic("expired", expired);
addStatistic("consumers", consumers);
addStatistic("producers", producers);
addStatistic("messages", messages);
addStatistic("messagesCached", messagesCached);
addStatistic("processTime", processTime);
addStatistic("blockedSends",blockedSends);
addStatistic("blockedTime",blockedTime);
addStatistic("messageSize",messageSize);
addStatistic("maxUncommittedExceededCount", maxUncommittedExceededCount);

addStatistic("networkEnqueues", networkEnqueues);
addStatistic("networkDequeues", networkDequeues);
}

public CountStatisticImpl getEnqueues() {
Expand Down Expand Up @@ -132,10 +115,6 @@ public CountStatisticImpl getMessages() {
return messages;
}

public void setMessagesCached(PollCountStatisticImpl messagesCached) {
this.messagesCached = messagesCached;
}

public CountStatisticImpl getDispatched() {
return dispatched;
}
Expand Down Expand Up @@ -170,6 +149,10 @@ public CountStatisticImpl getNetworkDequeues() {
return networkDequeues;
}

public MessageFlowStats getMessageFlowStats() {
return messageFlowStats.get();
}

public void reset() {
if (this.isDoReset()) {
super.reset();
Expand All @@ -186,6 +169,8 @@ public void reset() {
maxUncommittedExceededCount.reset();
networkEnqueues.reset();
networkDequeues.reset();
Optional.ofNullable(messageFlowStats.get())
.ifPresent(MessageFlowStatsImpl::reset);
}
}

Expand All @@ -208,9 +193,13 @@ public void setEnabled(boolean enabled) {
messageSize.setEnabled(enabled);
maxUncommittedExceededCount.setEnabled(enabled);

// [AMQ-9437] Advanced Statistics
// [AMQ-9437] Advanced Network Statistics
networkEnqueues.setEnabled(enabled);
networkDequeues.setEnabled(enabled);

// [AMQ-9437] Advanced Message Statistics
Optional.ofNullable(messageFlowStats.get())
.ifPresent(stats -> stats.setEnabled(enabled));
}

public void setParent(DestinationStatistics parent) {
Expand All @@ -233,6 +222,7 @@ public void setParent(DestinationStatistics parent) {
maxUncommittedExceededCount.setParent(parent.maxUncommittedExceededCount);
networkEnqueues.setParent(parent.networkEnqueues);
networkDequeues.setParent(parent.networkDequeues);
// [AMQ-9437] Advanced Message Statistics does not have a parent.
} else {
enqueues.setParent(null);
dispatched.setParent(null);
Expand All @@ -252,7 +242,25 @@ public void setParent(DestinationStatistics parent) {
maxUncommittedExceededCount.setParent(null);
networkEnqueues.setParent(null);
networkDequeues.setParent(null);
// [AMQ-9437] Advanced Message Statistics does not have a parent.
}
}

// This is the only method that can mutate the messageFlowStats state
public synchronized void setAdvancedMessageStatisticsEnabled(boolean enabled) {
if(!enabled) {
this.messageFlowStats.set(null);
return;
}

if(this.messageFlowStats.get() == null) {
MessageFlowStatsImpl tmpMessageFlowStatsImpl = new MessageFlowStatsImpl();
tmpMessageFlowStatsImpl.setEnabled(true);
this.messageFlowStats.set(tmpMessageFlowStatsImpl);
}
}

public boolean isAdvancedMessageStatisticsEnabled() {
return this.messageFlowStats.get() != null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import org.apache.activemq.command.MessageDispatch;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.command.RemoveInfo;
import org.apache.activemq.management.MessageFlowStats;
import org.apache.activemq.store.TopicMessageStore;
import org.apache.activemq.transaction.Synchronization;
import org.apache.activemq.usage.SystemUsage;
Expand Down Expand Up @@ -374,6 +375,11 @@ public void afterRollback() throws Exception {
if(((Destination)node.getRegionDestination()).isAdvancedNetworkStatisticsEnabled() && getContext() != null && getContext().isNetworkConnection()) {
((Destination)node.getRegionDestination()).getDestinationStatistics().getNetworkDequeues().add(ack.getMessageCount());
}

final MessageFlowStats tmpMessageFlowStats = ((Destination)node.getRegionDestination()).getDestinationStatistics().getMessageFlowStats();
if(tmpMessageFlowStats != null) {
tmpMessageFlowStats.dequeueStats(context.getClientId(), node.getMessageId().toString(), node.getMessage().getTimestamp(), node.getMessage().getBrokerInTime(), node.getMessage().getBrokerOutTime());
}
}
}

Expand Down
Loading

0 comments on commit 5962286

Please sign in to comment.