Skip to content

Commit

Permalink
Add metrics support for Temporal Workflow (#4095)
Browse files Browse the repository at this point in the history
  • Loading branch information
abhishekmjain authored Feb 5, 2025
1 parent 09a3e21 commit adef734
Show file tree
Hide file tree
Showing 7 changed files with 237 additions and 1 deletion.
2 changes: 2 additions & 0 deletions gobblin-temporal/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,8 @@ dependencies {
}
compile externalDependency.tdigest
compile externalDependency."temporal-sdk"
compile externalDependency.micrometerCore
compile externalDependency.micrometerRegistry
testCompile project(path: ':gobblin-cluster', configuration: 'tests')
testCompile project(":gobblin-example")

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,4 +74,14 @@ public interface GobblinTemporalConfigurationKeys {

String DYNAMIC_SCALING_POLLING_INTERVAL_SECS = DYNAMIC_SCALING_PREFIX + "polling.interval.seconds";
int DEFAULT_DYNAMIC_SCALING_POLLING_INTERVAL_SECS = 60;

/**
* Temporal metrics config properties
*/
String TEMPORAL_METRICS_PREFIX = PREFIX + "metrics.";
String TEMPORAL_METRICS_OTLP_PREFIX_WITHOUT_DOT = TEMPORAL_METRICS_PREFIX + "otlp";
String TEMPORAL_METRICS_OTLP_HEADERS_KEY = TEMPORAL_METRICS_OTLP_PREFIX_WITHOUT_DOT + ".headers";
String TEMPORAL_METRICS_REPORT_INTERVAL_SECS = TEMPORAL_METRICS_OTLP_PREFIX_WITHOUT_DOT + ".report.interval.seconds";
int DEFAULT_TEMPORAL_METRICS_REPORT_INTERVAL_SECS = 10;
String TEMPORAL_METRICS_OTLP_DIMENSIONS_KEY = TEMPORAL_METRICS_OTLP_PREFIX_WITHOUT_DOT + ".dimensions";
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@

import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import com.uber.m3.tally.RootScopeBuilder;
import com.uber.m3.tally.Scope;

import io.grpc.netty.shaded.io.grpc.netty.GrpcSslContexts;
import io.grpc.netty.shaded.io.netty.handler.ssl.SslContext;
Expand All @@ -41,7 +43,10 @@
import javax.net.ssl.TrustManagerFactory;

import org.apache.gobblin.cluster.GobblinClusterUtils;
import org.apache.gobblin.temporal.GobblinTemporalConfigurationKeys;
import org.apache.gobblin.temporal.ddm.work.assistance.MDCContextPropagator;
import org.apache.gobblin.temporal.workflows.metrics.TemporalMetricsHelper;
import org.apache.gobblin.util.ConfigUtils;


public class TemporalWorkflowClientFactory {
Expand Down Expand Up @@ -100,10 +105,19 @@ public static WorkflowServiceStubs createServiceInstance(String connectionUri) t
.ciphers(SSL_CONFIG_DEFAULT_CIPHER_SUITES)
.build();

// Initialize metrics
int reportInterval = ConfigUtils.getInt(config, GobblinTemporalConfigurationKeys.TEMPORAL_METRICS_REPORT_INTERVAL_SECS,
GobblinTemporalConfigurationKeys.DEFAULT_TEMPORAL_METRICS_REPORT_INTERVAL_SECS);
Scope metricsScope = new RootScopeBuilder()
.reporter(TemporalMetricsHelper.getStatsReporter(config))
.tags(TemporalMetricsHelper.getDimensions(config))
.reportEvery(com.uber.m3.util.Duration.ofSeconds(reportInterval));

WorkflowServiceStubsOptions options = WorkflowServiceStubsOptions.newBuilder()
.setTarget(connectionUri)
.setEnableHttps(true)
.setSslContext(sslContext)
.setMetricsScope(metricsScope)
.build();
return WorkflowServiceStubs.newServiceStubs(options);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.gobblin.temporal.workflows.metrics;

import java.time.Duration;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.stream.Collectors;

import org.apache.commons.lang.StringUtils;
import org.jetbrains.annotations.NotNull;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
import com.typesafe.config.Config;
import com.uber.m3.tally.StatsReporter;

import io.micrometer.core.instrument.Clock;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.registry.otlp.OtlpConfig;
import io.micrometer.registry.otlp.OtlpMeterRegistry;
import io.temporal.common.reporter.MicrometerClientStatsReporter;
import lombok.extern.slf4j.Slf4j;

import org.apache.gobblin.temporal.GobblinTemporalConfigurationKeys;
import org.apache.gobblin.util.ConfigUtils;


@Slf4j
public class TemporalMetricsHelper {

/**
* Retrieves a map of dimension names and their corresponding values from the provided config.
* The dimensions are defined as a comma-separated string in the config, and the method
* fetches the corresponding values for each dimension.
* A missing dimension in config will have empty string as value.
*
* @param config Config object
* @return a map where the key is the dimension name and the value is the corresponding value from the config
*/
public static Map<String, String> getDimensions(Config config) {
String dimensionsString = ConfigUtils.getString(config, GobblinTemporalConfigurationKeys.TEMPORAL_METRICS_OTLP_DIMENSIONS_KEY, "");

// Split the string by "," and create a map by fetching values from config
return Arrays.stream(dimensionsString.split(","))
.map(String::trim)
.filter(StringUtils::isNotBlank)
.collect(Collectors.toMap(key -> key, key -> ConfigUtils.getString(config, key, ""), (l, r)-> r));
}

/**
* Creates and returns a {@link StatsReporter} instance configured with an {@link OtlpMeterRegistry}.
* This reporter can be used to report metrics via the OpenTelemetry Protocol (OTLP) to a metrics backend.
*
* @param config Config object
* @return a {@link StatsReporter} instance, configured with an OTLP meter registry and ready to report metrics.
*/
public static StatsReporter getStatsReporter(Config config) {
OtlpConfig otlpConfig = getOtlpConfig(config);
MeterRegistry meterRegistry = new OtlpMeterRegistry(otlpConfig, Clock.SYSTEM);
return new MicrometerClientStatsReporter(meterRegistry);
}

@VisibleForTesting
static OtlpConfig getOtlpConfig(Config config) {
return new OtlpConfig() {
@Override
public String get(@NotNull String key) {
return ConfigUtils.getString(config, key, null);
}

@NotNull
@Override
public String prefix() {
return GobblinTemporalConfigurationKeys.TEMPORAL_METRICS_OTLP_PREFIX_WITHOUT_DOT;
}

@NotNull
@Override
public Map<String, String> headers() {
String headers = get(GobblinTemporalConfigurationKeys.TEMPORAL_METRICS_OTLP_HEADERS_KEY);
return parseHeaders(headers);
}

@NotNull
@Override
public Duration step() {
int reportInterval = ConfigUtils.getInt(config, GobblinTemporalConfigurationKeys.TEMPORAL_METRICS_REPORT_INTERVAL_SECS,
GobblinTemporalConfigurationKeys.DEFAULT_TEMPORAL_METRICS_REPORT_INTERVAL_SECS);
return Duration.ofSeconds(reportInterval);
}
};
}

private static Map<String, String> parseHeaders(String headersString) {
try {
ObjectMapper mapper = new ObjectMapper();
return mapper.readValue(headersString, HashMap.class);
} catch (Exception e) {
String errMsg = "Failed to parse headers: " + headersString;
log.error(errMsg, e);
throw new RuntimeException(errMsg);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.gobblin.temporal.workflows.metrics;


import io.micrometer.registry.otlp.OtlpConfig;

import org.testng.Assert;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

import java.util.Map;

import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import com.typesafe.config.ConfigValueFactory;

import org.apache.gobblin.temporal.GobblinTemporalConfigurationKeys;


/** Test {@link TemporalMetricsHelper} */
public class TemporalMetricsHelperTest {

private Config config;

@BeforeClass
public void setup() {
config = ConfigFactory.empty()
.withValue("prefix", ConfigValueFactory.fromAnyRef("gobblin.temporal.metrics.otlp"))
.withValue(GobblinTemporalConfigurationKeys.TEMPORAL_METRICS_OTLP_PREFIX_WITHOUT_DOT + ".headers",
ConfigValueFactory.fromAnyRef("{\"abc\":\"123\", \"pqr\":\"456\"}"))
.withValue(GobblinTemporalConfigurationKeys.TEMPORAL_METRICS_OTLP_PREFIX_WITHOUT_DOT + ".resourceAttributes",
ConfigValueFactory.fromAnyRef("service.name=gobblin-service"))
.withValue("dim1", ConfigValueFactory.fromAnyRef("val1"))
.withValue("dim2", ConfigValueFactory.fromAnyRef("val2"))
.withValue(GobblinTemporalConfigurationKeys.TEMPORAL_METRICS_OTLP_PREFIX_WITHOUT_DOT + ".dimensions",
ConfigValueFactory.fromAnyRef("dim1,dim2,missingDimension"));
}

@Test
public void testGetDimensions() {
Map<String, String> dimensions = TemporalMetricsHelper.getDimensions(config);

Assert.assertNotNull(dimensions);
Assert.assertEquals(3, dimensions.size());
Assert.assertEquals("val1", dimensions.get("dim1"));
Assert.assertEquals("val2", dimensions.get("dim2"));
Assert.assertEquals("", dimensions.get("missingDimension"));
}

@Test
public void testGetDimensionsEmptyConfig() {
Map<String, String> dimensions = TemporalMetricsHelper.getDimensions(ConfigFactory.empty());

Assert.assertNotNull(dimensions);
Assert.assertEquals(0, dimensions.size());
}

@Test
public void testGetOtlpConfig() {
OtlpConfig otlpConfig = TemporalMetricsHelper.getOtlpConfig(config);

Map<String, String> headers = otlpConfig.headers();
Assert.assertNotNull(headers);
Assert.assertEquals(2, headers.size());
Assert.assertEquals("123", headers.get("abc"));
Assert.assertEquals("456", headers.get("pqr"));

Assert.assertEquals("gobblin-service", otlpConfig.resourceAttributes().get("service.name"));
}
}
4 changes: 3 additions & 1 deletion gradle/scripts/defaultBuildProperties.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,8 @@ def BuildProperties BUILD_PROPERTIES = new BuildProperties(project)
.register(new BuildProperty("publishToMaven", false, "Enable publishing of artifacts to a central Maven repository"))
.register(new BuildProperty("publishToNexus", false, "Enable publishing of artifacts to Nexus"))
.register(new BuildProperty("salesforceVersion", "42.0.0", "Salesforce dependencies version"))
.register(new BuildProperty("openTelemetryVersion", "1.29.0", "OpenTelemetry dependencies version"))
.register(new BuildProperty("openTelemetryVersion", "1.30.0", "OpenTelemetry dependencies version"))
.register(new BuildProperty("micrometerVersion", "1.11.1", "Micrometer dependencies version"))
task buildProperties(description: 'Lists main properties that can be used to customize the build') {
doLast {
BUILD_PROPERTIES.printHelp();
Expand Down Expand Up @@ -74,5 +75,6 @@ BUILD_PROPERTIES.ensureDefined('kafka1Version')
BUILD_PROPERTIES.ensureDefined('pegasusVersion')
BUILD_PROPERTIES.ensureDefined('salesforceVersion')
BUILD_PROPERTIES.ensureDefined('openTelemetryVersion')
BUILD_PROPERTIES.ensureDefined('micrometerVersion')

ext.buildProperties = BUILD_PROPERTIES
2 changes: 2 additions & 0 deletions gradle/scripts/dependencyDefinitions.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,8 @@ ext.externalDependency = [
"opentelemetrySdk": "io.opentelemetry:opentelemetry-sdk:" + openTelemetryVersion,
"opentelemetryExporterOtlp": "io.opentelemetry:opentelemetry-exporter-otlp:" + openTelemetryVersion,
"opentelemetrySdkTesting": "io.opentelemetry:opentelemetry-sdk-testing:" + openTelemetryVersion,
"micrometerCore": "io.micrometer:micrometer-core:" + micrometerVersion,
"micrometerRegistry": "io.micrometer:micrometer-registry-otlp:" + micrometerVersion,
"jsch": "com.jcraft:jsch:0.1.54",
"jdo2": "javax.jdo:jdo2-api:2.1",
"azkaban": "com.linkedin.azkaban:azkaban:2.5.0",
Expand Down

0 comments on commit adef734

Please sign in to comment.