Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

NIFI-14077 Add ProcessGroup Performance Metrics to Prometheus #9577

Open
wants to merge 8 commits into
base: main
Choose a base branch
from
Original file line number Diff line number Diff line change
Expand Up @@ -275,5 +275,35 @@ public NiFiMetricsRegistry() {
.help("Provenance repository free space in bytes")
.labelNames("instance", "component_type", "component_name", "component_id", "parent_id", "repo_identifier")
.register(registry));

// Processing performance gauges. Each one is stored in nameToGaugeMap under the key used by callers
// of setDataPoint, and each help string states that the value is an estimate in milliseconds.
// NOTE(review): all five gauges declare a sixth label named "repo_identifier"; the caller visible in
// this change fills it with ProcessingPerformanceStatus#getIdentifier() - confirm this label name is
// intended for metrics that are not repository metrics.

// Estimated CPU time.
nameToGaugeMap.put("PROCESSING_PERF_CPU_MILLIS", Gauge.build()
.name("nifi_processing_performance_cpu_duration")
.help("Estimated CPU time (in milliseconds) used by this component")
.labelNames("instance", "component_type", "component_name", "component_id", "parent_id", "repo_identifier")
.register(registry));

// Estimated garbage collection time.
nameToGaugeMap.put("PROCESSING_PERF_GC_MILLIS", Gauge.build()
.name("nifi_processing_performance_gc_duration")
.help("Estimated garbage collection time (in milliseconds) used by this component")
.labelNames("instance", "component_type", "component_name", "component_id", "parent_id", "repo_identifier")
.register(registry));

// Estimated time spent reading content.
nameToGaugeMap.put("PROCESSING_PERF_READ_MILLIS", Gauge.build()
.name("nifi_processing_performance_content_read_duration")
.help("Estimated content read time (in milliseconds) used by this component")
.labelNames("instance", "component_type", "component_name", "component_id", "parent_id", "repo_identifier")
.register(registry));

// Estimated time spent writing content.
nameToGaugeMap.put("PROCESSING_PERF_WRITE_MILLIS", Gauge.build()
.name("nifi_processing_performance_content_write_duration")
.help("Estimated content write time (in milliseconds) used by this component")
.labelNames("instance", "component_type", "component_name", "component_id", "parent_id", "repo_identifier")
.register(registry));

// Estimated time spent committing sessions.
nameToGaugeMap.put("PROCESSING_PERF_COMMIT_MILLIS", Gauge.build()
.name("nifi_processing_performance_session_commit_duration")
.help("Estimated session commit time (in milliseconds) used by this component")
.labelNames("instance", "component_type", "component_name", "component_id", "parent_id", "repo_identifier")
.register(registry));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.nifi.controller.status.PortStatus;
import org.apache.nifi.controller.status.ProcessGroupStatus;
import org.apache.nifi.controller.status.ProcessorStatus;
import org.apache.nifi.controller.status.ProcessingPerformanceStatus;
import org.apache.nifi.controller.status.RemoteProcessGroupStatus;
import org.apache.nifi.controller.status.TransmissionStatus;
import org.apache.nifi.controller.status.analytics.StatusAnalytics;
Expand Down Expand Up @@ -80,6 +81,12 @@ public static CollectorRegistry createNifiMetrics(NiFiMetricsRegistry nifiMetric
nifiMetricsRegistry.setDataPoint(status.getTerminatedThreadCount() == null ? 0 : status.getTerminatedThreadCount(), "AMOUNT_THREADS_TOTAL_TERMINATED",
instanceId, componentType, componentName, componentId, parentPGId);

addProcessingPerformanceMetrics(nifiMetricsRegistry, status.getProcessingPerformanceStatus(),
instanceId, componentType, componentName, componentId, parentPGId);

nifiMetricsRegistry.setDataPoint(status.getProcessingNanos(), "TOTAL_TASK_DURATION",
instanceId, componentType, componentName, componentId, parentPGId);

// Report metrics for child process groups if specified
if (METRICS_STRATEGY_PG.getValue().equals(metricsStrategy) || METRICS_STRATEGY_COMPONENTS.getValue().equals(metricsStrategy)) {
status.getProcessGroupStatus().forEach((childGroupStatus) -> createNifiMetrics(nifiMetricsRegistry, childGroupStatus, instanceId, componentId, "ProcessGroup", metricsStrategy));
Expand Down Expand Up @@ -129,6 +136,9 @@ public static CollectorRegistry createNifiMetrics(NiFiMetricsRegistry nifiMetric
nifiMetricsRegistry.setDataPoint(status.getTerminatedThreadCount() == null ? 0 : status.getTerminatedThreadCount(), "AMOUNT_THREADS_TOTAL_TERMINATED",
instanceId, procComponentType, procComponentName, procComponentId, parentId);

addProcessingPerformanceMetrics(nifiMetricsRegistry, processorStatus.getProcessingPerformanceStatus(),
instanceId, procComponentType, procComponentName, procComponentId, parentId);

}

for (ConnectionStatus connectionStatus : status.getConnectionStatus()) {
Expand Down Expand Up @@ -508,4 +518,25 @@ private static void addStorageUsageMetric(final NiFiMetricsRegistry nifiMetricsR
nifiMetricsRegistry.setDataPoint(storageUsage.getUsedSpace(), usedSpaceLabel,
instanceId, componentType, componentName, componentId, parentId, storageUsage.getIdentifier());
}

/**
 * Publishes the processing performance figures of a single component as gauge data points.
 * Nothing is published when no performance status is available for the component.
 *
 * <p>The target gauges are documented as milliseconds. The CPU, content read, content write and
 * session commit durations are tracked by the framework in nanoseconds, so they are divided by
 * 1,000,000 here (dividing by 1,000 would publish microseconds). The garbage collection duration
 * is already tracked in milliseconds and is published unchanged.</p>
 *
 * @param niFiMetricsRegistry registry holding the gauges to update
 * @param perfStatus performance figures of the component, may be {@code null}
 * @param instanceId value for the {@code instance} label
 * @param componentType value for the {@code component_type} label
 * @param componentName value for the {@code component_name} label
 * @param componentId value for the {@code component_id} label
 * @param parentId value for the {@code parent_id} label
 */
private static void addProcessingPerformanceMetrics(final NiFiMetricsRegistry niFiMetricsRegistry, final ProcessingPerformanceStatus perfStatus, final String instanceId,
        final String componentType, final String componentName, final String componentId, final String parentId) {
    if (perfStatus == null) {
        return;
    }

    // Floating point divisor so that sub-millisecond durations are not truncated to zero
    final double nanosPerMilli = 1_000_000.0;
    final String identifier = perfStatus.getIdentifier();

    niFiMetricsRegistry.setDataPoint(perfStatus.getCpuDuration() / nanosPerMilli, "PROCESSING_PERF_CPU_MILLIS",
            instanceId, componentType, componentName, componentId, parentId, identifier);

    // Base metric already in milliseconds
    niFiMetricsRegistry.setDataPoint(perfStatus.getGarbageCollectionDuration(), "PROCESSING_PERF_GC_MILLIS",
            instanceId, componentType, componentName, componentId, parentId, identifier);

    niFiMetricsRegistry.setDataPoint(perfStatus.getContentReadDuration() / nanosPerMilli, "PROCESSING_PERF_READ_MILLIS",
            instanceId, componentType, componentName, componentId, parentId, identifier);

    niFiMetricsRegistry.setDataPoint(perfStatus.getContentWriteDuration() / nanosPerMilli, "PROCESSING_PERF_WRITE_MILLIS",
            instanceId, componentType, componentName, componentId, parentId, identifier);

    niFiMetricsRegistry.setDataPoint(perfStatus.getSessionCommitDuration() / nanosPerMilli, "PROCESSING_PERF_COMMIT_MILLIS",
            instanceId, componentType, componentName, componentId, parentId, identifier);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -555,11 +555,15 @@ public Response getBreadcrumbs(
public Response getFlowMetrics(
@Parameter(
description = "The producer for flow file metrics. Each producer may have its own output format.",
required = true
required = true,
schema = @Schema(allowableValues = {"prometheus", "json"})
)
@PathParam("producer") final String producer,
@Parameter(
description = "Set of included metrics registries"
description = "Set of included metrics registries. Duplicate the parameter to include multiple registries. " +
"All registries are included by default.",

schema = @Schema(allowableValues = {"NIFI", "JVM", "BULLETIN", "CONNECTION", "CLUSTER"})
esecules marked this conversation as resolved.
Show resolved Hide resolved
)
@QueryParam("includedRegistries") final Set<FlowMetricsRegistry> includedRegistries,
@Parameter(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@
import io.prometheus.client.Collector.MetricFamilySamples.Sample;
import io.prometheus.client.CollectorRegistry;
import io.prometheus.client.exporter.common.TextFormat;
import org.apache.nifi.controller.status.ProcessGroupStatus;
import org.apache.nifi.controller.status.ProcessingPerformanceStatus;
import org.apache.nifi.metrics.jvm.JmxJvmMetrics;
import org.apache.nifi.prometheusutil.BulletinMetricsRegistry;
import org.apache.nifi.prometheusutil.ClusterMetricsRegistry;
Expand All @@ -40,6 +42,7 @@
import org.apache.nifi.web.api.dto.DifferenceDTO;
import org.apache.nifi.web.api.entity.FlowComparisonEntity;
import org.apache.nifi.web.api.request.FlowMetricsProducer;
import org.jetbrains.annotations.NotNull;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.InjectMocks;
Expand All @@ -61,6 +64,7 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

import static org.junit.jupiter.api.Assertions.assertEquals;
Expand All @@ -83,6 +87,7 @@ public class TestFlowResource {
private static final String HEAP_USAGE_NAME = "nifi_jvm_heap_usage";
private static final String HEAP_USED_NAME = "nifi_jvm_heap_used";
private static final String HEAP_STARTS_WITH_PATTERN = "nifi_jvm_heap.*";
private static final String PROCESSING_STARTS_WITH_PATTERN = "^nifi_processing_.*";
private static final String THREAD_COUNT_LABEL = String.format("nifi_jvm_thread_count{instance=\"%s\"", LABEL_VALUE);
private static final String THREAD_COUNT_OTHER_LABEL = String.format("nifi_jvm_thread_count{instance=\"%s\"", OTHER_LABEL_VALUE);
private static final String ROOT_FIELD_NAME = "beans";
Expand Down Expand Up @@ -114,7 +119,7 @@ public void testGetFlowMetricsProducerInvalid() {

@Test
public void testGetFlowMetricsPrometheus() throws IOException {
final List<CollectorRegistry> registries = getCollectorRegistries();
final List<CollectorRegistry> registries = getCollectorRegistries(false);
when(serviceFacade.generateFlowMetrics(anySet())).thenReturn(registries);

final Response response = resource.getFlowMetrics(FlowMetricsProducer.PROMETHEUS.getProducer(), Collections.emptySet(), null, null, null);
Expand All @@ -126,11 +131,29 @@ public void testGetFlowMetricsPrometheus() throws IOException {

assertTrue(output.contains(THREAD_COUNT_NAME), "Thread Count name not found");
assertTrue(output.contains(HEAP_USAGE_NAME), "Heap Usage name not found");
assertEquals(0, Pattern.compile(PROCESSING_STARTS_WITH_PATTERN, Pattern.MULTILINE).matcher(output).results().count());
}

/**
 * Requests Prometheus output for registries built from a Process Group status that carries
 * processing performance data, and expects exactly five output lines that start with the
 * processing metric name prefix in addition to the usual JVM metric names.
 */
@Test
public void testGetFlowMetricsPrometheusWithPerformanceStatus() throws IOException {
    final List<CollectorRegistry> collectorRegistries = getCollectorRegistries(true);
    when(serviceFacade.generateFlowMetrics(anySet())).thenReturn(collectorRegistries);

    final String producerName = FlowMetricsProducer.PROMETHEUS.getProducer();
    final Response metricsResponse = resource.getFlowMetrics(producerName, Collections.emptySet(), null, null, null);
    assertNotNull(metricsResponse);

    final MediaType expectedMediaType = MediaType.valueOf(TextFormat.CONTENT_TYPE_004);
    assertEquals(expectedMediaType, metricsResponse.getMediaType());

    final String body = getResponseOutput(metricsResponse);
    assertTrue(body.contains(THREAD_COUNT_NAME), "Thread Count name not found");
    assertTrue(body.contains(HEAP_USAGE_NAME), "Heap Usage name not found");

    // MULTILINE lets the leading anchor of the pattern match at the start of every output line
    final Pattern processingLinePattern = Pattern.compile(PROCESSING_STARTS_WITH_PATTERN, Pattern.MULTILINE);
    final long processingLineCount = processingLinePattern.matcher(body).results().count();
    assertEquals(5, processingLineCount);
}

@Test
public void testGetFlowMetricsPrometheusSampleName() throws IOException {
final List<CollectorRegistry> registries = getCollectorRegistries();
final List<CollectorRegistry> registries = getCollectorRegistries(false);
when(serviceFacade.generateFlowMetrics(anySet())).thenReturn(registries);

final Response response = resource.getFlowMetrics(FlowMetricsProducer.PROMETHEUS.getProducer(), Collections.emptySet(), THREAD_COUNT_NAME, null, null);
Expand All @@ -146,7 +169,7 @@ public void testGetFlowMetricsPrometheusSampleName() throws IOException {

@Test
public void testGetFlowMetricsPrometheusSampleNameStartsWithPattern() throws IOException {
final List<CollectorRegistry> registries = getCollectorRegistries();
final List<CollectorRegistry> registries = getCollectorRegistries(false);
when(serviceFacade.generateFlowMetrics(anySet())).thenReturn(registries);

final Response response = resource.getFlowMetrics(FlowMetricsProducer.PROMETHEUS.getProducer(), Collections.emptySet(), HEAP_STARTS_WITH_PATTERN, null, null);
Expand All @@ -163,7 +186,7 @@ public void testGetFlowMetricsPrometheusSampleNameStartsWithPattern() throws IOE

@Test
public void testGetFlowMetricsPrometheusSampleLabelValue() throws IOException {
final List<CollectorRegistry> registries = getCollectorRegistries();
final List<CollectorRegistry> registries = getCollectorRegistries(false);
when(serviceFacade.generateFlowMetrics(anySet())).thenReturn(registries);

final Response response = resource.getFlowMetrics(FlowMetricsProducer.PROMETHEUS.getProducer(), Collections.emptySet(), null, LABEL_VALUE, null);
Expand All @@ -179,7 +202,7 @@ public void testGetFlowMetricsPrometheusSampleLabelValue() throws IOException {

@Test
public void testGetFlowMetricsPrometheusSampleNameAndSampleLabelValue() throws IOException {
final List<CollectorRegistry> registries = getCollectorRegistries();
final List<CollectorRegistry> registries = getCollectorRegistries(false);
when(serviceFacade.generateFlowMetrics(anySet())).thenReturn(registries);

final Response response = resource.getFlowMetrics(FlowMetricsProducer.PROMETHEUS.getProducer(), Collections.emptySet(), THREAD_COUNT_NAME, LABEL_VALUE, null);
Expand Down Expand Up @@ -392,6 +415,11 @@ public void testGetVersionDifferencesWithOffsetAndOnlyPartialResult() {
assertEquals(createDifference("Position Changed", "Position was changed"), differences.get(0));
}

// NOTE(review): this test has an empty body, so it always passes without verifying anything.
// It looks like a placeholder for processor-level performance status coverage - implement it or
// remove it before merging.
@Test
public void testWithProcessorPerformanceStatus() {

}

private void setUpGetVersionDifference() {
doReturn(getDifferences()).when(serviceFacade).getVersionDifference(anyString(), any(FlowVersionLocation.class), any(FlowVersionLocation.class));
}
Expand Down Expand Up @@ -445,11 +473,51 @@ private String getResponseOutput(final Response response) throws IOException {
return new String(outputBytes, StandardCharsets.UTF_8);
}

private List<CollectorRegistry> getCollectorRegistries() {
private List<CollectorRegistry> getCollectorRegistries(boolean includeProcessorPerfStatus) {
final JvmMetricsRegistry jvmMetricsRegistry = new JvmMetricsRegistry();
final CollectorRegistry jvmCollectorRegistry = PrometheusMetricsUtil.createJvmMetrics(jvmMetricsRegistry, JmxJvmMetrics.getInstance(), LABEL_VALUE);
final CollectorRegistry otherJvmCollectorRegistry = PrometheusMetricsUtil.createJvmMetrics(jvmMetricsRegistry, JmxJvmMetrics.getInstance(), OTHER_LABEL_VALUE);
return Arrays.asList(jvmCollectorRegistry, otherJvmCollectorRegistry);
final NiFiMetricsRegistry niFiMetricsRegistry = new NiFiMetricsRegistry();
final ProcessGroupStatus processGroupStatus;
if (includeProcessorPerfStatus) {
processGroupStatus = makeTestProcessGroupStatusWithPerformance();
} else {
processGroupStatus = makeTestProcessGroupStatus();
}
final CollectorRegistry nifiCollectionRegistry = PrometheusMetricsUtil.createNifiMetrics(niFiMetricsRegistry, processGroupStatus, "", "", "", "");
return Arrays.asList(jvmCollectorRegistry, otherJvmCollectorRegistry, nifiCollectionRegistry);
}

/**
 * Builds a Process Group status fixture populated with fixed sample values.
 * No processing performance status is set on the returned object.
 *
 * @return the populated Process Group status
 */
private static @NotNull ProcessGroupStatus makeTestProcessGroupStatus() {
    final ProcessGroupStatus groupStatus = new ProcessGroupStatus();
    groupStatus.setId("1234");

    // Received and sent totals
    groupStatus.setFlowFilesReceived(5);
    groupStatus.setBytesReceived(10000);
    groupStatus.setFlowFilesSent(10);
    groupStatus.setBytesSent(20000);

    // Queue and content repository figures
    groupStatus.setQueuedCount(100);
    groupStatus.setQueuedContentSize(1024L);
    groupStatus.setBytesRead(60000L);
    groupStatus.setBytesWritten(80000L);

    // Input and output figures
    groupStatus.setInputCount(1);
    groupStatus.setInputContentSize(1000L);
    groupStatus.setOutputCount(1);
    groupStatus.setOutputContentSize(1000L);

    groupStatus.setActiveThreadCount(5);
    return groupStatus;
}

/**
 * Builds the standard Process Group status fixture and attaches a processing performance
 * status whose durations are all set to 1 and whose identifier is the empty string.
 *
 * @return the Process Group status fixture including processing performance data
 */
private static @NotNull ProcessGroupStatus makeTestProcessGroupStatusWithPerformance() {
    final ProcessingPerformanceStatus perf = new ProcessingPerformanceStatus();
    perf.setIdentifier("");
    perf.setCpuDuration(1L);
    perf.setGarbageCollectionDuration(1L);
    perf.setContentReadDuration(1L);
    perf.setContentWriteDuration(1L);
    perf.setSessionCommitDuration(1L);

    final ProcessGroupStatus groupStatus = makeTestProcessGroupStatus();
    groupStatus.setProcessingPerformanceStatus(perf);
    return groupStatus;
}

private Map<String, List<Sample>> convertJsonResponseToMap(final Response response) throws IOException {
Expand Down
Loading