Skip to content

Commit

Permalink
address Steven's comments
Browse files Browse the repository at this point in the history
  • Loading branch information
Naireen committed Jan 10, 2025
1 parent 285b2e4 commit ae2d885
Show file tree
Hide file tree
Showing 6 changed files with 32 additions and 34 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ private static boolean isNameSpaceSupported(MetricName metricName) {
* @param metricName The {@link MetricName} that represents this counter.
* @param value The counter value.
* @return If the conversion succeeds, {@code MetricValue} that represents this counter. Otherwise
* returns an empty optional
* return an empty optional
*/
private static Optional<MetricValue> convertCounterToMetricValue(
MetricName metricName,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -393,19 +393,19 @@ public void testConvert_successfulyConvertGauges() {
MetricsToPerStepNamespaceMetricsConverter.convert(
step, counters, gauges, emptyHistograms, parsedMetricNames);

DataflowGaugeValue gauge_value1 = new DataflowGaugeValue();
gauge_value1.setValue(5L);
DataflowGaugeValue gaugeValue1 = new DataflowGaugeValue();
gaugeValue1.setValue(5L);

DataflowGaugeValue gauge_value2 = new DataflowGaugeValue();
gauge_value2.setValue(10L);
DataflowGaugeValue gaugeValue2 = new DataflowGaugeValue();
gaugeValue2.setValue(10L);

DataflowGaugeValue gauge_value3 = new DataflowGaugeValue();
gauge_value3.setValue(0L); // zero valued
DataflowGaugeValue gaugeValue3 = new DataflowGaugeValue();
gaugeValue3.setValue(0L); // zero valued

MetricValue expectedVal1 =
new MetricValue()
.setMetric("metric1")
.setValueGauge64(gauge_value1)
.setValueGauge64(gaugeValue1)
.setMetricLabels(new HashMap<>());

Map<String, String> val2LabelMap = new HashMap<>();
Expand All @@ -414,13 +414,13 @@ public void testConvert_successfulyConvertGauges() {
MetricValue expectedVal2 =
new MetricValue()
.setMetric("metric2")
.setValueGauge64(gauge_value2)
.setValueGauge64(gaugeValue2)
.setMetricLabels(val2LabelMap);

MetricValue expectedVal3 =
new MetricValue()
.setMetric("metric3")
.setValueGauge64(gauge_value3)
.setValueGauge64(gaugeValue3)
.setMetricLabels(new HashMap<>());

assertThat(conversionResult.size(), equalTo(1));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@
*/
public class NoOpGauge implements Gauge {

private static final NoOpGauge singleton = new NoOpGauge();
private static final MetricName name = MetricName.named(NoOpGauge.class, "singleton");
private static final NoOpGauge SINGLETON_GAUGE = new NoOpGauge();
private static final MetricName NAME = MetricName.named(NoOpGauge.class, "singleton");

private NoOpGauge() {}

Expand All @@ -33,10 +33,10 @@ public void set(long n) {}

@Override
public MetricName getName() {
return name;
return NAME;
}

public static NoOpGauge getInstance() {
return singleton;
return SINGLETON_GAUGE;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@

import com.google.auto.value.AutoValue;
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
Expand Down Expand Up @@ -75,20 +74,21 @@ abstract class KafkaMetricsImpl implements KafkaMetrics {

private static final Logger LOG = LoggerFactory.getLogger(KafkaMetricsImpl.class);

static HashMap<String, Histogram> latencyHistograms = new HashMap<String, Histogram>();
static ConcurrentHashMap<String, Histogram> latencyHistograms =
new ConcurrentHashMap<String, Histogram>();

abstract HashMap<String, ConcurrentLinkedQueue<Duration>> perTopicRpcLatencies();
abstract ConcurrentHashMap<String, ConcurrentLinkedQueue<Duration>> perTopicRpcLatencies();

static ConcurrentHashMap<String, Gauge> backlogGauges = new ConcurrentHashMap<String, Gauge>();

abstract HashMap<String, Long> perTopicPartitionBacklogs();
abstract ConcurrentHashMap<String, Long> perTopicPartitionBacklogs();

abstract AtomicBoolean isWritable();

public static KafkaMetricsImpl create() {
return new AutoValue_KafkaMetrics_KafkaMetricsImpl(
new HashMap<String, ConcurrentLinkedQueue<Duration>>(),
new HashMap<String, Long>(),
new ConcurrentHashMap<String, ConcurrentLinkedQueue<Duration>>(),
new ConcurrentHashMap<String, Long>(),
new AtomicBoolean(true));
}

Expand All @@ -100,7 +100,7 @@ public void updateSuccessfulRpcMetrics(String topic, Duration elapsedTime) {
if (latencies == null) {
latencies = new ConcurrentLinkedQueue<Duration>();
latencies.add(elapsedTime);
perTopicRpcLatencies().put(topic, latencies);
perTopicRpcLatencies().putIfAbsent(topic, latencies);
} else {
latencies.add(elapsedTime);
}
Expand All @@ -109,10 +109,8 @@ public void updateSuccessfulRpcMetrics(String topic, Duration elapsedTime) {

/**
* @param topicName topicName
* @param partitionId partitionId for the topic Only included in the metric key if
* 'supportsMetricsDeletion' is enabled.
* @param backlog backlog for the topic Only included in the metric key if
* 'supportsMetricsDeletion' is enabled.
* @param partitionId partitionId
* @param backlog backlog for the specific partitionID of topicName
*/
@Override
public void updateBacklogBytes(String topicName, int partitionId, long backlog) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ public class KafkaSinkMetrics {

// Base Metric names
private static final String RPC_LATENCY = "RpcLatency";
private static final String ESTIAMTED_BACKLOG_SIZE = "EstimatedBacklogSize";
private static final String ESTIMATED_BACKLOG_SIZE = "EstimatedBacklogSize";

// Kafka Consumer Method names
enum RpcMethod {
Expand All @@ -55,9 +55,9 @@ enum RpcMethod {
private static final String PARTITION_ID = "partition_id";

/**
* Creates an Histogram metric to record RPC latency. Metric will have name.
* Creates a {@link Histogram} metric to record RPC latency with the name
*
* <p>'RpcLatency*rpc_method:{method};topic_name:{topic};'
* <p>'RpcLatency*rpc_method:{method};topic_name:{topic};'.
*
* @param method Kafka method associated with this metric.
* @param topic Kafka topic associated with this metric.
Expand All @@ -76,9 +76,9 @@ public static Histogram createRPCLatencyHistogram(RpcMethod method, String topic
}

/**
* Creates an Gauge metric to record per partition backlog. Metric will have name:
* Creates a {@link Gauge} metric to record per partition backlog with the name
*
* <p>'EstimatedBacklogSize*topic_name:{topic};partitionId:{partitionId};'
* <p>'EstimatedBacklogSize*topic_name:{topic};partitionId:{partitionId};'.
*
* @param topic Kafka topic associated with this metric.
* @param partitionId partition id associated with this metric.
Expand All @@ -89,9 +89,9 @@ public static Gauge createBacklogGauge(String topic, int partitionId) {
}

/**
* Creates an Gauge metric to record per partition backlog. Metric will have name:
* Creates a {@link Gauge} metric to record per partition backlog with the name
*
* <p>'name'
* <p>'name'.
*
* @param name MetricName for the KafkaSink.
* @return Counter.
Expand All @@ -111,7 +111,7 @@ public static Gauge createBacklogGauge(MetricName name) {
*/
public static MetricName getMetricGaugeName(String topic, int partitionId) {
LabeledMetricNameUtils.MetricNameBuilder nameBuilder =
LabeledMetricNameUtils.MetricNameBuilder.baseNameBuilder(ESTIAMTED_BACKLOG_SIZE);
LabeledMetricNameUtils.MetricNameBuilder.baseNameBuilder(ESTIMATED_BACKLOG_SIZE);
nameBuilder.addLabel(PARTITION_ID, String.valueOf(partitionId));
nameBuilder.addLabel(TOPIC_LABEL, topic);
return nameBuilder.build(METRICS_NAMESPACE);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,6 @@ public boolean advance() throws IOException {
METRIC_NAMESPACE, RAW_SIZE_METRIC_PREFIX + pState.topicPartition.toString());
rawSizes.update(recordSize);

// Pass metrics to container.
kafkaResults.updateKafkaMetrics();
return true;
} else { // -- (b)
Expand Down Expand Up @@ -308,6 +307,7 @@ public long getSplitBacklogBytes() {
if (pBacklog == UnboundedReader.BACKLOG_UNKNOWN) {
return UnboundedReader.BACKLOG_UNKNOWN;
}
backlogBytes += pBacklog;
}

return backlogBytes;
Expand Down

0 comments on commit ae2d885

Please sign in to comment.