diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java index b53302be89232..b382d06f6fb43 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java @@ -1969,8 +1969,9 @@ protected boolean put0(final K key, final V val, final CacheEntryPredicate filte return; final boolean statsEnabled = ctx.statisticsEnabled(); + final boolean perfStatsEnabled = ctx.kernalContext().performanceStatistics().enabled(); - long start = statsEnabled ? System.nanoTime() : 0L; + final long start = (statsEnabled || perfStatsEnabled) ? System.nanoTime() : 0L; ctx.dr().onReceiveCacheEntriesReceived(drMap.size()); @@ -1986,6 +1987,9 @@ protected boolean put0(final K key, final V val, final CacheEntryPredicate filte if (statsEnabled) metrics0().addPutAllConflictTimeNanos(System.nanoTime() - start); + + if (perfStatsEnabled) + writeStatistics(OperationType.CACHE_PUT_ALL_CONFLICT, start); } /** {@inheritDoc} */ @@ -1995,8 +1999,9 @@ protected boolean put0(final K key, final V val, final CacheEntryPredicate filte return new GridFinishedFuture(); final boolean statsEnabled = ctx.statisticsEnabled(); + final boolean perfStatsEnabled = ctx.kernalContext().performanceStatistics().enabled(); - long start = statsEnabled ? System.nanoTime() : 0L; + final long start = (statsEnabled || perfStatsEnabled) ? System.nanoTime() : 0L; ctx.dr().onReceiveCacheEntriesReceived(drMap.size()); @@ -2013,6 +2018,9 @@ protected boolean put0(final K key, final V val, final CacheEntryPredicate filte if (statsEnabled) fut.listen(new UpdatePutAllConflictTimeStatClosure<>(metrics0(), start)); + if (perfStatsEnabled) + fut.listen(() -> writeStatistics(OperationType.CACHE_PUT_ALL_CONFLICT, start)); + return fut; } @@ -2838,9 +2846,10 @@ protected IgniteInternalFuture removeAsync0(final K key, @Nullable fina if (F.isEmpty(drMap)) return; - boolean statsEnabled = ctx.statisticsEnabled(); + final boolean statsEnabled = ctx.statisticsEnabled(); + final boolean perfStatsEnabled = ctx.kernalContext().performanceStatistics().enabled(); - long start = statsEnabled ? System.nanoTime() : 0L; + final long start = (statsEnabled || perfStatsEnabled) ? System.nanoTime() : 0L; ctx.dr().onReceiveCacheEntriesReceived(drMap.size()); @@ -2856,6 +2865,9 @@ protected IgniteInternalFuture removeAsync0(final K key, @Nullable fina if (statsEnabled) metrics0().addRemoveAllConflictTimeNanos(System.nanoTime() - start); + + if (perfStatsEnabled) + writeStatistics(OperationType.CACHE_REMOVE_ALL_CONFLICT, start); } /** {@inheritDoc} */ @@ -2865,8 +2877,9 @@ protected IgniteInternalFuture removeAsync0(final K key, @Nullable fina return new GridFinishedFuture(); final boolean statsEnabled = ctx.statisticsEnabled(); + final boolean perfStatsEnabled = ctx.kernalContext().performanceStatistics().enabled(); - final long start = statsEnabled ? System.nanoTime() : 0L; + final long start = (statsEnabled || perfStatsEnabled) ? System.nanoTime() : 0L; ctx.dr().onReceiveCacheEntriesReceived(drMap.size()); @@ -2883,6 +2896,9 @@ protected IgniteInternalFuture removeAsync0(final K key, @Nullable fina if (statsEnabled) fut.listen(new UpdateRemoveAllConflictTimeStatClosure<>(metrics0(), start)); + if (perfStatsEnabled) + fut.listen(() -> writeStatistics(OperationType.CACHE_REMOVE_ALL_CONFLICT, start)); + return fut; } @@ -6165,7 +6181,7 @@ public InvokeAllTimeStatClosure(CacheMetricsImpl metrics, final long start) { * @param op Operation type. * @param start Start time in nanoseconds. */ - private void writeStatistics(OperationType op, long start) { + protected void writeStatistics(OperationType op, long start) { ctx.kernalContext().performanceStatistics().cacheOperation( op, ctx.cacheId(), diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java index 984b688956d6e..5bd4486b3b531 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java @@ -96,6 +96,7 @@ import org.apache.ignite.internal.processors.cache.version.GridCacheVersionConflictContext; import org.apache.ignite.internal.processors.cache.version.GridCacheVersionEx; import org.apache.ignite.internal.processors.cacheobject.IgniteCacheObjectProcessor; +import org.apache.ignite.internal.processors.performancestatistics.OperationType; import org.apache.ignite.internal.processors.timeout.GridTimeoutObject; import org.apache.ignite.internal.util.GridLongList; import org.apache.ignite.internal.util.future.GridFinishedFuture; @@ -696,8 +697,9 @@ private IgniteInternalFuture> getAllAsyncInternal( /** {@inheritDoc} */ @Override public IgniteInternalFuture putAllConflictAsync(Map conflictMap) { final boolean statsEnabled = ctx.statisticsEnabled(); + final boolean perfStatsEnabled = ctx.kernalContext().performanceStatistics().enabled(); - long start = statsEnabled ? System.nanoTime() : 0L; + final long start = (statsEnabled || perfStatsEnabled) ? System.nanoTime() : 0L; ctx.dr().onReceiveCacheEntriesReceived(conflictMap.size()); @@ -716,6 +718,9 @@ private IgniteInternalFuture> getAllAsyncInternal( if (statsEnabled) fut.listen(new UpdatePutAllConflictTimeStatClosure<>(metrics0(), start)); + if (perfStatsEnabled) + fut.listen(() -> writeStatistics(OperationType.CACHE_PUT_ALL_CONFLICT, start)); + return fut; } @@ -760,8 +765,9 @@ private IgniteInternalFuture> getAllAsyncInternal( /** {@inheritDoc} */ @Override public IgniteInternalFuture removeAllConflictAsync(Map conflictMap) { final boolean statsEnabled = ctx.statisticsEnabled(); + final boolean perfStatsEnabled = ctx.kernalContext().performanceStatistics().enabled(); - final long start = statsEnabled ? System.nanoTime() : 0L; + final long start = (statsEnabled || perfStatsEnabled) ? System.nanoTime() : 0L; ctx.dr().onReceiveCacheEntriesReceived(conflictMap.size()); @@ -770,6 +776,9 @@ private IgniteInternalFuture> getAllAsyncInternal( if (statsEnabled) fut.listen(new UpdateRemoveAllConflictTimeStatClosure<>(metrics0(), start)); + if (perfStatsEnabled) + fut.listen(() -> writeStatistics(OperationType.CACHE_REMOVE_ALL_CONFLICT, start)); + return fut; } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/performancestatistics/OperationType.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/performancestatistics/OperationType.java index df18195b6538d..3337f804a6165 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/performancestatistics/OperationType.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/performancestatistics/OperationType.java @@ -91,12 +91,18 @@ public enum OperationType { QUERY_ROWS(20), /** Custom query property. */ - QUERY_PROPERTY(21); + QUERY_PROPERTY(21), + + /** Cache put all conflict. */ + CACHE_PUT_ALL_CONFLICT(22), + + /** Cache remove all conflict. */ + CACHE_REMOVE_ALL_CONFLICT(23); /** Cache operations. */ public static final EnumSet CACHE_OPS = EnumSet.of(CACHE_GET, CACHE_PUT, CACHE_REMOVE, CACHE_GET_AND_PUT, CACHE_GET_AND_REMOVE, CACHE_INVOKE, CACHE_LOCK, CACHE_GET_ALL, CACHE_PUT_ALL, - CACHE_REMOVE_ALL, CACHE_INVOKE_ALL); + CACHE_REMOVE_ALL, CACHE_INVOKE_ALL, CACHE_PUT_ALL_CONFLICT, CACHE_REMOVE_ALL_CONFLICT); /** Transaction operations. */ public static final EnumSet TX_OPS = EnumSet.of(TX_COMMIT, TX_ROLLBACK); diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/performancestatistics/PerformanceStatisticsThinClientTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/performancestatistics/PerformanceStatisticsThinClientTest.java index 769c1776a227b..7e388ec32a42a 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/performancestatistics/PerformanceStatisticsThinClientTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/performancestatistics/PerformanceStatisticsThinClientTest.java @@ -19,12 +19,16 @@ import java.util.Collection; import java.util.Collections; +import java.util.EnumSet; import java.util.HashMap; import java.util.UUID; +import java.util.concurrent.ExecutionException; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Consumer; import org.apache.ignite.Ignition; +import org.apache.ignite.cache.CacheAtomicityMode; import org.apache.ignite.client.ClientCache; +import org.apache.ignite.client.ClientCacheConfiguration; import org.apache.ignite.client.ClientTransaction; import org.apache.ignite.client.Config; import org.apache.ignite.client.IgniteClient; @@ -33,26 +37,37 @@ import org.apache.ignite.configuration.IgniteConfiguration; import org.apache.ignite.configuration.ThinClientConfiguration; import org.apache.ignite.internal.IgniteEx; +import org.apache.ignite.internal.client.thin.TcpClientCache; import org.apache.ignite.internal.client.thin.TestTask; +import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; import org.apache.ignite.internal.util.GridIntList; import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.internal.util.typedef.T3; import org.apache.ignite.internal.util.typedef.internal.CU; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteUuid; import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC; +import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL; import static org.apache.ignite.internal.processors.performancestatistics.OperationType.CACHE_GET; import static org.apache.ignite.internal.processors.performancestatistics.OperationType.CACHE_GET_ALL; import static org.apache.ignite.internal.processors.performancestatistics.OperationType.CACHE_GET_AND_PUT; import static org.apache.ignite.internal.processors.performancestatistics.OperationType.CACHE_GET_AND_REMOVE; import static org.apache.ignite.internal.processors.performancestatistics.OperationType.CACHE_PUT; import static org.apache.ignite.internal.processors.performancestatistics.OperationType.CACHE_PUT_ALL; +import static org.apache.ignite.internal.processors.performancestatistics.OperationType.CACHE_PUT_ALL_CONFLICT; import static org.apache.ignite.internal.processors.performancestatistics.OperationType.CACHE_REMOVE; import static org.apache.ignite.internal.processors.performancestatistics.OperationType.CACHE_REMOVE_ALL; +import static org.apache.ignite.internal.processors.performancestatistics.OperationType.CACHE_REMOVE_ALL_CONFLICT; +import static org.junit.Assume.assumeTrue; /** * Tests thin client performance statistics. */ +@RunWith(Parameterized.class) public class PerformanceStatisticsThinClientTest extends AbstractPerformanceStatisticsTest { /** Test task name. */ public static final String TEST_TASK_NAME = "TestTask"; @@ -66,12 +81,20 @@ public class PerformanceStatisticsThinClientTest extends AbstractPerformanceStat /** Thin client. */ private static IgniteClient thinClient; + /** */ + @Parameterized.Parameter + public CacheAtomicityMode atomicityMode; + + /** */ + @Parameterized.Parameters(name = "atomicity={0}") + public static Collection parameters() { + return EnumSet.of(ATOMIC, TRANSACTIONAL); + } + /** {@inheritDoc} */ @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception { IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName); - cfg.setCacheConfiguration(defaultCacheConfiguration()); - cfg.setClientConnectorConfiguration( new ClientConnectorConfiguration().setThinClientConfiguration( new ThinClientConfiguration().setMaxActiveComputeTasksPerConnection(ACTIVE_TASKS_LIMIT))); @@ -96,7 +119,19 @@ public class PerformanceStatisticsThinClientTest extends AbstractPerformanceStat @Override protected void afterTestsStopped() throws Exception { super.afterTestsStopped(); - thinClient.close(); + stopAllGrids(); + } + + /** {@inheritDoc} */ + @Override protected void beforeTest() throws Exception { + thinClient.getOrCreateCache(new ClientCacheConfiguration() + .setName(DEFAULT_CACHE_NAME) + .setAtomicityMode(atomicityMode)); + } + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + thinClient.cache(DEFAULT_CACHE_NAME).clear(); } /** @throws Exception If failed. */ @@ -171,6 +206,68 @@ public void testCacheOperation() throws Exception { checkCacheOperation(CACHE_GET_AND_REMOVE, cache -> cache.getAndRemove(5)); } + /** + * Cache {@link TcpClientCache#putAllConflict} operation performed. + * @throws Exception If failed. + */ + @Test + public void testCachePutAllConflict() throws Exception { + checkCacheAllConflictOperations(CACHE_PUT_ALL_CONFLICT, false); + } + + /** + * Cache {@link TcpClientCache#removeAllConflict} operation performed. + * @throws Exception If failed. + */ + @Test + public void testCacheRemoveAllConflict() throws Exception { + checkCacheAllConflictOperations(CACHE_REMOVE_ALL_CONFLICT, false); + } + + /** + * Cache {@link TcpClientCache#putAllConflictAsync} operation performed. + * @throws Exception If failed. + */ + @Test + public void testCachePutAllConflictAsync() throws Exception { + checkCacheAllConflictOperations(CACHE_PUT_ALL_CONFLICT, true); + } + + /** + * Cache {@link TcpClientCache#removeAllConflictAsync} operation performed. + * @throws Exception If failed. + */ + @Test + public void testCacheRemoveAllConflictAsync() throws Exception { + checkCacheAllConflictOperations(CACHE_REMOVE_ALL_CONFLICT, true); + } + + /** + * @param opType {@link OperationType} cache operation type. + * @param isAsync boolean flag for asynchronous cache operation processing. + */ + private void checkCacheAllConflictOperations(OperationType opType, boolean isAsync) throws Exception { + GridCacheVersion confl = new GridCacheVersion(1, 0, 1, (byte)2); + + checkCacheOperation(opType, cache -> { + TcpClientCache clientCache = (TcpClientCache)cache; + + try { + if (opType == CACHE_PUT_ALL_CONFLICT && !isAsync) + clientCache.putAllConflict(F.asMap(6, new T3<>(1, confl, CU.EXPIRE_TIME_ETERNAL))); + else if (opType == CACHE_REMOVE_ALL_CONFLICT && !isAsync) + clientCache.removeAllConflict(F.asMap(6, confl)); + else if (opType == CACHE_PUT_ALL_CONFLICT) + clientCache.putAllConflictAsync(F.asMap(7, new T3<>(2, confl, CU.EXPIRE_TIME_ETERNAL))).get(); + else if (opType == CACHE_REMOVE_ALL_CONFLICT) + clientCache.removeAllConflictAsync(F.asMap(7, confl)).get(); + } + catch (InterruptedException | ExecutionException e) { + throw new RuntimeException(e); + } + }); + } + /** Checks cache operation. */ private void checkCacheOperation(OperationType op, Consumer> clo) throws Exception { long startTime = U.currentTimeMillis(); @@ -202,6 +299,8 @@ private void checkCacheOperation(OperationType op, Consumer