Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix PcpMmvWriter race condition #133

Merged
merged 4 commits into from
Nov 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
83 changes: 66 additions & 17 deletions dxm/src/main/java/io/pcp/parfait/dxm/PcpMmvWriter.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import java.lang.management.ManagementFactory;
import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
Expand All @@ -47,6 +48,7 @@
import javax.measure.Unit;

import com.google.common.collect.Maps;
import com.google.common.util.concurrent.Monitor;
import io.pcp.parfait.dxm.PcpString.PcpStringStore;
import io.pcp.parfait.dxm.semantics.Semantics;
import io.pcp.parfait.dxm.types.AbstractTypeHandler;
Expand Down Expand Up @@ -115,6 +117,12 @@ public int getBitmask() {
return bitmask;
}
}

private enum State {
STOPPED,
STARTING,
STARTED
}

private static final Set<MmvFlag> DEFAULT_FLAGS = Collections.unmodifiableSet(EnumSet.of(
MmvFlag.MMV_FLAG_NOPREFIX, MmvFlag.MMV_FLAG_PROCESS));
Expand Down Expand Up @@ -156,7 +164,10 @@ public void putBytes(ByteBuffer buffer, String value) {
private final Map<Class<?>, TypeHandler<?>> typeHandlers = new ConcurrentHashMap<Class<?>, TypeHandler<?>>(
DefaultTypeHandlers.getDefaultMappings());
private final PcpStringStore stringStore = new PcpStringStore();
private volatile boolean started = false;
private volatile State state = State.STOPPED;
private final Monitor stateMonitor = new Monitor();
private final Monitor.Guard isStarted = stateMonitor.newGuard(() -> state == State.STARTED);
private volatile Duration maxWaitStart = Duration.ofSeconds(10);
private volatile boolean usePerMetricLock = true;
private final Map<PcpValueInfo,ByteBuffer> perMetricByteBuffers = newConcurrentMap();
private final Object globalLock = new Object();
Expand Down Expand Up @@ -221,7 +232,6 @@ public PcpMmvWriter(ByteBufferFactory byteBufferFactory, IdentifierSourceSet ide
this.instanceDomainStore = mmvVersion.createInstanceDomainStore(identifierSources, stringStore);
this.mmvVersion = mmvVersion;
this.metricNameValidator = mmvVersion.createMetricNameValidator();

registerType(String.class, MMV_STRING_HANDLER);
}

Expand Down Expand Up @@ -295,7 +305,7 @@ public final <T> void addMetric(MetricName name, Semantics semantics, Unit<?> un
* io.pcp.parfait.pcp.types.TypeHandler)
*/
public final <T> void registerType(Class<T> runtimeClass, TypeHandler<T> handler) {
if (started) {
if (state != State.STOPPED) {
// Can't add any more metrics anyway; harmless
return;
}
Expand All @@ -307,9 +317,22 @@ public final <T> void registerType(Class<T> runtimeClass, TypeHandler<T> handler
* @see io.pcp.parfait.pcp.PcpWriter#updateMetric(java.lang.String, java.lang.Object)
*/
public final void updateMetric(MetricName name, Object value) {
if (!started) {
return;
// If another thread has called start() we need to wait until the writer has completely started before
// proceeding to update the metric value. This is to avoid a race condition where start() has already written
// the old metric value, but has not yet finished writing all of the values, when the metric is updated. The
// implementation here is a little complicated to avoid taking a lock on the happy paths.
if (state == State.STARTED) {
doUpdateMetric(name, value);
} else if (state == State.STARTING) {
if (stateMonitor.enterWhenUninterruptibly(isStarted, maxWaitStart)) {
// Leave the monitor immediately because we only care about being notified about the state change
stateMonitor.leave();
doUpdateMetric(name, value);
}
}
}

private void doUpdateMetric(MetricName name, Object value) {
PcpValueInfo info = metricData.get(name);
if (info == null) {
throw new IllegalArgumentException("Metric " + name
Expand All @@ -323,20 +346,27 @@ public final void updateMetric(MetricName name, Object value) {
* @see io.pcp.parfait.pcp.PcpWriter#start()
*/
public final void start() throws IOException {
initialiseOffsets();
updateState(State.STARTING);

dataFileBuffer = byteBufferFactory.build(getBufferLength());
synchronized (globalLock) {
populateDataBuffer(dataFileBuffer, metricData.values());
preparePerMetricBufferSlices();
try {
initialiseOffsets();

dataFileBuffer = byteBufferFactory.build(getBufferLength());
synchronized (globalLock) {
populateDataBuffer(dataFileBuffer, metricData.values());
preparePerMetricBufferSlices();
}
} catch (IOException | RuntimeException e) {
updateState(State.STOPPED);
throw e;
}

started = true;
updateState(State.STARTED);
}

@Override
public void reset() {
started = false;
updateState(State.STOPPED);
metricData.clear();
perMetricByteBuffers.clear();
instanceDomainStore.clear();
Expand Down Expand Up @@ -384,14 +414,23 @@ public void setProcessIdentifier(int pid) {
}

public void setPerMetricLock(boolean usePerMetricLock) {
Preconditions.checkState(!started, "Cannot change use of perMetricLock when started");
Preconditions.checkState(state == State.STOPPED, "Cannot change use of perMetricLock when started");
this.usePerMetricLock = usePerMetricLock;
}

public void setFlags(Set<MmvFlag> flags) {
this.flags = EnumSet.copyOf(flags);
}

/**
* Sets the maximum amount of time to wait for the writer to start when attempting to update a metric.
*
* @param maxWaitStart the maximum amount of time to wait
*/
public void setMaxWaitStart(Duration maxWaitStart) {
this.maxWaitStart = Preconditions.checkNotNull(maxWaitStart, "maxWaitStart cannot be null");
}

private synchronized void addMetricInfo(MetricName name, Semantics semantics, Unit<?> unit,
Object initialValue, TypeHandler<?> pcpType) {
if (metricData.containsKey(name)) {
Expand Down Expand Up @@ -464,11 +503,12 @@ private void updateValue(PcpValueInfo info, Object value) {

private void writeValueWithLockPerMetric(PcpValueInfo info, Object value, TypeHandler rawHandler) {
ByteBuffer perMetricByteBuffer = perMetricByteBuffers.get(info);
synchronized (perMetricByteBuffer) {
perMetricByteBuffer.position(0);
rawHandler.putBytes(perMetricByteBuffer, value);
if (perMetricByteBuffer != null) {
synchronized (perMetricByteBuffer) {
perMetricByteBuffer.position(0);
rawHandler.putBytes(perMetricByteBuffer, value);
}
}

}

private void writeValueWithGlobalLock(PcpValueInfo info, Object value, TypeHandler rawHandler) {
Expand Down Expand Up @@ -655,6 +695,15 @@ private int getProcessIdentifier() {
return processIdentifier;
}

private void updateState(State newState) {
stateMonitor.enter();
try {
state = newState;
} finally {
stateMonitor.leave();
}
}

public static void main(String[] args) throws IOException {
PcpMmvWriter bridge;

Expand Down
102 changes: 102 additions & 0 deletions dxm/src/test/java/io/pcp/parfait/dxm/PcpMmvWriterIntegrationTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,18 @@
package io.pcp.parfait.dxm;

import io.pcp.parfait.dxm.semantics.Semantics;
import io.pcp.parfait.dxm.types.AbstractTypeHandler;
import io.pcp.parfait.dxm.types.MmvMetricType;
import org.hamcrest.Matcher;
import org.junit.BeforeClass;
import org.junit.Test;

import java.lang.reflect.Field;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Phaser;

import static io.pcp.parfait.dxm.IdentifierSourceSet.DEFAULT_SET;
import static io.pcp.parfait.dxm.MmvVersion.MMV_VERSION1;
Expand Down Expand Up @@ -109,6 +116,93 @@ public void resetShouldClearStrings() throws Exception {
assertStringsCount(pcpMmvWriterV2, 0);
}

@Test
public void metricUpdatesWhileResettingWriterShouldNotBeLost() throws Exception {
// The order the metrics are written is non-deterministic because they're pulled out of a hash map, so
// we must dynamically record their order.
List<String> order = new ArrayList<>();

pcpMmvWriterV1.reset();
pcpMmvWriterV1.addMetric(MetricName.parse("value1"), Semantics.COUNTER, ONE, 1,
new AbstractTypeHandler<Number>(MmvMetricType.I32, 4) {
public void putBytes(ByteBuffer buffer, Number value) {
order.add("value1");
buffer.putInt(value == null ? 0 : value.intValue());
}
});
pcpMmvWriterV1.addMetric(MetricName.parse("value2"), Semantics.COUNTER, ONE, 2,
new AbstractTypeHandler<Number>(MmvMetricType.I32, 4) {
public void putBytes(ByteBuffer buffer, Number value) {
order.add("value2");
buffer.putInt(value == null ? 0 : value.intValue());
}
});

pcpMmvWriterV1.start();

waitForReload();

assertMetric("mmv.value1", is("1.000"));
assertMetric("mmv.value2", is("2.000"));

pcpMmvWriterV1.reset();

// The idea here is that the 1st metric will be written immediately, but the 2nd will wait on the phaser to
// write. This gives us time to update the 1st metric value. The sleep is needed to ensure the start() method
// doesn't exit before updateMetric() is executed.
Phaser phaser = new Phaser(2);

pcpMmvWriterV1.addMetric(MetricName.parse("value1"), Semantics.COUNTER, ONE, 1,
new AbstractTypeHandler<Number>(MmvMetricType.I32, 4) {
public void putBytes(ByteBuffer buffer, Number value) {
boolean isNotFirst = !"value1".equals(order.get(0));
if (isNotFirst) {
phaser.arriveAndAwaitAdvance();
}
buffer.putInt(value == null ? 0 : value.intValue());
if (isNotFirst) {
sleep(1_000);
}
}
});
pcpMmvWriterV1.addMetric(MetricName.parse("value2"), Semantics.COUNTER, ONE, 2,
new AbstractTypeHandler<Number>(MmvMetricType.I32, 4) {
public void putBytes(ByteBuffer buffer, Number value) {
boolean isNotFirst = !"value2".equals(order.get(0));
if (isNotFirst) {
phaser.arriveAndAwaitAdvance();
}
buffer.putInt(value == null ? 0 : value.intValue());
if (isNotFirst) {
sleep(1_000);
}
}
});

CountDownLatch startDone = new CountDownLatch(1);

new Thread(() -> {
try {
pcpMmvWriterV1.start();
} catch (Exception e) {
e.printStackTrace();
} finally {
startDone.countDown();
}
}).start();

// Will not continue till after the 1st metric has been written
phaser.arriveAndAwaitAdvance();

pcpMmvWriterV1.updateMetric(MetricName.parse(order.get(0)), 10);

startDone.await();

waitForReload();

assertMetric("mmv." + order.get(0), is("10.000"));
}

private void assertMetric(String metricName, Matcher<String> expectedValue) throws Exception {
String actual = pcpClient.getMetric(metricName);
assertThat(actual, expectedValue);
Expand All @@ -125,4 +219,12 @@ private void waitForReload() throws InterruptedException {
Thread.sleep(1000);
}

private void sleep(long millis) {
try {
Thread.sleep(millis);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import io.pcp.parfait.ValueSemantics;

import java.io.IOException;
import java.time.Duration;
import java.util.EnumSet;

import javax.management.AttributeNotFoundException;
Expand Down Expand Up @@ -73,6 +74,7 @@ public void start() {
writer = new PcpMmvWriter(name, IdentifierSourceSet.DEFAULT_SET);
writer.setClusterIdentifier(MonitoringViewProperties.getCluster());
writer.setFlags(EnumSet.of(PcpMmvWriter.MmvFlag.MMV_FLAG_PROCESS));
writer.setMaxWaitStart(Duration.ofMillis(MonitoringViewProperties.getWriterWait()));

DynamicMonitoringView view;
view = new DynamicMonitoringView(registry,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
import java.lang.management.ManagementFactory;
import java.util.Collections;

import io.pcp.parfait.DynamicMonitoringView;
import io.pcp.parfait.dxm.HashingIdentifierSource;
import io.pcp.parfait.dxm.IdentifierSource;

Expand All @@ -31,15 +30,18 @@ public class MonitoringViewProperties {
private static final String INTERVAL = "interval";
private static final String STARTUP = "startup";
private static final String CONNECT = "connect";
private static final String WRITER_WAIT = "writer.wait";

public static final String PARFAIT_NAME = PARFAIT + "." + NAME;
public static final String PARFAIT_CLUSTER = PARFAIT + "." + CLUSTER;
public static final String PARFAIT_INTERVAL = PARFAIT + "." + INTERVAL;
public static final String PARFAIT_STARTUP = PARFAIT + "." + STARTUP;
public static final String PARFAIT_CONNECT = PARFAIT + "." + CONNECT;
public static final String PARFAIT_WRITER_WAIT = PARFAIT + "." + WRITER_WAIT;

private static final String DEFAULT_INTERVAL = "1000"; // milliseconds
private static final String DEFAULT_CONNECT = "localhost:9875";
private static final String DEFAULT_WRITER_WAIT_MS = "10000";

public static String getCommandBasename(String command) {
// trim away arguments, produce a generally sanitized basename
Expand Down Expand Up @@ -139,6 +141,14 @@ public static String getDefaultConnection() {
return connect;
}

public static String getDefaultWriterWait() {
String writerWait = System.getProperty(PARFAIT_WRITER_WAIT);
if (writerWait == null || writerWait.isEmpty()) {
return DEFAULT_WRITER_WAIT_MS;
}
return writerWait;
}

public static void setupProperties() {
String name = getDefaultName(getParfaitName(), getDefaultCommand(), getRuntimeName());
System.setProperty(PARFAIT_NAME, name);
Expand All @@ -154,6 +164,9 @@ public static void setupProperties() {

String connect = getDefaultConnection();
System.setProperty(PARFAIT_CONNECT, connect);

String writerWait = getDefaultWriterWait();
System.setProperty(PARFAIT_WRITER_WAIT, writerWait);
}

//
Expand All @@ -174,4 +187,13 @@ public static Long getStartup() {
public static String getConnection() {
return System.getProperty(PARFAIT_CONNECT);
}

/**
* The maximum number of milliseconds to wait for PcpMmvWriter to start when attempting to update a metric.
*
* @return maximum number of milliseconds to wait
*/
public static long getWriterWait() {
return Long.parseLong(System.getProperty(PARFAIT_WRITER_WAIT));
}
}
Loading