Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

IGNITE-20861 P2P prevent to load already deployed class from different node on SHARED mode #11041

Merged
merged 16 commits into from
Jul 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ public void setBinaryContext(BinaryContext ctx, IgniteConfiguration cfg) {
}

/** {@inheritDoc} */
@Override protected <T> T unmarshal0(byte[] bytes, @Nullable ClassLoader clsLdr) throws IgniteCheckedException {
@Override protected <T> T unmarshal0(byte[] bytes, @Nullable ClassLoader clsLdr) {
return impl.deserialize(bytes, clsLdr);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -329,6 +329,26 @@ else if (log.isDebugEnabled())
if (isDeadClassLoader(meta))
return null;

boolean skipSearchDeployment = false;

// Check already exist deployment.
if (meta.deploymentMode() == SHARED) {
Collection<GridDeployment> created = getDeployments();

for (GridDeployment dep0 : created) {
// hot redeploy from same node
if (dep0.participants().containsKey(meta.senderNodeId()) || dep0.undeployed())
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why are we skipping participan deployment?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

because the case of this part is to find participant by nodeId and register if such is not found

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

check code a bit below, if participant contains - skip, if not - register

continue;

IgniteBiTuple<Class<?>, Throwable> cls = dep0.deployedClass(meta.className(), meta.alias());

if (cls.getKey() != null && cls.getValue() == null) {
((SharedDeployment)dep0).addParticipant(meta.senderNodeId(), meta.classLoaderId());
skipSearchDeployment = true;
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why don't bracke the loop when you find the deployment?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also, I think we can assigned it into dep variable and brake the root while loop.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

because in common - getDeployments() returns the collection an we need to update all participants

}
}

if (!F.isEmpty(meta.participants())) {
Map<UUID, IgniteUuid> participants = new LinkedHashMap<>();

Expand Down Expand Up @@ -376,7 +396,8 @@ else if (ctx.discovery().node(meta.senderNodeId()) == null) {
return null;
}

dep = (SharedDeployment)searchDeploymentCache(meta);
if (!skipSearchDeployment)
dep = (SharedDeployment)searchDeploymentCache(meta);

if (dep == null) {
List<SharedDeployment> deps = cache.get(meta.userVersion());
Expand Down Expand Up @@ -1243,8 +1264,6 @@ void onRemoved() {

/** {@inheritDoc} */
@Override public void onDeployed(Class<?> cls) {
assert !Thread.holdsLock(mux);

boolean isTask = isTask(cls);

String msg = (isTask ? "Task" : "Class") + " was deployed in SHARED or CONTINUOUS mode: " + cls;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,14 @@ public static <T> T rethrowDisarmedP2PClassLoadingFailure(NoClassDefFoundError e
throw error;
}

/** Wraps specific exception.
*
* @param e Exception to be wrapped.
*/
public static P2PClassNotFoundException wrapWithP2PFailure(NoClassDefFoundError e) {
return new P2PClassNotFoundException("P2P class loading failed", e);
}

/**
* Returns @{code true} if the given Throwable is an error caused by a P2P class-loading failure.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import java.util.Map;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.managers.deployment.P2PClassLoadingIssues;
import org.apache.ignite.internal.processors.cache.CacheObject;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.IgniteCacheProxy;
Expand All @@ -33,6 +32,8 @@
import org.apache.ignite.stream.StreamReceiver;
import org.jetbrains.annotations.Nullable;

import static org.apache.ignite.internal.managers.deployment.P2PClassLoadingIssues.wrapWithP2PFailure;

/**
* Job to put entries to cache on affinity node.
*/
Expand Down Expand Up @@ -146,7 +147,7 @@ class DataStreamerUpdateJob implements GridPlainCallable<Object> {
return null;
}
catch (NoClassDefFoundError e) {
return P2PClassLoadingIssues.rethrowDisarmedP2PClassLoadingFailure(e);
throw wrapWithP2PFailure(e);
}
finally {
if (ignoreDepOwnership)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ class JdkMarshallerObjectInputStream extends ObjectInputStream {
}

/** {@inheritDoc} */
@Override protected Class<?> resolveClass(ObjectStreamClass desc) throws IOException, ClassNotFoundException {
@Override protected Class<?> resolveClass(ObjectStreamClass desc) throws ClassNotFoundException {
// NOTE: DO NOT CHANGE TO 'clsLoader.loadClass()'
// Must have 'Class.forName()' instead of clsLoader.loadClass()
// due to weird ClassNotFoundExceptions for arrays of classes
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,10 @@

package org.apache.ignite.internal.processors.cache;

import java.util.HashMap;
import java.util.Map;
import java.util.TreeSet;
import java.util.concurrent.TimeUnit;
import javax.cache.processor.EntryProcessorResult;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.cache.CacheAtomicityMode;
Expand All @@ -27,12 +29,16 @@
import org.apache.ignite.configuration.DeploymentMode;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.configuration.NearCacheConfiguration;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.testframework.GridTestUtils;
import org.apache.ignite.testframework.config.GridTestProperties;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.junit.Test;

import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
import static org.apache.ignite.cache.CacheMode.PARTITIONED;
import static org.apache.ignite.cache.CacheMode.REPLICATED;
import static org.apache.ignite.cache.CacheRebalanceMode.SYNC;
import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;

Expand Down Expand Up @@ -116,6 +122,7 @@ public void testInvokeDeployment2() throws Exception {
depMode = DeploymentMode.SHARED;

doTestInvoke();
doTestInvokeEx();
}

/**
Expand Down Expand Up @@ -168,6 +175,96 @@ private void doTestInvoke() throws Exception {
}
}

/**
* Scenario: 2 different client nodes invoke entry processors on intersected collection of keys.
* @throws Exception
*/
private void doTestInvokeEx() throws Exception {
String testCacheName = "dynamic_params";

String prcClsName = "org.apache.ignite.tests.p2p.CacheDeploymentEntryProcessorMultipleEnts";

String contClsName = "org.apache.ignite.tests.p2p.cache.Container";

try {
startGrid(0);
IgniteEx cli1 = startClientGrid(1);
IgniteEx cli2 = startClientGrid(2);

Class procCls1 = cli1.configuration().getClassLoader().loadClass(prcClsName);
Class procCls2 = cli2.configuration().getClassLoader().loadClass(prcClsName);

Class contCls1 = cli1.configuration().getClassLoader().loadClass(contClsName);
Class contCls2 = cli2.configuration().getClassLoader().loadClass(contClsName);

// just one more additional class unavailability check.
try {
Class.forName(TEST_VALUE);
fail();
}
catch (ClassNotFoundException e) {
// No op.
}

Class<?> cacheValClazz = grid(2).configuration().getClassLoader().loadClass(TEST_VALUE);
Object cacheVal = cacheValClazz.newInstance();

CacheConfiguration<Long, Object> ccfg = new CacheConfiguration<>();
ccfg.setCacheMode(REPLICATED);
ccfg.setAtomicityMode(ATOMIC);
ccfg.setName(testCacheName);

IgniteCache<Long, Object> processedCache = cli1.createCache(ccfg);

Map<Long, Object> map = new HashMap<>();
for (long i = 0; i < 100; i++) {
map.put(i, cacheVal);
}

processedCache.putAll(map);

IgniteCache<Object, Object> cache1 = cli1.cache(testCacheName);
IgniteCache<Object, Object> cache2 = cli2.cache(testCacheName);

Object cont1 = contCls1.getDeclaredConstructor(Object.class).newInstance(map);
Object cont2 = contCls2.getDeclaredConstructor(Object.class).newInstance(map);

for (int i = 0; i < 10; ++i) {
IgniteCache<Object, Object> procCache1 = cache1;
IgniteInternalFuture<Object> f1 = GridTestUtils.runAsync(() -> {
for (long key = 0; key < 10; key++) {
procCache1.invoke(key,
(CacheEntryProcessor)procCls1.getDeclaredConstructor(Object.class).newInstance(cont1));
}
});

IgniteCache<Object, Object> procCache2 = cache2;
IgniteInternalFuture<Object> f2 = GridTestUtils.runAsync(() -> {
for (long key = 10; key > 0; key--) {
procCache2.invoke(key,
(CacheEntryProcessor)procCls2.getDeclaredConstructor(Object.class).newInstance(cont2));
};
});

long duration = TimeUnit.SECONDS.toMillis(30);

f1.get(duration);
f2.get(duration);

stopAllClients(true);

cli1 = startClientGrid(1);
cli2 = startClientGrid(2);

cache1 = cli1.cache(testCacheName);
cache2 = cli2.cache(testCacheName);
}
}
finally {
stopAllGrids();
}
}

/**
* @throws Exception In case of error.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import java.net.MalformedURLException;
import java.net.URL;
import java.net.URLClassLoader;
import java.util.UUID;
import org.apache.ignite.Ignite;
import org.apache.ignite.configuration.DeploymentMode;
import org.apache.ignite.configuration.IgniteConfiguration;
Expand Down Expand Up @@ -93,16 +94,18 @@ private void processTest() throws Exception {
Class task1 = CLASS_LOADER.loadClass(TEST_TASK1_NAME);
Class task2 = CLASS_LOADER.loadClass(TEST_TASK2_NAME);

UUID id = ignite2.cluster().localNode().id();

// Execute task1 and task2 from node1 on node2 and make sure that they reuse same class loader on node2.
Integer res1 = (Integer)ignite1.compute().execute(task1, ignite2.cluster().localNode().id());
Integer res2 = (Integer)ignite1.compute().execute(task2, ignite2.cluster().localNode().id());
Integer res1 = (Integer)ignite1.compute().execute(task1, id);
Integer res2 = (Integer)ignite1.compute().execute(task2, id);

assert res1.equals(res2); // Class loaders are same

Integer res3 = (Integer)ignite3.compute().execute(task1, ignite2.cluster().localNode().id());
Integer res4 = (Integer)ignite3.compute().execute(task2, ignite2.cluster().localNode().id());
Integer res3 = (Integer)ignite3.compute().execute(task1, id);
Integer res4 = (Integer)ignite3.compute().execute(task2, id);

assert res3.equals(res4);
assertEquals(res3, res4);
}
finally {
stopGrid(1);
Expand Down Expand Up @@ -147,18 +150,6 @@ public void testContinuousMode() throws Exception {
processTest();
}

/**
* Test GridDeploymentMode.SHARED mode.
*
* @throws Exception if error occur.
*/
@Test
public void testSharedMode() throws Exception {
depMode = DeploymentMode.SHARED;

processTest();
}

/**
* Return true if and only if all elements of array are different.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ public void testLambdaDeploymentFromSecondAndThird() throws Exception {
new URL(GridTestProperties.getProperty("p2p.uri.cls.second"))}), ignite3, 10_000);

for (Object o: res)
assertEquals(o, 43);
assertEquals(43, o);
}
finally {
stopAllGrids();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.ignite.tests.p2p;

import java.util.Map;
import javax.cache.processor.EntryProcessorException;
import javax.cache.processor.MutableEntry;
import org.apache.ignite.cache.CacheEntryProcessor;
import org.apache.ignite.tests.p2p.cache.Container;

/**
* Entry processor for p2p tests.
*/
public class CacheDeploymentEntryProcessorMultipleEnts implements CacheEntryProcessor<String, CacheDeploymentTestValue, Boolean> {
/** */
private Map<Long, CacheDeploymentTestValue> entToProcess;

/** */
public CacheDeploymentEntryProcessorMultipleEnts(Object container) {
entToProcess = (Map<Long, CacheDeploymentTestValue>)((Container)container).field;
}

/** {@inheritDoc} */
@Override public Boolean process(MutableEntry<String, CacheDeploymentTestValue> entry,
Object... arguments) throws EntryProcessorException {
boolean pr = false;

for (CacheDeploymentTestValue ent : entToProcess.values()) {
CacheDeploymentTestValue key = ent;
pr = key != null;
}
CacheDeploymentTestValue val = entry.getValue();

return pr;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
/** */
public class Container {
/** */
private Object field;
public Object field;

/** */
public Container(Object field) {
Expand Down
Loading