Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

IGNITE-15126 Fixes cluster hanging if exception occurred during activation. #11694

Merged
merged 9 commits into from
Feb 6, 2025
Original file line number Diff line number Diff line change
Expand Up @@ -41,19 +41,7 @@ public abstract class BaselineEventsTest extends GridCommandHandlerFactoryAbstra

/** {@inheritDoc} */
@Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
return super.getConfiguration(igniteInstanceName)
.setConnectorConfiguration(new ConnectorConfiguration())
.setDataStorageConfiguration(
new DataStorageConfiguration()
.setDefaultDataRegionConfiguration(
new DataRegionConfiguration()
.setPersistenceEnabled(true)
)
.setWalSegments(3)
.setWalSegmentSize(512 * 1024)
)
.setConsistentId(igniteInstanceName)
.setIncludeEventTypes(includedEvtTypes);
return getConfiguration(igniteInstanceName, true);
}

/** {@inheritDoc} */
Expand All @@ -72,9 +60,51 @@ public abstract class BaselineEventsTest extends GridCommandHandlerFactoryAbstra
cleanPersistenceDir();
}

/**
 * Builds a node configuration on top of the parent test configuration, optionally adding a data storage
 * configuration whose default data region has persistence enabled.
 *
 * @param igniteInstanceName Ignite instance name; also set as the consistent ID.
 * @param isPersistenceEnabled Whether the persistent data storage configuration should be installed.
 * @return Node configuration.
 * @throws Exception If failed.
 */
private IgniteConfiguration getConfiguration(String igniteInstanceName, boolean isPersistenceEnabled) throws Exception {
    IgniteConfiguration nodeCfg = super.getConfiguration(igniteInstanceName)
        .setConnectorConfiguration(new ConnectorConfiguration())
        .setConsistentId(igniteInstanceName)
        .setIncludeEventTypes(includedEvtTypes);

    // Nothing else to configure when persistence is not requested.
    if (!isPersistenceEnabled)
        return nodeCfg;

    DataStorageConfiguration storageCfg = new DataStorageConfiguration()
        .setDefaultDataRegionConfiguration(new DataRegionConfiguration().setPersistenceEnabled(true))
        .setWalSegments(3)
        .setWalSegmentSize(512 * 1024);

    nodeCfg.setDataStorageConfiguration(storageCfg);

    return nodeCfg;
}

/**
 * Hook implemented by subclasses.
 * NOTE(review): presumably registers {@code lsnr} on {@code ignite} for the given event types; the tests in this
 * class call it before triggering the action under test. Confirm against the implementations.
 *
 * @param ignite Node passed to the implementation.
 * @param lsnr Event predicate passed to the implementation.
 * @param types Event type codes passed to the implementation.
 */
protected abstract void listen(IgniteEx ignite, IgnitePredicate<Event> lsnr, int... types);

/**
 * Checks that the listener registered on the first node for {@link EventType#EVT_BASELINE_CHANGED} is not
 * triggered when a third node joins, with every node started without the persistent data storage configuration.
 *
 * @throws Exception If failed.
 */
@Test
public void testInMemoryBaselineAutoAdjustNotProduceEvents() throws Exception {
    // Initial two-node cluster, persistence disabled.
    for (int nodeIdx = 0; nodeIdx < 2; nodeIdx++)
        startGrid(getConfiguration(getTestIgniteInstanceName(nodeIdx), false));

    AtomicBoolean baselineChangedSeen = new AtomicBoolean();

    IgniteEx firstNode = grid(0);

    IgnitePredicate<Event> flagRaiser = evt -> {
        baselineChangedSeen.set(true);

        return true;
    };

    listen(firstNode, flagRaiser, EventType.EVT_BASELINE_CHANGED);

    // Topology change that must not raise the baseline changed event.
    startGrid(getConfiguration(getTestIgniteInstanceName(2), false));

    boolean evtObserved = GridTestUtils.waitForCondition(baselineChangedSeen::get, 2000);

    assertFalse(evtObserved);
}

/** */
@Test
public void testChangeBltWithControlUtility() throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -240,7 +240,6 @@
import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_DATA_STORAGE_CONFIG;
import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_DATA_STREAMER_POOL_SIZE;
import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_DEPLOYMENT_MODE;
import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_DYNAMIC_CACHE_START_ROLLBACK_SUPPORTED;
import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_IGNITE_INSTANCE_NAME;
import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_IPS;
import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_JIT_NAME;
Expand Down Expand Up @@ -1718,10 +1717,6 @@ private void fillNodeAttributes(boolean notifyEnabled) throws IgniteCheckedExcep
if (cfg.getConnectorConfiguration() != null)
add(ATTR_REST_PORT_RANGE, cfg.getConnectorConfiguration().getPortRange());

// Whether rollback of dynamic cache start is supported or not.
// This property is added because of backward compatibility.
add(ATTR_DYNAMIC_CACHE_START_ROLLBACK_SUPPORTED, Boolean.TRUE);

// Save data storage configuration.
addDataStorageConfigurationAttributes();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -193,9 +193,6 @@ public final class IgniteNodeAttributes {
/** Rebalance thread pool size. */
public static final String ATTR_REBALANCE_POOL_SIZE = ATTR_PREFIX + ".rebalance.pool.size";

/** Internal attribute name constant. */
public static final String ATTR_DYNAMIC_CACHE_START_ROLLBACK_SUPPORTED = ATTR_PREFIX + ".dynamic.cache.start.rollback.supported";

/** Supported features. */
public static final String ATTR_IGNITE_FEATURES = ATTR_PREFIX + ".features";

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -559,7 +559,7 @@ public void onClientCacheChange(ClientCacheChangeDiscoveryMessage msg, ClusterNo
* @param failMsg Dynamic change request fail message.
* @param topVer Current topology version.
*/
public void onCacheChangeRequested(DynamicCacheChangeFailureMessage failMsg, AffinityTopologyVersion topVer) {
public void onCacheChangeRequested(ExchangeFailureMessage failMsg, AffinityTopologyVersion topVer) {
AffinityTopologyVersion actualTopVer = failMsg.exchangeId().topologyVersion();

ExchangeActions exchangeActions = new ExchangeActions();
Expand Down Expand Up @@ -603,7 +603,7 @@ public void onCacheChangeRequested(DynamicCacheChangeFailureMessage failMsg, Aff
processStopCacheRequest(exchangeActions, req, res, req.cacheName(), cacheDesc, actualTopVer, true);
}

failMsg.exchangeActions(exchangeActions);
failMsg.exchangeRollbackActions(exchangeActions);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
package org.apache.ignite.internal.processors.cache;

import java.util.Collection;
import java.util.Map;
import java.util.UUID;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.internal.managers.discovery.DiscoCache;
Expand All @@ -28,55 +30,55 @@
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteUuid;
import org.jetbrains.annotations.Nullable;

/**
* This class represents a discovery message that is used to provide information about an exchange failure,
* such as a dynamic cache start failure.
*/
public class DynamicCacheChangeFailureMessage implements DiscoveryCustomMessage {
public class ExchangeFailureMessage implements DiscoveryCustomMessage {
/** */
private static final long serialVersionUID = 0L;

/** Cache names. */
@GridToStringInclude
private Collection<String> cacheNames;
private final Collection<String> cacheNames;

/** Custom message ID. */
private IgniteUuid id;
private final IgniteUuid id;

/** */
private GridDhtPartitionExchangeId exchId;
private final GridDhtPartitionExchangeId exchId;

/** */
@GridToStringInclude
private IgniteCheckedException cause;
private final Map<UUID, Exception> exchangeErrors;

/** Cache updates to be executed on exchange. */
private transient ExchangeActions exchangeActions;
/** Actions to perform in order to roll back changes made before the exchange failure. */
private transient ExchangeActions exchangeRollbackActions;

/**
* Creates a new instance of the failure message.
*
* @param locNode Local node.
* @param exchId Exchange Id.
* @param cause Cache start error.
* @param cacheNames Cache names.
* @param exchangeErrors Errors that caused PME to fail.
*/
public DynamicCacheChangeFailureMessage(
public ExchangeFailureMessage(
ClusterNode locNode,
GridDhtPartitionExchangeId exchId,
IgniteCheckedException cause,
Map<UUID, Exception> exchangeErrors,
Collection<String> cacheNames
) {
assert exchId != null;
assert cause != null;
assert !F.isEmpty(exchangeErrors);
assert !F.isEmpty(cacheNames) : cacheNames;

this.id = IgniteUuid.fromUuid(locNode.id());
this.exchId = exchId;
this.cause = cause;
this.cacheNames = cacheNames;
this.exchangeErrors = exchangeErrors;
}

/** {@inheritDoc} */
Expand All @@ -91,27 +93,40 @@ public Collection<String> cacheNames() {
return cacheNames;
}

/**
 * @return Errors that caused PME to fail, as passed to the constructor.
 */
public Map<UUID, Exception> exchangeErrors() {
    return this.exchangeErrors;
}

/**
* @return Cache start error.
* @return Cache updates to be executed on exchange.
*/
public IgniteCheckedException error() {
return cause;
public ExchangeActions exchangeRollbackActions() {
return exchangeRollbackActions;
}

/**
* @return Cache updates to be executed on exchange.
* @param exchangeRollbackActions Cache updates to be executed on exchange.
*/
public ExchangeActions exchangeActions() {
return exchangeActions;
public void exchangeRollbackActions(ExchangeActions exchangeRollbackActions) {
assert exchangeRollbackActions != null && !exchangeRollbackActions.empty() : exchangeRollbackActions;
NSAmelchev marked this conversation as resolved.
Show resolved Hide resolved

this.exchangeRollbackActions = exchangeRollbackActions;
}

/**
* @param exchangeActions Cache updates to be executed on exchange.
* Creates an IgniteCheckedException that is used as root cause of the exchange initialization failure. This method
* aggregates all the exceptions provided from all participating nodes.
*
* @return Exception that represents a cause of the exchange initialization failure.
*/
public void exchangeActions(ExchangeActions exchangeActions) {
assert exchangeActions != null && !exchangeActions.empty() : exchangeActions;
public IgniteCheckedException createFailureCompoundException() {
IgniteCheckedException ex = new IgniteCheckedException("Failed to complete exchange process.");

for (Map.Entry<UUID, Exception> entry : exchangeErrors.entrySet())
U.addSuppressed(ex, entry.getValue());

this.exchangeActions = exchangeActions;
return ex;
}

/**
Expand Down Expand Up @@ -141,6 +156,6 @@ public void exchangeActions(ExchangeActions exchangeActions) {

/** {@inheritDoc} */
@Override public String toString() {
return S.toString(DynamicCacheChangeFailureMessage.class, this);
return S.toString(ExchangeFailureMessage.class, this);
}
}
Loading