diff --git a/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryMarshaller.java b/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryMarshaller.java index 7db76c1ddabd9..21251db8572fd 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryMarshaller.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryMarshaller.java @@ -97,7 +97,7 @@ public void setBinaryContext(BinaryContext ctx, IgniteConfiguration cfg) { } /** {@inheritDoc} */ - @Override protected T unmarshal0(byte[] bytes, @Nullable ClassLoader clsLdr) throws IgniteCheckedException { + @Override protected T unmarshal0(byte[] bytes, @Nullable ClassLoader clsLdr) { return impl.deserialize(bytes, clsLdr); } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/deployment/GridDeploymentPerVersionStore.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/deployment/GridDeploymentPerVersionStore.java index 5c2e84112bc4f..7c6c3246fd5ac 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/deployment/GridDeploymentPerVersionStore.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/deployment/GridDeploymentPerVersionStore.java @@ -329,6 +329,26 @@ else if (log.isDebugEnabled()) if (isDeadClassLoader(meta)) return null; + boolean skipSearchDeployment = false; + + // Check already exist deployment. + if (meta.deploymentMode() == SHARED) { + Collection created = getDeployments(); + + for (GridDeployment dep0 : created) { + // hot redeploy from same node + if (dep0.participants().containsKey(meta.senderNodeId()) || dep0.undeployed()) + continue; + + IgniteBiTuple, Throwable> cls = dep0.deployedClass(meta.className(), meta.alias()); + + if (cls.getKey() != null && cls.getValue() == null) { + ((SharedDeployment)dep0).addParticipant(meta.senderNodeId(), meta.classLoaderId()); + skipSearchDeployment = true; + } + } + } + if (!F.isEmpty(meta.participants())) { Map participants = new LinkedHashMap<>(); @@ -376,7 +396,8 @@ else if (ctx.discovery().node(meta.senderNodeId()) == null) { return null; } - dep = (SharedDeployment)searchDeploymentCache(meta); + if (!skipSearchDeployment) + dep = (SharedDeployment)searchDeploymentCache(meta); if (dep == null) { List deps = cache.get(meta.userVersion()); @@ -1243,8 +1264,6 @@ void onRemoved() { /** {@inheritDoc} */ @Override public void onDeployed(Class cls) { - assert !Thread.holdsLock(mux); - boolean isTask = isTask(cls); String msg = (isTask ? "Task" : "Class") + " was deployed in SHARED or CONTINUOUS mode: " + cls; diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/deployment/P2PClassLoadingIssues.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/deployment/P2PClassLoadingIssues.java index efdeb0ae2c1d3..ec9cc81d9cba2 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/deployment/P2PClassLoadingIssues.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/deployment/P2PClassLoadingIssues.java @@ -38,6 +38,14 @@ public static T rethrowDisarmedP2PClassLoadingFailure(NoClassDefFoundError e throw error; } + /** Wraps specific exception. + * + * @param e Exception to be wrapped. + */ + public static P2PClassNotFoundException wrapWithP2PFailure(NoClassDefFoundError e) { + return new P2PClassNotFoundException("P2P class loading failed", e); + } + /** * Returns @{code true} if the given Throwable is an error caused by a P2P class-loading failure. * diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerUpdateJob.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerUpdateJob.java index ffe5b944d6f8e..b4a9bd0133b8f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerUpdateJob.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerUpdateJob.java @@ -21,7 +21,6 @@ import java.util.Map; import org.apache.ignite.IgniteLogger; import org.apache.ignite.internal.GridKernalContext; -import org.apache.ignite.internal.managers.deployment.P2PClassLoadingIssues; import org.apache.ignite.internal.processors.cache.CacheObject; import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.IgniteCacheProxy; @@ -33,6 +32,8 @@ import org.apache.ignite.stream.StreamReceiver; import org.jetbrains.annotations.Nullable; +import static org.apache.ignite.internal.managers.deployment.P2PClassLoadingIssues.wrapWithP2PFailure; + /** * Job to put entries to cache on affinity node. */ @@ -146,7 +147,7 @@ class DataStreamerUpdateJob implements GridPlainCallable { return null; } catch (NoClassDefFoundError e) { - return P2PClassLoadingIssues.rethrowDisarmedP2PClassLoadingFailure(e); + throw wrapWithP2PFailure(e); } finally { if (ignoreDepOwnership) diff --git a/modules/core/src/main/java/org/apache/ignite/marshaller/jdk/JdkMarshallerObjectInputStream.java b/modules/core/src/main/java/org/apache/ignite/marshaller/jdk/JdkMarshallerObjectInputStream.java index d9fdd3d2f1c3d..7e7c8f107e2c9 100644 --- a/modules/core/src/main/java/org/apache/ignite/marshaller/jdk/JdkMarshallerObjectInputStream.java +++ b/modules/core/src/main/java/org/apache/ignite/marshaller/jdk/JdkMarshallerObjectInputStream.java @@ -51,7 +51,7 @@ class JdkMarshallerObjectInputStream extends ObjectInputStream { } /** {@inheritDoc} */ - @Override protected Class resolveClass(ObjectStreamClass desc) throws IOException, ClassNotFoundException { + @Override protected Class resolveClass(ObjectStreamClass desc) throws ClassNotFoundException { // NOTE: DO NOT CHANGE TO 'clsLoader.loadClass()' // Must have 'Class.forName()' instead of clsLoader.loadClass() // due to weird ClassNotFoundExceptions for arrays of classes diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAtomicEntryProcessorDeploymentSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAtomicEntryProcessorDeploymentSelfTest.java index f1128573c3e74..2fe83283a4567 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAtomicEntryProcessorDeploymentSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAtomicEntryProcessorDeploymentSelfTest.java @@ -17,8 +17,10 @@ package org.apache.ignite.internal.processors.cache; +import java.util.HashMap; import java.util.Map; import java.util.TreeSet; +import java.util.concurrent.TimeUnit; import javax.cache.processor.EntryProcessorResult; import org.apache.ignite.IgniteCache; import org.apache.ignite.cache.CacheAtomicityMode; @@ -27,12 +29,16 @@ import org.apache.ignite.configuration.DeploymentMode; import org.apache.ignite.configuration.IgniteConfiguration; import org.apache.ignite.configuration.NearCacheConfiguration; +import org.apache.ignite.internal.IgniteEx; +import org.apache.ignite.internal.IgniteInternalFuture; +import org.apache.ignite.testframework.GridTestUtils; import org.apache.ignite.testframework.config.GridTestProperties; import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; import org.junit.Test; import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC; import static org.apache.ignite.cache.CacheMode.PARTITIONED; +import static org.apache.ignite.cache.CacheMode.REPLICATED; import static org.apache.ignite.cache.CacheRebalanceMode.SYNC; import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC; @@ -116,6 +122,7 @@ public void testInvokeDeployment2() throws Exception { depMode = DeploymentMode.SHARED; doTestInvoke(); + doTestInvokeEx(); } /** @@ -168,6 +175,96 @@ private void doTestInvoke() throws Exception { } } + /** + * Scenario: 2 different client nodes invoke entry processors on intersected collection of keys. + * @throws Exception + */ + private void doTestInvokeEx() throws Exception { + String testCacheName = "dynamic_params"; + + String prcClsName = "org.apache.ignite.tests.p2p.CacheDeploymentEntryProcessorMultipleEnts"; + + String contClsName = "org.apache.ignite.tests.p2p.cache.Container"; + + try { + startGrid(0); + IgniteEx cli1 = startClientGrid(1); + IgniteEx cli2 = startClientGrid(2); + + Class procCls1 = cli1.configuration().getClassLoader().loadClass(prcClsName); + Class procCls2 = cli2.configuration().getClassLoader().loadClass(prcClsName); + + Class contCls1 = cli1.configuration().getClassLoader().loadClass(contClsName); + Class contCls2 = cli2.configuration().getClassLoader().loadClass(contClsName); + + // just one more additional class unavailability check. + try { + Class.forName(TEST_VALUE); + fail(); + } + catch (ClassNotFoundException e) { + // No op. + } + + Class cacheValClazz = grid(2).configuration().getClassLoader().loadClass(TEST_VALUE); + Object cacheVal = cacheValClazz.newInstance(); + + CacheConfiguration ccfg = new CacheConfiguration<>(); + ccfg.setCacheMode(REPLICATED); + ccfg.setAtomicityMode(ATOMIC); + ccfg.setName(testCacheName); + + IgniteCache processedCache = cli1.createCache(ccfg); + + Map map = new HashMap<>(); + for (long i = 0; i < 100; i++) { + map.put(i, cacheVal); + } + + processedCache.putAll(map); + + IgniteCache cache1 = cli1.cache(testCacheName); + IgniteCache cache2 = cli2.cache(testCacheName); + + Object cont1 = contCls1.getDeclaredConstructor(Object.class).newInstance(map); + Object cont2 = contCls2.getDeclaredConstructor(Object.class).newInstance(map); + + for (int i = 0; i < 10; ++i) { + IgniteCache procCache1 = cache1; + IgniteInternalFuture f1 = GridTestUtils.runAsync(() -> { + for (long key = 0; key < 10; key++) { + procCache1.invoke(key, + (CacheEntryProcessor)procCls1.getDeclaredConstructor(Object.class).newInstance(cont1)); + } + }); + + IgniteCache procCache2 = cache2; + IgniteInternalFuture f2 = GridTestUtils.runAsync(() -> { + for (long key = 10; key > 0; key--) { + procCache2.invoke(key, + (CacheEntryProcessor)procCls2.getDeclaredConstructor(Object.class).newInstance(cont2)); + }; + }); + + long duration = TimeUnit.SECONDS.toMillis(30); + + f1.get(duration); + f2.get(duration); + + stopAllClients(true); + + cli1 = startClientGrid(1); + cli2 = startClientGrid(2); + + cache1 = cli1.cache(testCacheName); + cache2 = cli2.cache(testCacheName); + } + } + finally { + stopAllGrids(); + } + } + /** * @throws Exception In case of error. */ diff --git a/modules/core/src/test/java/org/apache/ignite/p2p/GridP2PSameClassLoaderSelfTest.java b/modules/core/src/test/java/org/apache/ignite/p2p/GridP2PSameClassLoaderSelfTest.java index 504b4d6e2069d..5000fe323a056 100644 --- a/modules/core/src/test/java/org/apache/ignite/p2p/GridP2PSameClassLoaderSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/p2p/GridP2PSameClassLoaderSelfTest.java @@ -20,6 +20,7 @@ import java.net.MalformedURLException; import java.net.URL; import java.net.URLClassLoader; +import java.util.UUID; import org.apache.ignite.Ignite; import org.apache.ignite.configuration.DeploymentMode; import org.apache.ignite.configuration.IgniteConfiguration; @@ -93,16 +94,18 @@ private void processTest() throws Exception { Class task1 = CLASS_LOADER.loadClass(TEST_TASK1_NAME); Class task2 = CLASS_LOADER.loadClass(TEST_TASK2_NAME); + UUID id = ignite2.cluster().localNode().id(); + // Execute task1 and task2 from node1 on node2 and make sure that they reuse same class loader on node2. - Integer res1 = (Integer)ignite1.compute().execute(task1, ignite2.cluster().localNode().id()); - Integer res2 = (Integer)ignite1.compute().execute(task2, ignite2.cluster().localNode().id()); + Integer res1 = (Integer)ignite1.compute().execute(task1, id); + Integer res2 = (Integer)ignite1.compute().execute(task2, id); assert res1.equals(res2); // Class loaders are same - Integer res3 = (Integer)ignite3.compute().execute(task1, ignite2.cluster().localNode().id()); - Integer res4 = (Integer)ignite3.compute().execute(task2, ignite2.cluster().localNode().id()); + Integer res3 = (Integer)ignite3.compute().execute(task1, id); + Integer res4 = (Integer)ignite3.compute().execute(task2, id); - assert res3.equals(res4); + assertEquals(res3, res4); } finally { stopGrid(1); @@ -147,18 +150,6 @@ public void testContinuousMode() throws Exception { processTest(); } - /** - * Test GridDeploymentMode.SHARED mode. - * - * @throws Exception if error occur. - */ - @Test - public void testSharedMode() throws Exception { - depMode = DeploymentMode.SHARED; - - processTest(); - } - /** * Return true if and only if all elements of array are different. * diff --git a/modules/core/src/test/java/org/apache/ignite/p2p/SharedDeploymentTest.java b/modules/core/src/test/java/org/apache/ignite/p2p/SharedDeploymentTest.java index 0d4b6ccb90372..51966fa93a97f 100644 --- a/modules/core/src/test/java/org/apache/ignite/p2p/SharedDeploymentTest.java +++ b/modules/core/src/test/java/org/apache/ignite/p2p/SharedDeploymentTest.java @@ -130,7 +130,7 @@ public void testLambdaDeploymentFromSecondAndThird() throws Exception { new URL(GridTestProperties.getProperty("p2p.uri.cls.second"))}), ignite3, 10_000); for (Object o: res) - assertEquals(o, 43); + assertEquals(43, o); } finally { stopAllGrids(); diff --git a/modules/extdata/p2p/src/main/java/org/apache/ignite/tests/p2p/CacheDeploymentEntryProcessorMultipleEnts.java b/modules/extdata/p2p/src/main/java/org/apache/ignite/tests/p2p/CacheDeploymentEntryProcessorMultipleEnts.java new file mode 100644 index 0000000000000..bdb16113bb05f --- /dev/null +++ b/modules/extdata/p2p/src/main/java/org/apache/ignite/tests/p2p/CacheDeploymentEntryProcessorMultipleEnts.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.tests.p2p; + +import java.util.Map; +import javax.cache.processor.EntryProcessorException; +import javax.cache.processor.MutableEntry; +import org.apache.ignite.cache.CacheEntryProcessor; +import org.apache.ignite.tests.p2p.cache.Container; + +/** + * Entry processor for p2p tests. + */ +public class CacheDeploymentEntryProcessorMultipleEnts implements CacheEntryProcessor { + /** */ + private Map entToProcess; + + /** */ + public CacheDeploymentEntryProcessorMultipleEnts(Object container) { + entToProcess = (Map)((Container)container).field; + } + + /** {@inheritDoc} */ + @Override public Boolean process(MutableEntry entry, + Object... arguments) throws EntryProcessorException { + boolean pr = false; + + for (CacheDeploymentTestValue ent : entToProcess.values()) { + CacheDeploymentTestValue key = ent; + pr = key != null; + } + CacheDeploymentTestValue val = entry.getValue(); + + return pr; + } +} diff --git a/modules/extdata/p2p/src/main/java/org/apache/ignite/tests/p2p/cache/Container.java b/modules/extdata/p2p/src/main/java/org/apache/ignite/tests/p2p/cache/Container.java index fff36123f27f7..625890dcc6ae3 100644 --- a/modules/extdata/p2p/src/main/java/org/apache/ignite/tests/p2p/cache/Container.java +++ b/modules/extdata/p2p/src/main/java/org/apache/ignite/tests/p2p/cache/Container.java @@ -20,7 +20,7 @@ /** */ public class Container { /** */ - private Object field; + public Object field; /** */ public Container(Object field) {