Skip to content

Commit

Permalink
IGNITE-22962 Added conflict resolver metrics (#11469)
Browse files Browse the repository at this point in the history
  • Loading branch information
maksaska authored Aug 12, 2024
1 parent 984d9ae commit 1d9312a
Show file tree
Hide file tree
Showing 6 changed files with 210 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -260,19 +260,6 @@ You should consider the nature of your transactions, the rate of change of your

Custom conflict resolver can be set via `conflictResolver` and allows to compare or merge the conflict data in any required way.

=== Conflict Resolver Metrics

The Ignite's built-in `CacheVersionConflictResolverPluginProvider` provides the following metrics:

[cols="35%,65%",opts="header"]
|===
|Name |Description
| `AcceptedCount` | Count of accepted entries.
| `RejectedCount` | Count of rejected entries.
|===

These metrics are registered under `conflict-resolver` registry for each node configured with this plugin.

=== Configuration example
Configuration is done via Ignite node plugin:

Expand Down
3 changes: 3 additions & 0 deletions docs/_docs/monitoring-metrics/new-metrics.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,9 @@ Register name: `cache.{cache_name}.{near}`
|CacheSize|long|Local cache size.
|CommitTime |histogram | Commit time in nanoseconds.
|CommitTimeTotal |long| The total time of commit, in nanoseconds.
|ConflictResolverAcceptedCount|long|Conflict resolver accepted entries count.
|ConflictResolverRejectedCount|long|Conflict resolver rejected entries count.
|ConflictResolverMergedCount|long|Conflict resolver merged entries count.
|EntryProcessorHits | long|The total number of invocations on keys, which exist in cache.
|EntryProcessorInvokeTimeNanos | long | The total time of cache invocations for which this node is the initiator, in nanoseconds.
|EntryProcessorMaxInvocationTime |long | So far, the maximum time to execute cache invokes for which this node is the initiator.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,18 @@ public class CacheMetricsImpl implements CacheMetrics {
/** The number of local node partitions that remain to be processed to complete indexing. */
private final IntMetricImpl idxBuildPartitionsLeftCnt;

/** Cache metric registry. */
private final MetricRegistryImpl mreg;

/** Conflict resolver accepted entries count. */
private LongAdderMetric rslvrAcceptedCnt;

/** Conflict resolver rejected entries count. */
private LongAdderMetric rslvrRejectedCnt;

/** Conflict resolver merged entries count. */
private LongAdderMetric rslvrMergedCnt;

/**
* Creates cache metrics.
*
Expand Down Expand Up @@ -256,7 +268,7 @@ public CacheMetricsImpl(GridCacheContext<?, ?> cctx, boolean isNear) {

delegate = null;

MetricRegistryImpl mreg = cctx.kernalContext().metric().registry(cacheMetricsRegistryName(cctx.name(), isNear));
mreg = cctx.kernalContext().metric().registry(cacheMetricsRegistryName(cctx.name(), isNear));

reads = mreg.longMetric("CacheGets",
"The total number of gets to the cache.");
Expand Down Expand Up @@ -732,6 +744,15 @@ public void clear() {
txKeyCollisionInfo = null;

idxRebuildKeyProcessed.reset();

if (rslvrAcceptedCnt != null)
rslvrAcceptedCnt.reset();

if (rslvrRejectedCnt != null)
rslvrRejectedCnt.reset();

if (rslvrMergedCnt != null)
rslvrMergedCnt.reset();
}

/** {@inheritDoc} */
Expand Down Expand Up @@ -1659,6 +1680,33 @@ public void resetIndexBuildPartitionsLeftCount() {
return idxBuildPartitionsLeftCnt.value();
}

/** */
public void incrementResolverAcceptedCount() {
rslvrAcceptedCnt.increment();
}

/** */
public void incrementResolverRejectedCount() {
rslvrRejectedCnt.increment();
}

/** */
public void incrementResolverMergedCount() {
rslvrMergedCnt.increment();
}

/** Registers metrics for conflict resolver. */
public void registerResolverMetrics() {
rslvrAcceptedCnt = mreg.longAdderMetric("ConflictResolverAcceptedCount",
"Conflict resolver accepted entries count");

rslvrRejectedCnt = mreg.longAdderMetric("ConflictResolverRejectedCount",
"Conflict resolver rejected entries count");

rslvrMergedCnt = mreg.longAdderMetric("ConflictResolverMergedCount",
"Conflict resolver merged entries count");
}

/** {@inheritDoc} */
@Override public String toString() {
return S.toString(CacheMetricsImpl.class, this);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -487,6 +487,9 @@ public IgniteUuid dynamicDeploymentId() {
*/
void initConflictResolver() {
conflictRslvr = rslvrMgr.conflictResolver();

if (conflictRslvr != null)
cache().metrics0().registerResolverMetrics();
}

/**
Expand Down Expand Up @@ -1641,6 +1644,13 @@ public GridCacheVersionConflictContext<K, V> conflictResolve(GridCacheVersionedE
GridCacheVersionConflictContext<K, V> ctx = conflictRslvr.resolve(cacheObjCtx, oldEntry, newEntry,
atomicVerComp);

if (ctx.isUseNew())
cache().metrics0().incrementResolverAcceptedCount();
else if (ctx.isUseOld())
cache().metrics0().incrementResolverRejectedCount();
else
cache().metrics0().incrementResolverMergedCount();

if (ctx.isManualResolve())
drMgr.onReceiveCacheConflictResolved(ctx.isUseNew(), ctx.isUseOld(), ctx.isMerge());

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.ignite.internal.metric;

import org.apache.ignite.IgniteCache;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.processors.cache.CacheConflictResolutionManager;
import org.apache.ignite.internal.processors.cache.CacheObjectValueContext;
import org.apache.ignite.internal.processors.cache.GridCacheManagerAdapter;
import org.apache.ignite.internal.processors.cache.version.CacheVersionConflictResolver;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersionConflictContext;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersionedEntryEx;
import org.apache.ignite.internal.processors.metric.MetricRegistryImpl;
import org.apache.ignite.plugin.AbstractTestPluginProvider;
import org.apache.ignite.plugin.PluginContext;
import org.apache.ignite.spi.metric.LongMetric;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.junit.Test;

import static org.apache.ignite.internal.processors.metric.impl.MetricUtils.cacheMetricsRegistryName;

/** Tests conflict resolver metrics per cache. */
public class CacheMetricsConflictResolverTest extends GridCommonAbstractTest {
/** {@inheritDoc} */
@Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);

cfg.setPluginProviders(new AbstractTestPluginProvider() {
@Override public String name() {
return "ConflictResolverProvider";
}

@Override public <T> T createComponent(PluginContext ctx, Class<T> cls) {
if (cls != CacheConflictResolutionManager.class)
return null;

return (T)new DynamicResolutionManager<>();
}
});

return cfg;
}

/** */
@Test
public void testCacheConflictResolver() throws Exception {
IgniteEx ign = startGrid(0);

IgniteCache<Object, Object> cache = ign.getOrCreateCache(DEFAULT_CACHE_NAME);

checkMetrics(0, 0, 0);

TestCacheVersionConflictResolver.plc = ResolvePolicy.USE_NEW;

cache.put(0, 0);

checkMetrics(1, 0, 0);

TestCacheVersionConflictResolver.plc = ResolvePolicy.USE_OLD;

cache.put(0, 0);

checkMetrics(1, 1, 0);

TestCacheVersionConflictResolver.plc = ResolvePolicy.MERGE;

cache.put(0, 0);

checkMetrics(1, 1, 1);
}

/** */
private void checkMetrics(int expAccepted, int expRejected, int expMerged) {
MetricRegistryImpl mreg = grid(0).context().metric().registry(cacheMetricsRegistryName(DEFAULT_CACHE_NAME, false));

assertEquals(expAccepted, mreg.<LongMetric>findMetric("ConflictResolverAcceptedCount").value());
assertEquals(expRejected, mreg.<LongMetric>findMetric("ConflictResolverRejectedCount").value());
assertEquals(expMerged, mreg.<LongMetric>findMetric("ConflictResolverMergedCount").value());
}

/** */
private static class TestCacheVersionConflictResolver implements CacheVersionConflictResolver {
/** */
private static ResolvePolicy plc;

/** {@inheritDoc} */
@Override public <K, V> GridCacheVersionConflictContext<K, V> resolve(
CacheObjectValueContext ctx,
GridCacheVersionedEntryEx<K, V> oldEntry,
GridCacheVersionedEntryEx<K, V> newEntry,
boolean atomicVerComparator
) {
GridCacheVersionConflictContext<K, V> res = new GridCacheVersionConflictContext<>(ctx, oldEntry, newEntry);

if (plc == ResolvePolicy.USE_NEW)
res.useNew();
else if (plc == ResolvePolicy.USE_OLD)
res.useOld();
else
res.merge(
newEntry.value(ctx),
Math.max(oldEntry.ttl(), newEntry.ttl()),
Math.max(oldEntry.expireTime(), newEntry.expireTime())
);

return res;
}
}

/** */
private static class DynamicResolutionManager<K, V> extends GridCacheManagerAdapter<K, V>
implements CacheConflictResolutionManager<K, V> {
/** {@inheritDoc} */
@Override public CacheVersionConflictResolver conflictResolver() {
return new TestCacheVersionConflictResolver();
}
}

/** Policy for conflict resolver. */
private enum ResolvePolicy {
/** Use old. */
USE_OLD,

/** Use new. */
USE_NEW,

/** Merge. */
MERGE
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.util.Collection;
import java.util.List;
import org.apache.ignite.internal.metric.CacheMetricsAddRemoveTest;
import org.apache.ignite.internal.metric.CacheMetricsConflictResolverTest;
import org.apache.ignite.internal.metric.CustomMetricsTest;
import org.apache.ignite.internal.metric.IoStatisticsCachePersistenceSelfTest;
import org.apache.ignite.internal.metric.IoStatisticsCacheSelfTest;
Expand Down Expand Up @@ -89,6 +90,7 @@ public static List<Class<?>> suite(Collection<Class> ignoredTests) {
GridTestUtils.addTestIfNeeded(suite, SystemViewComputeJobTest.class, ignoredTests);
GridTestUtils.addTestIfNeeded(suite, SystemViewCacheExpiryPolicyTest.class, ignoredTests);
GridTestUtils.addTestIfNeeded(suite, CacheMetricsAddRemoveTest.class, ignoredTests);
GridTestUtils.addTestIfNeeded(suite, CacheMetricsConflictResolverTest.class, ignoredTests);
GridTestUtils.addTestIfNeeded(suite, JmxExporterSpiTest.class, ignoredTests);
GridTestUtils.addTestIfNeeded(suite, LogExporterSpiTest.class, ignoredTests);
GridTestUtils.addTestIfNeeded(suite, ReadMetricsOnNodeStartupTest.class, ignoredTests);
Expand Down

0 comments on commit 1d9312a

Please sign in to comment.