diff --git a/docs/_docs/extensions-and-integrations/change-data-capture-extensions.adoc b/docs/_docs/extensions-and-integrations/change-data-capture-extensions.adoc index 98d6cfc7f32fd..206f5fbb84231 100644 --- a/docs/_docs/extensions-and-integrations/change-data-capture-extensions.adoc +++ b/docs/_docs/extensions-and-integrations/change-data-capture-extensions.adoc @@ -260,19 +260,6 @@ You should consider the nature of your transactions, the rate of change of your Custom conflict resolver can be set via `conflictResolver` and allows to compare or merge the conflict data in any required way. -=== Conflict Resolver Metrics - -The Ignite's built-in `CacheVersionConflictResolverPluginProvider` provides the following metrics: - -[cols="35%,65%",opts="header"] -|=== -|Name |Description -| `AcceptedCount` | Count of accepted entries. -| `RejectedCount` | Count of rejected entries. -|=== - -These metrics are registered under `conflict-resolver` registry for each node configured with this plugin. - === Configuration example Configuration is done via Ignite node plugin: diff --git a/docs/_docs/monitoring-metrics/new-metrics.adoc b/docs/_docs/monitoring-metrics/new-metrics.adoc index 7bce59dcb0096..e723e93fa9fa6 100644 --- a/docs/_docs/monitoring-metrics/new-metrics.adoc +++ b/docs/_docs/monitoring-metrics/new-metrics.adoc @@ -68,6 +68,9 @@ Register name: `cache.{cache_name}.{near}` |CacheSize|long|Local cache size. |CommitTime |histogram | Commit time in nanoseconds. |CommitTimeTotal |long| The total time of commit, in nanoseconds. +|ConflictResolverAcceptedCount|long|Conflict resolver accepted entries count. +|ConflictResolverRejectedCount|long|Conflict resolver rejected entries count. +|ConflictResolverMergedCount|long|Conflict resolver merged entries count. |EntryProcessorHits | long|The total number of invocations on keys, which exist in cache. |EntryProcessorInvokeTimeNanos | long | The total time of cache invocations for which this node is the initiator, in nanoseconds. |EntryProcessorMaxInvocationTime |long | So far, the maximum time to execute cache invokes for which this node is the initiator. diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsImpl.java index 0f305d6375143..0289276499469 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsImpl.java @@ -228,6 +228,18 @@ public class CacheMetricsImpl implements CacheMetrics { /** The number of local node partitions that remain to be processed to complete indexing. */ private final IntMetricImpl idxBuildPartitionsLeftCnt; + /** Cache metric registry. */ + private final MetricRegistryImpl mreg; + + /** Conflict resolver accepted entries count. */ + private LongAdderMetric rslvrAcceptedCnt; + + /** Conflict resolver rejected entries count. */ + private LongAdderMetric rslvrRejectedCnt; + + /** Conflict resolver merged entries count. */ + private LongAdderMetric rslvrMergedCnt; + /** * Creates cache metrics. * @@ -256,7 +268,7 @@ public CacheMetricsImpl(GridCacheContext cctx, boolean isNear) { delegate = null; - MetricRegistryImpl mreg = cctx.kernalContext().metric().registry(cacheMetricsRegistryName(cctx.name(), isNear)); + mreg = cctx.kernalContext().metric().registry(cacheMetricsRegistryName(cctx.name(), isNear)); reads = mreg.longMetric("CacheGets", "The total number of gets to the cache."); @@ -732,6 +744,15 @@ public void clear() { txKeyCollisionInfo = null; idxRebuildKeyProcessed.reset(); + + if (rslvrAcceptedCnt != null) + rslvrAcceptedCnt.reset(); + + if (rslvrRejectedCnt != null) + rslvrRejectedCnt.reset(); + + if (rslvrMergedCnt != null) + rslvrMergedCnt.reset(); } /** {@inheritDoc} */ @@ -1659,6 +1680,33 @@ public void resetIndexBuildPartitionsLeftCount() { return idxBuildPartitionsLeftCnt.value(); } + /** */ + public void incrementResolverAcceptedCount() { + rslvrAcceptedCnt.increment(); + } + + /** */ + public void incrementResolverRejectedCount() { + rslvrRejectedCnt.increment(); + } + + /** */ + public void incrementResolverMergedCount() { + rslvrMergedCnt.increment(); + } + + /** Registers metrics for conflict resolver. */ + public void registerResolverMetrics() { + rslvrAcceptedCnt = mreg.longAdderMetric("ConflictResolverAcceptedCount", + "Conflict resolver accepted entries count"); + + rslvrRejectedCnt = mreg.longAdderMetric("ConflictResolverRejectedCount", + "Conflict resolver rejected entries count"); + + rslvrMergedCnt = mreg.longAdderMetric("ConflictResolverMergedCount", + "Conflict resolver merged entries count"); + } + /** {@inheritDoc} */ @Override public String toString() { return S.toString(CacheMetricsImpl.class, this); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java index fbf11bf85a185..dc5b8a7b2cf2d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java @@ -487,6 +487,9 @@ public IgniteUuid dynamicDeploymentId() { */ void initConflictResolver() { conflictRslvr = rslvrMgr.conflictResolver(); + + if (conflictRslvr != null) + cache().metrics0().registerResolverMetrics(); } /** @@ -1641,6 +1644,13 @@ public GridCacheVersionConflictContext conflictResolve(GridCacheVersionedE GridCacheVersionConflictContext ctx = conflictRslvr.resolve(cacheObjCtx, oldEntry, newEntry, atomicVerComp); + if (ctx.isUseNew()) + cache().metrics0().incrementResolverAcceptedCount(); + else if (ctx.isUseOld()) + cache().metrics0().incrementResolverRejectedCount(); + else + cache().metrics0().incrementResolverMergedCount(); + if (ctx.isManualResolve()) drMgr.onReceiveCacheConflictResolved(ctx.isUseNew(), ctx.isUseOld(), ctx.isMerge()); diff --git a/modules/core/src/test/java/org/apache/ignite/internal/metric/CacheMetricsConflictResolverTest.java b/modules/core/src/test/java/org/apache/ignite/internal/metric/CacheMetricsConflictResolverTest.java new file mode 100644 index 0000000000000..b2385935fcc26 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/metric/CacheMetricsConflictResolverTest.java @@ -0,0 +1,146 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.metric; + +import org.apache.ignite.IgniteCache; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.internal.IgniteEx; +import org.apache.ignite.internal.processors.cache.CacheConflictResolutionManager; +import org.apache.ignite.internal.processors.cache.CacheObjectValueContext; +import org.apache.ignite.internal.processors.cache.GridCacheManagerAdapter; +import org.apache.ignite.internal.processors.cache.version.CacheVersionConflictResolver; +import org.apache.ignite.internal.processors.cache.version.GridCacheVersionConflictContext; +import org.apache.ignite.internal.processors.cache.version.GridCacheVersionedEntryEx; +import org.apache.ignite.internal.processors.metric.MetricRegistryImpl; +import org.apache.ignite.plugin.AbstractTestPluginProvider; +import org.apache.ignite.plugin.PluginContext; +import org.apache.ignite.spi.metric.LongMetric; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; +import org.junit.Test; + +import static org.apache.ignite.internal.processors.metric.impl.MetricUtils.cacheMetricsRegistryName; + +/** Tests conflict resolver metrics per cache. */ +public class CacheMetricsConflictResolverTest extends GridCommonAbstractTest { + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName); + + cfg.setPluginProviders(new AbstractTestPluginProvider() { + @Override public String name() { + return "ConflictResolverProvider"; + } + + @Override public T createComponent(PluginContext ctx, Class cls) { + if (cls != CacheConflictResolutionManager.class) + return null; + + return (T)new DynamicResolutionManager<>(); + } + }); + + return cfg; + } + + /** */ + @Test + public void testCacheConflictResolver() throws Exception { + IgniteEx ign = startGrid(0); + + IgniteCache cache = ign.getOrCreateCache(DEFAULT_CACHE_NAME); + + checkMetrics(0, 0, 0); + + TestCacheVersionConflictResolver.plc = ResolvePolicy.USE_NEW; + + cache.put(0, 0); + + checkMetrics(1, 0, 0); + + TestCacheVersionConflictResolver.plc = ResolvePolicy.USE_OLD; + + cache.put(0, 0); + + checkMetrics(1, 1, 0); + + TestCacheVersionConflictResolver.plc = ResolvePolicy.MERGE; + + cache.put(0, 0); + + checkMetrics(1, 1, 1); + } + + /** */ + private void checkMetrics(int expAccepted, int expRejected, int expMerged) { + MetricRegistryImpl mreg = grid(0).context().metric().registry(cacheMetricsRegistryName(DEFAULT_CACHE_NAME, false)); + + assertEquals(expAccepted, mreg.findMetric("ConflictResolverAcceptedCount").value()); + assertEquals(expRejected, mreg.findMetric("ConflictResolverRejectedCount").value()); + assertEquals(expMerged, mreg.findMetric("ConflictResolverMergedCount").value()); + } + + /** */ + private static class TestCacheVersionConflictResolver implements CacheVersionConflictResolver { + /** */ + private static ResolvePolicy plc; + + /** {@inheritDoc} */ + @Override public GridCacheVersionConflictContext resolve( + CacheObjectValueContext ctx, + GridCacheVersionedEntryEx oldEntry, + GridCacheVersionedEntryEx newEntry, + boolean atomicVerComparator + ) { + GridCacheVersionConflictContext res = new GridCacheVersionConflictContext<>(ctx, oldEntry, newEntry); + + if (plc == ResolvePolicy.USE_NEW) + res.useNew(); + else if (plc == ResolvePolicy.USE_OLD) + res.useOld(); + else + res.merge( + newEntry.value(ctx), + Math.max(oldEntry.ttl(), newEntry.ttl()), + Math.max(oldEntry.expireTime(), newEntry.expireTime()) + ); + + return res; + } + } + + /** */ + private static class DynamicResolutionManager extends GridCacheManagerAdapter + implements CacheConflictResolutionManager { + /** {@inheritDoc} */ + @Override public CacheVersionConflictResolver conflictResolver() { + return new TestCacheVersionConflictResolver(); + } + } + + /** Policy for conflict resolver. */ + private enum ResolvePolicy { + /** Use old. */ + USE_OLD, + + /** Use new. */ + USE_NEW, + + /** Merge. */ + MERGE + } +} diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite13.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite13.java index 58ace1eff2924..b9d1e4fd6be0e 100644 --- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite13.java +++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite13.java @@ -21,6 +21,7 @@ import java.util.Collection; import java.util.List; import org.apache.ignite.internal.metric.CacheMetricsAddRemoveTest; +import org.apache.ignite.internal.metric.CacheMetricsConflictResolverTest; import org.apache.ignite.internal.metric.CustomMetricsTest; import org.apache.ignite.internal.metric.IoStatisticsCachePersistenceSelfTest; import org.apache.ignite.internal.metric.IoStatisticsCacheSelfTest; @@ -89,6 +90,7 @@ public static List> suite(Collection ignoredTests) { GridTestUtils.addTestIfNeeded(suite, SystemViewComputeJobTest.class, ignoredTests); GridTestUtils.addTestIfNeeded(suite, SystemViewCacheExpiryPolicyTest.class, ignoredTests); GridTestUtils.addTestIfNeeded(suite, CacheMetricsAddRemoveTest.class, ignoredTests); + GridTestUtils.addTestIfNeeded(suite, CacheMetricsConflictResolverTest.class, ignoredTests); GridTestUtils.addTestIfNeeded(suite, JmxExporterSpiTest.class, ignoredTests); GridTestUtils.addTestIfNeeded(suite, LogExporterSpiTest.class, ignoredTests); GridTestUtils.addTestIfNeeded(suite, ReadMetricsOnNodeStartupTest.class, ignoredTests);