diff --git a/modules/control-utility/src/test/java/org/apache/ignite/events/BaselineEventsTest.java b/modules/control-utility/src/test/java/org/apache/ignite/events/BaselineEventsTest.java index 4b9265fcce5f0..c1596d6785722 100644 --- a/modules/control-utility/src/test/java/org/apache/ignite/events/BaselineEventsTest.java +++ b/modules/control-utility/src/test/java/org/apache/ignite/events/BaselineEventsTest.java @@ -41,19 +41,7 @@ public abstract class BaselineEventsTest extends GridCommandHandlerFactoryAbstra /** {@inheritDoc} */ @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception { - return super.getConfiguration(igniteInstanceName) - .setConnectorConfiguration(new ConnectorConfiguration()) - .setDataStorageConfiguration( - new DataStorageConfiguration() - .setDefaultDataRegionConfiguration( - new DataRegionConfiguration() - .setPersistenceEnabled(true) - ) - .setWalSegments(3) - .setWalSegmentSize(512 * 1024) - ) - .setConsistentId(igniteInstanceName) - .setIncludeEventTypes(includedEvtTypes); + return getConfiguration(igniteInstanceName, true); } /** {@inheritDoc} */ @@ -72,9 +60,51 @@ public abstract class BaselineEventsTest extends GridCommandHandlerFactoryAbstra cleanPersistenceDir(); } + /** */ + private IgniteConfiguration getConfiguration(String igniteInstanceName, boolean isPersistenceEnabled) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName) + .setConnectorConfiguration(new ConnectorConfiguration()) + .setConsistentId(igniteInstanceName) + .setIncludeEventTypes(includedEvtTypes); + + if (isPersistenceEnabled) { + cfg.setDataStorageConfiguration(new DataStorageConfiguration() + .setDefaultDataRegionConfiguration(new DataRegionConfiguration() + .setPersistenceEnabled(true)) + .setWalSegments(3) + .setWalSegmentSize(512 * 1024) + ); + } + + return cfg; + } + /** */ protected abstract void listen(IgniteEx ignite, IgnitePredicate lsnr, int... types); + /** */ + @Test + public void testInMemoryBaselineAutoAdjustNotProduceEvents() throws Exception { + startGrid(getConfiguration(getTestIgniteInstanceName(0), false)); + startGrid(getConfiguration(getTestIgniteInstanceName(1), false)); + + AtomicBoolean isBaselineChangedEvtListened = new AtomicBoolean(); + + listen( + grid(0), + event -> { + isBaselineChangedEvtListened.set(true); + + return true; + }, + EventType.EVT_BASELINE_CHANGED + ); + + startGrid(getConfiguration(getTestIgniteInstanceName(2), false)); + + assertFalse(GridTestUtils.waitForCondition(isBaselineChangedEvtListened::get, 2000)); + } + /** */ @Test public void testChangeBltWithControlUtility() throws Exception { diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java index 0973010aee3e8..c22cce72b23f2 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java @@ -240,7 +240,6 @@ import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_DATA_STORAGE_CONFIG; import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_DATA_STREAMER_POOL_SIZE; import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_DEPLOYMENT_MODE; -import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_DYNAMIC_CACHE_START_ROLLBACK_SUPPORTED; import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_IGNITE_INSTANCE_NAME; import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_IPS; import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_JIT_NAME; @@ -1718,10 +1717,6 @@ private void fillNodeAttributes(boolean notifyEnabled) throws IgniteCheckedExcep if (cfg.getConnectorConfiguration() != null) add(ATTR_REST_PORT_RANGE, cfg.getConnectorConfiguration().getPortRange()); - // Whether rollback of dynamic cache start is supported or not. - // This property is added because of backward compatibility. - add(ATTR_DYNAMIC_CACHE_START_ROLLBACK_SUPPORTED, Boolean.TRUE); - // Save data storage configuration. addDataStorageConfigurationAttributes(); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteNodeAttributes.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteNodeAttributes.java index 0f938884604c8..7e844b35f482c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteNodeAttributes.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteNodeAttributes.java @@ -193,9 +193,6 @@ public final class IgniteNodeAttributes { /** Rebalance thread pool size. */ public static final String ATTR_REBALANCE_POOL_SIZE = ATTR_PREFIX + ".rebalance.pool.size"; - /** Internal attribute name constant. */ - public static final String ATTR_DYNAMIC_CACHE_START_ROLLBACK_SUPPORTED = ATTR_PREFIX + ".dynamic.cache.start.rollback.supported"; - /** Supported features. */ public static final String ATTR_IGNITE_FEATURES = ATTR_PREFIX + ".features"; diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java index a2c04e8e5cab8..2894875c7a834 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java @@ -559,7 +559,7 @@ public void onClientCacheChange(ClientCacheChangeDiscoveryMessage msg, ClusterNo * @param failMsg Dynamic change request fail message. * @param topVer Current topology version. */ - public void onCacheChangeRequested(DynamicCacheChangeFailureMessage failMsg, AffinityTopologyVersion topVer) { + public void onCacheChangeRequested(ExchangeFailureMessage failMsg, AffinityTopologyVersion topVer) { AffinityTopologyVersion actualTopVer = failMsg.exchangeId().topologyVersion(); ExchangeActions exchangeActions = new ExchangeActions(); @@ -603,7 +603,7 @@ public void onCacheChangeRequested(DynamicCacheChangeFailureMessage failMsg, Aff processStopCacheRequest(exchangeActions, req, res, req.cacheName(), cacheDesc, actualTopVer, true); } - failMsg.exchangeActions(exchangeActions); + failMsg.exchangeRollbackActions(exchangeActions); } /** diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeFailureMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeFailureMessage.java similarity index 64% rename from modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeFailureMessage.java rename to modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeFailureMessage.java index 937eb6d461484..b29505e5ece7c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeFailureMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeFailureMessage.java @@ -18,6 +18,8 @@ package org.apache.ignite.internal.processors.cache; import java.util.Collection; +import java.util.Map; +import java.util.UUID; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.internal.managers.discovery.DiscoCache; @@ -28,55 +30,55 @@ import org.apache.ignite.internal.util.tostring.GridToStringInclude; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.internal.S; +import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteUuid; import org.jetbrains.annotations.Nullable; /** * This class represents discovery message that is used to provide information about dynamic cache start failure. */ -public class DynamicCacheChangeFailureMessage implements DiscoveryCustomMessage { +public class ExchangeFailureMessage implements DiscoveryCustomMessage { /** */ private static final long serialVersionUID = 0L; /** Cache names. */ @GridToStringInclude - private Collection cacheNames; + private final Collection cacheNames; /** Custom message ID. */ - private IgniteUuid id; + private final IgniteUuid id; /** */ - private GridDhtPartitionExchangeId exchId; + private final GridDhtPartitionExchangeId exchId; /** */ @GridToStringInclude - private IgniteCheckedException cause; + private final Map exchangeErrors; - /** Cache updates to be executed on exchange. */ - private transient ExchangeActions exchangeActions; + /** Actions to be done to rollback changes done before the exchange failure. */ + private transient ExchangeActions exchangeRollbackActions; /** * Creates new DynamicCacheChangeFailureMessage instance. * * @param locNode Local node. * @param exchId Exchange Id. - * @param cause Cache start error. - * @param cacheNames Cache names. + * @param exchangeErrors Errors that caused PME to fail. */ - public DynamicCacheChangeFailureMessage( + public ExchangeFailureMessage( ClusterNode locNode, GridDhtPartitionExchangeId exchId, - IgniteCheckedException cause, + Map exchangeErrors, Collection cacheNames ) { assert exchId != null; - assert cause != null; + assert !F.isEmpty(exchangeErrors); assert !F.isEmpty(cacheNames) : cacheNames; this.id = IgniteUuid.fromUuid(locNode.id()); this.exchId = exchId; - this.cause = cause; this.cacheNames = cacheNames; + this.exchangeErrors = exchangeErrors; } /** {@inheritDoc} */ @@ -91,27 +93,40 @@ public Collection cacheNames() { return cacheNames; } + /** */ + public Map exchangeErrors() { + return exchangeErrors; + } + /** - * @return Cache start error. + * @return Cache updates to be executed on exchange. */ - public IgniteCheckedException error() { - return cause; + public ExchangeActions exchangeRollbackActions() { + return exchangeRollbackActions; } /** - * @return Cache updates to be executed on exchange. + * @param exchangeRollbackActions Cache updates to be executed on exchange. */ - public ExchangeActions exchangeActions() { - return exchangeActions; + public void exchangeRollbackActions(ExchangeActions exchangeRollbackActions) { + assert exchangeRollbackActions != null && !exchangeRollbackActions.empty() : exchangeRollbackActions; + + this.exchangeRollbackActions = exchangeRollbackActions; } /** - * @param exchangeActions Cache updates to be executed on exchange. + * Creates an IgniteCheckedException that is used as root cause of the exchange initialization failure. This method + * aggregates all the exceptions provided from all participating nodes. + * + * @return Exception that represents a cause of the exchange initialization failure. */ - public void exchangeActions(ExchangeActions exchangeActions) { - assert exchangeActions != null && !exchangeActions.empty() : exchangeActions; + public IgniteCheckedException createFailureCompoundException() { + IgniteCheckedException ex = new IgniteCheckedException("Failed to complete exchange process."); + + for (Map.Entry entry : exchangeErrors.entrySet()) + U.addSuppressed(ex, entry.getValue()); - this.exchangeActions = exchangeActions; + return ex; } /** @@ -141,6 +156,6 @@ public void exchangeActions(ExchangeActions exchangeActions) { /** {@inheritDoc} */ @Override public String toString() { - return S.toString(DynamicCacheChangeFailureMessage.class, this); + return S.toString(ExchangeFailureMessage.class, this); } } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java index 78c7b0c317fff..e271c3b26a900 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java @@ -53,20 +53,12 @@ import org.apache.ignite.IgniteSystemProperties; import org.apache.ignite.cache.CacheRebalanceMode; import org.apache.ignite.cache.affinity.AffinityFunction; -import org.apache.ignite.cluster.BaselineNode; import org.apache.ignite.cluster.ClusterGroupEmptyException; import org.apache.ignite.cluster.ClusterNode; -import org.apache.ignite.cluster.ClusterState; import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.configuration.IgniteConfiguration; -import org.apache.ignite.events.BaselineChangedEvent; -import org.apache.ignite.events.ClusterActivationEvent; -import org.apache.ignite.events.ClusterStateChangeEvent; import org.apache.ignite.events.DiscoveryEvent; -import org.apache.ignite.events.Event; -import org.apache.ignite.events.EventType; import org.apache.ignite.failure.FailureContext; -import org.apache.ignite.internal.GridKernalContext; import org.apache.ignite.internal.IgniteClientDisconnectedCheckedException; import org.apache.ignite.internal.IgniteDiagnosticAware; import org.apache.ignite.internal.IgniteDiagnosticPrepareContext; @@ -82,7 +74,6 @@ import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage; import org.apache.ignite.internal.managers.discovery.DiscoveryLocalJoinData; import org.apache.ignite.internal.managers.eventstorage.DiscoveryEventListener; -import org.apache.ignite.internal.managers.eventstorage.GridEventStorageManager; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.affinity.GridAffinityAssignmentCache; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.CachePartitionFullCountersMap; @@ -114,7 +105,6 @@ import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; import org.apache.ignite.internal.processors.cluster.ChangeGlobalStateFinishMessage; import org.apache.ignite.internal.processors.cluster.ChangeGlobalStateMessage; -import org.apache.ignite.internal.processors.cluster.DiscoveryDataClusterState; import org.apache.ignite.internal.processors.metric.MetricRegistryImpl; import org.apache.ignite.internal.processors.metric.impl.BooleanMetricImpl; import org.apache.ignite.internal.processors.metric.impl.HistogramMetricImpl; @@ -139,7 +129,6 @@ import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.T2; import org.apache.ignite.internal.util.typedef.X; -import org.apache.ignite.internal.util.typedef.internal.A; import org.apache.ignite.internal.util.typedef.internal.CU; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; @@ -159,9 +148,6 @@ import static org.apache.ignite.IgniteSystemProperties.IGNITE_PRELOAD_RESEND_TIMEOUT; import static org.apache.ignite.IgniteSystemProperties.IGNITE_THREAD_DUMP_ON_EXCHANGE_TIMEOUT; import static org.apache.ignite.IgniteSystemProperties.getLong; -import static org.apache.ignite.events.EventType.EVT_CLUSTER_ACTIVATED; -import static org.apache.ignite.events.EventType.EVT_CLUSTER_DEACTIVATED; -import static org.apache.ignite.events.EventType.EVT_CLUSTER_STATE_CHANGED; import static org.apache.ignite.events.EventType.EVT_NODE_FAILED; import static org.apache.ignite.events.EventType.EVT_NODE_JOINED; import static org.apache.ignite.events.EventType.EVT_NODE_LEFT; @@ -599,23 +585,6 @@ private void onDiscoveryEvent(DiscoveryEvent evt, DiscoCache cache) { exchId = exchangeId(n.id(), affinityTopologyVersion(evt), evt); exchFut = exchangeFuture(exchId, evt, cache, exchActions, null); - - boolean baselineChanging; - if (stateChangeMsg.forceChangeBaselineTopology()) - baselineChanging = true; - else { - DiscoveryDataClusterState state = cctx.kernalContext().state().clusterState(); - - assert state.transition() : state; - - baselineChanging = exchActions.changedBaseline() - // Or it is the first activation. - || state.state() != ClusterState.INACTIVE - && !state.previouslyActive() - && state.previousBaselineTopology() == null; - } - - exchFut.listen(f -> onClusterStateChangeFinish(exchActions, baselineChanging)); } } else if (customMsg instanceof DynamicCacheChangeBatch) { @@ -643,13 +612,14 @@ else if (msg.exchangeId().topologyVersion().topologyVersion() >= cctx.discovery( exchangeFuture(msg.exchangeId(), null, null, null, null) .onAffinityChangeMessage(evt.eventNode(), msg); } - else if (customMsg instanceof DynamicCacheChangeFailureMessage) { - DynamicCacheChangeFailureMessage msg = (DynamicCacheChangeFailureMessage)customMsg; + else if (customMsg instanceof ExchangeFailureMessage) { + ExchangeFailureMessage msg = (ExchangeFailureMessage)customMsg; - if (msg.exchangeId().topologyVersion().topologyVersion() >= - affinityTopologyVersion(cctx.discovery().localJoinEvent()).topologyVersion()) - exchangeFuture(msg.exchangeId(), null, null, null, null) - .onDynamicCacheChangeFail(evt.eventNode(), msg); + long exchangeTopVer = msg.exchangeId().topologyVersion().topologyVersion(); + long locNodeJoinTopVer = affinityTopologyVersion(cctx.discovery().localJoinEvent()).topologyVersion(); + + if (exchangeTopVer >= locNodeJoinTopVer) + exchangeFuture(msg.exchangeId(), null, null, null, null).onExchangeFailureMessage(evt.eventNode(), msg); } else if (customMsg instanceof SnapshotDiscoveryMessage && ((SnapshotDiscoveryMessage)customMsg).needExchange()) { @@ -720,73 +690,6 @@ else if (customMsg instanceof WalStateAbstractMessage } } - /** */ - private void onClusterStateChangeFinish(ExchangeActions exchActions, boolean baselineChanging) { - A.notNull(exchActions, "exchActions"); - - GridEventStorageManager evtMngr = cctx.kernalContext().event(); - - if (exchActions.activate() && evtMngr.isRecordable(EVT_CLUSTER_ACTIVATED) || - exchActions.deactivate() && evtMngr.isRecordable(EVT_CLUSTER_DEACTIVATED) || - exchActions.changedClusterState() && evtMngr.isRecordable(EVT_CLUSTER_STATE_CHANGED) - ) { - List evts = new ArrayList<>(2); - - ClusterNode locNode = cctx.kernalContext().discovery().localNode(); - - Collection bltNodes = cctx.kernalContext().cluster().get().currentBaselineTopology(); - - boolean collectionUsed = false; - - if (exchActions.activate() && evtMngr.isRecordable(EVT_CLUSTER_ACTIVATED)) { - assert !exchActions.deactivate() : exchActions; - - collectionUsed = true; - - evts.add(new ClusterActivationEvent(locNode, "Cluster activated.", EVT_CLUSTER_ACTIVATED, bltNodes)); - } - - if (exchActions.deactivate() && evtMngr.isRecordable(EVT_CLUSTER_DEACTIVATED)) { - assert !exchActions.activate() : exchActions; - - collectionUsed = true; - - evts.add(new ClusterActivationEvent(locNode, "Cluster deactivated.", EVT_CLUSTER_DEACTIVATED, bltNodes)); - } - - if (exchActions.changedClusterState() && evtMngr.isRecordable(EVT_CLUSTER_STATE_CHANGED)) { - StateChangeRequest req = exchActions.stateChangeRequest(); - - if (collectionUsed && bltNodes != null) - bltNodes = new ArrayList<>(bltNodes); - - evts.add(new ClusterStateChangeEvent(req.prevState(), req.state(), bltNodes, locNode, "Cluster state changed.")); - } - - A.notEmpty(evts, "events " + exchActions); - - cctx.kernalContext().pools().getSystemExecutorService() - .submit(() -> evts.forEach(e -> cctx.kernalContext().event().record(e))); - } - - GridKernalContext ctx = cctx.kernalContext(); - - if (baselineChanging) { - ctx.pools().getStripedExecutorService().execute(new Runnable() { - @Override public void run() { - if (ctx.event().isRecordable(EventType.EVT_BASELINE_CHANGED)) { - ctx.event().record(new BaselineChangedEvent( - ctx.discovery().localNode(), - "Baseline changed.", - EventType.EVT_BASELINE_CHANGED, - ctx.cluster().get().currentBaselineTopology() - )); - } - } - }); - } - } - /** * @param task Task to run in exchange worker thread. */ diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java index 9e2bf43ae168d..eeb435a410a76 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java @@ -4263,8 +4263,8 @@ else if (msg0 instanceof WalStateFinishMessage) return changeRequested; } - if (msg instanceof DynamicCacheChangeFailureMessage) - cachesInfo.onCacheChangeRequested((DynamicCacheChangeFailureMessage)msg, topVer); + if (msg instanceof ExchangeFailureMessage) + cachesInfo.onCacheChangeRequested((ExchangeFailureMessage)msg, topVer); if (msg instanceof ClientCacheChangeDiscoveryMessage) cachesInfo.onClientCacheChange((ClientCacheChangeDiscoveryMessage)msg, node); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/StateChangeRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/StateChangeRequest.java index faeba835caba2..a0ce972da8559 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/StateChangeRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/StateChangeRequest.java @@ -43,22 +43,28 @@ public class StateChangeRequest { /** */ private final AffinityTopologyVersion topVer; + /** */ + private final boolean isBaselineChangeRequest; + /** * @param msg Message. * @param bltHistItem Baseline history item. * @param prevState Previous cluster state. - * @param topVer State change topology versoin. + * @param topVer State change topology version. + * @param isBaselineChangeRequest Whether this request changes baseline. */ public StateChangeRequest( ChangeGlobalStateMessage msg, BaselineTopologyHistoryItem bltHistItem, ClusterState prevState, - AffinityTopologyVersion topVer + AffinityTopologyVersion topVer, + boolean isBaselineChangeRequest ) { this.msg = msg; prevBltHistItem = bltHistItem; this.prevState = prevState; this.topVer = topVer; + this.isBaselineChangeRequest = isBaselineChangeRequest; } /** @@ -98,6 +104,11 @@ public ClusterState state() { return msg.state(); } + /** */ + public boolean isBaselineChangeRequest() { + return isBaselineChangeRequest; + } + /** * @return Previous cluster state. */ diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java index 9a8d610d171c0..d5a308de66a2d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java @@ -76,12 +76,12 @@ import org.apache.ignite.internal.processors.cache.CacheGroupDescriptor; import org.apache.ignite.internal.processors.cache.CachePartitionExchangeWorkerTask; import org.apache.ignite.internal.processors.cache.DynamicCacheChangeBatch; -import org.apache.ignite.internal.processors.cache.DynamicCacheChangeFailureMessage; import org.apache.ignite.internal.processors.cache.DynamicCacheChangeRequest; import org.apache.ignite.internal.processors.cache.DynamicCacheDescriptor; import org.apache.ignite.internal.processors.cache.ExchangeActions; import org.apache.ignite.internal.processors.cache.ExchangeContext; import org.apache.ignite.internal.processors.cache.ExchangeDiscoveryEvents; +import org.apache.ignite.internal.processors.cache.ExchangeFailureMessage; import org.apache.ignite.internal.processors.cache.GridCacheAdapter; import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.GridCacheMvccCandidate; @@ -138,10 +138,10 @@ import static org.apache.ignite.IgniteSystemProperties.IGNITE_THREAD_DUMP_ON_EXCHANGE_TIMEOUT; import static org.apache.ignite.IgniteSystemProperties.getBoolean; import static org.apache.ignite.IgniteSystemProperties.getLong; +import static org.apache.ignite.cluster.ClusterState.INACTIVE; import static org.apache.ignite.events.EventType.EVT_NODE_FAILED; import static org.apache.ignite.events.EventType.EVT_NODE_JOINED; import static org.apache.ignite.events.EventType.EVT_NODE_LEFT; -import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_DYNAMIC_CACHE_START_ROLLBACK_SUPPORTED; import static org.apache.ignite.internal.events.DiscoveryCustomEvent.EVT_DISCOVERY_CUSTOM_EVT; import static org.apache.ignite.internal.managers.communication.GridIoPolicy.SYSTEM_POOL; import static org.apache.ignite.internal.processors.cache.ExchangeDiscoveryEvents.serverJoinEvent; @@ -310,8 +310,8 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte /** Exchange exceptions from all participating nodes. */ private final Map exchangeGlobalExceptions = new ConcurrentHashMap<>(); - /** Used to track the fact that {@link DynamicCacheChangeFailureMessage} was sent. */ - private volatile boolean cacheChangeFailureMsgSent; + /** Used to track the fact that {@link ExchangeFailureMessage} was sent. */ + private volatile boolean isExchangeFailureMsgSent; /** */ private final ConcurrentMap msgs = new ConcurrentHashMap<>(); @@ -1451,7 +1451,7 @@ private ExchangeType onCacheChangeRequest(boolean crd) throws IgniteCheckedExcep registerCachesFuture = cctx.affinity().onCacheChangeRequest(this, crd, exchActions); } catch (Exception e) { - if (reconnectOnError(e) || !isRollbackSupported()) + if (reconnectOnError(e)) // This exception will be handled by init() method. throw e; @@ -2523,8 +2523,8 @@ private String exchangeTimingsLogMessage(String header, List timings) { if (exchActions != null && err0 == null) exchActions.completeRequestFutures(cctx, null); - if (stateChangeExchange() && err0 == null) - cctx.kernalContext().state().onStateChangeExchangeDone(exchActions.stateChangeRequest()); + if (stateChangeExchange() && err0 == null && finishState != null && finishState.isCompleted()) + cctx.kernalContext().state().onStateChangeExchangeDone(exchActions); }); if (super.onDone(res, err)) { @@ -3094,9 +3094,9 @@ public void waitAndReplyToNode(final UUID nodeId, final GridDhtPartitionsSingleM if (cctx.kernalContext().isStopping()) return; - // DynamicCacheChangeFailureMessage was sent. + // ExchangeFailureMessage was sent. // Thus, there is no need to create and send GridDhtPartitionsFullMessage. - if (cacheChangeFailureMsgSent) + if (isExchangeFailureMsgSent) return; FinishState finishState0; @@ -3233,7 +3233,7 @@ else if (log.isDebugEnabled()) if (finishState0 != null) { // DynamicCacheChangeFailureMessage was sent. // Thus, there is no need to create and send GridDhtPartitionsFullMessage. - if (!cacheChangeFailureMsgSent) + if (!isExchangeFailureMsgSent) sendAllPartitionsToNode(finishState0, msg, nodeId); return; @@ -3643,59 +3643,26 @@ private void resetLostPartitions(Collection cacheNames) { } /** - * Creates an IgniteCheckedException that is used as root cause of the exchange initialization failure. This method - * aggregates all the exceptions provided from all participating nodes. - * - * @param globalExceptions collection exceptions from all participating nodes. - * @return exception that represents a cause of the exchange initialization failure. - */ - private IgniteCheckedException createExchangeException(Map globalExceptions) { - IgniteCheckedException ex = new IgniteCheckedException("Failed to complete exchange process."); - - for (Map.Entry entry : globalExceptions.entrySet()) - if (ex != entry.getValue()) - ex.addSuppressed(entry.getValue()); - - return ex; - } - - /** - * @return {@code true} if the given {@code discoEvt} supports the rollback procedure. - */ - private boolean isRollbackSupported() { - if (!firstEvtDiscoCache.checkAttribute(ATTR_DYNAMIC_CACHE_START_ROLLBACK_SUPPORTED, Boolean.TRUE)) - return false; - - // Currently the rollback process is supported for dynamically started caches only. - return firstDiscoEvt.type() == EVT_DISCOVERY_CUSTOM_EVT && dynamicCacheStartExchange(); - } - - /** - * Sends {@link DynamicCacheChangeFailureMessage} to all participated nodes that represents a cause of exchange + * Sends {@link ExchangeFailureMessage} to all participated nodes that represents a cause of exchange * failure. */ private void sendExchangeFailureMessage() { assert crd != null && crd.isLocal(); try { - IgniteCheckedException err = createExchangeException(exchangeGlobalExceptions); - List cacheNames = new ArrayList<>(exchActions.cacheStartRequests().size()); for (ExchangeActions.CacheActionData actionData : exchActions.cacheStartRequests()) cacheNames.add(actionData.request().cacheName()); - DynamicCacheChangeFailureMessage msg = new DynamicCacheChangeFailureMessage( - cctx.localNode(), exchId, err, cacheNames); + ExchangeFailureMessage msg = new ExchangeFailureMessage(cctx.localNode(), exchId, exchangeGlobalExceptions, cacheNames); if (log.isDebugEnabled()) - log.debug("Dynamic cache change failed (send message to all participating nodes): " + msg); + log.debug("Exchange process failed (send message to all participating nodes): " + msg); - cacheChangeFailureMsgSent = true; + isExchangeFailureMsgSent = true; cctx.discovery().sendCustomEvent(msg); - - return; } catch (IgniteCheckedException e) { if (reconnectOnError(e)) @@ -3772,7 +3739,7 @@ private void finishExchangeOnCoordinator(@Nullable Collection sndRe return; try { - if (!F.isEmpty(exchangeGlobalExceptions) && dynamicCacheStartExchange() && isRollbackSupported()) { + if (!F.isEmpty(exchangeGlobalExceptions) && dynamicCacheStartExchange()) { sendExchangeFailureMessage(); return; @@ -4823,7 +4790,7 @@ private void updatePartitionSingleMap(UUID nodeId, GridDhtPartitionsSingleMessag * @param node Message sender node. * @param msg Failure message. */ - public void onDynamicCacheChangeFail(final ClusterNode node, final DynamicCacheChangeFailureMessage msg) { + public void onExchangeFailureMessage(final ClusterNode node, final ExchangeFailureMessage msg) { assert exchId.equals(msg.exchangeId()) : msg; assert firstDiscoEvt.type() == EVT_DISCOVERY_CUSTOM_EVT && dynamicCacheStartExchange(); @@ -4839,15 +4806,24 @@ public void onDynamicCacheChangeFail(final ClusterNode node, final DynamicCacheC return; try { - assert msg.error() != null : msg; + assert !F.isEmpty(msg.exchangeErrors()) : msg; // Try to revert all the changes that were done during initialization phase cctx.affinity().forceCloseCaches( GridDhtPartitionsExchangeFuture.this, crd.isLocal(), - msg.exchangeActions() + msg.exchangeRollbackActions() ); + if (stateChangeExchange()) { + cctx.kernalContext().state().onStateChangeError(msg.exchangeErrors(), actions.stateChangeRequest()); + cctx.kernalContext().state().onStateFinishMessage(new ChangeGlobalStateFinishMessage( + actions.stateChangeRequest().requestId(), + INACTIVE, + false + )); + } + synchronized (mux) { finishState = new FinishState(crd.id(), initialVersion(), null); @@ -4855,7 +4831,7 @@ public void onDynamicCacheChangeFail(final ClusterNode node, final DynamicCacheC } if (actions != null) - actions.completeRequestFutures(cctx, msg.error()); + actions.completeRequestFutures(cctx, msg.createFailureCompoundException()); onDone(exchId.topologyVersion()); } @@ -5640,6 +5616,12 @@ private static class FinishState { this.crdId = crdId; this.resTopVer = resTopVer; this.msg = msg; + + } + + /** */ + public boolean isCompleted() { + return msg != null; } /** diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/DiscoveryDataClusterState.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/DiscoveryDataClusterState.java index 73e771e6543b1..8384ecbde6e51 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/DiscoveryDataClusterState.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/DiscoveryDataClusterState.java @@ -167,6 +167,20 @@ private DiscoveryDataClusterState( this.prevClusterState = prevClusterState; } + /** */ + public boolean isBaselineChangeInProgress() { + if (!transition()) + return false; + + if (previouslyActive() == state().active()) + return true; + + // Or it is the first activation. + return state() != ClusterState.INACTIVE + && !previouslyActive() + && previousBaselineTopology() == null; + } + /** * @return Cluster state before transition if cluster in transition and current cluster state otherwise. */ diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/GridClusterStateProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/GridClusterStateProcessor.java index 5d31952ca9490..7ad0593b6c128 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/GridClusterStateProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/GridClusterStateProcessor.java @@ -42,7 +42,10 @@ import org.apache.ignite.configuration.DataRegionConfiguration; import org.apache.ignite.configuration.DataStorageConfiguration; import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.events.BaselineChangedEvent; import org.apache.ignite.events.BaselineConfigurationChangedEvent; +import org.apache.ignite.events.ClusterActivationEvent; +import org.apache.ignite.events.ClusterStateChangeEvent; import org.apache.ignite.events.ClusterStateChangeStartedEvent; import org.apache.ignite.events.DiscoveryEvent; import org.apache.ignite.events.Event; @@ -54,6 +57,7 @@ import org.apache.ignite.internal.cluster.DistributedBaselineConfiguration; import org.apache.ignite.internal.cluster.IgniteClusterImpl; import org.apache.ignite.internal.managers.discovery.DiscoCache; +import org.apache.ignite.internal.managers.eventstorage.GridEventStorageManager; import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener; import org.apache.ignite.internal.managers.systemview.walker.BaselineNodeAttributeViewWalker; import org.apache.ignite.internal.managers.systemview.walker.BaselineNodeViewWalker; @@ -105,6 +109,9 @@ import static org.apache.ignite.configuration.IgniteConfiguration.DFLT_STATE_ON_START; import static org.apache.ignite.events.EventType.EVT_BASELINE_AUTO_ADJUST_AWAITING_TIME_CHANGED; import static org.apache.ignite.events.EventType.EVT_BASELINE_AUTO_ADJUST_ENABLED_CHANGED; +import static org.apache.ignite.events.EventType.EVT_CLUSTER_ACTIVATED; +import static org.apache.ignite.events.EventType.EVT_CLUSTER_DEACTIVATED; +import static org.apache.ignite.events.EventType.EVT_CLUSTER_STATE_CHANGED; import static org.apache.ignite.events.EventType.EVT_NODE_FAILED; import static org.apache.ignite.events.EventType.EVT_NODE_JOINED; import static org.apache.ignite.events.EventType.EVT_NODE_LEFT; @@ -773,7 +780,8 @@ else if (isApplicable(msg, state)) { msg, bltHistItem, state.state(), - stateChangeTopVer + stateChangeTopVer, + msg.forceChangeBaselineTopology() || globalState.isBaselineChangeInProgress() ); exchangeActions.stateChangeRequest(req); @@ -1436,7 +1444,11 @@ private void onFinalActivate(final StateChangeRequest req) { } /** {@inheritDoc} */ - @Override public void onStateChangeExchangeDone(StateChangeRequest req) { + @Override public void onStateChangeExchangeDone(ExchangeActions actions) { + StateChangeRequest req = actions.stateChangeRequest(); + + assert req != null; + try { if (req.activeChanged()) { if (req.state().active()) @@ -1455,6 +1467,8 @@ private void onFinalActivate(final StateChangeRequest req) { sendChangeGlobalStateResponse(req.requestId(), req.initiatorNodeId(), e); } + + onClusterStateChangeFinish(actions, req.isBaselineChangeRequest()); } /** {@inheritDoc} */ @@ -1648,7 +1662,8 @@ public ExchangeActions autoAdjustExchangeActions(ExchangeActions exchActs) { msg, BaselineTopologyHistoryItem.fromBaseline(blt), msg.state(), - null + null, + false ); if (exchActs == null) @@ -1994,6 +2009,71 @@ private List listInMemoryUserCaches() { .collect(Collectors.toList()); } + /** */ + private void onClusterStateChangeFinish(ExchangeActions exchActions, boolean baselineChanging) { + A.notNull(exchActions, "exchActions"); + + GridEventStorageManager evtMngr = ctx.event(); + + if (exchActions.activate() && evtMngr.isRecordable(EVT_CLUSTER_ACTIVATED) || + exchActions.deactivate() && evtMngr.isRecordable(EVT_CLUSTER_DEACTIVATED) || + exchActions.changedClusterState() && evtMngr.isRecordable(EVT_CLUSTER_STATE_CHANGED) + ) { + List evts = new ArrayList<>(2); + + ClusterNode locNode = ctx.discovery().localNode(); + + Collection bltNodes = ctx.cluster().get().currentBaselineTopology(); + + boolean collectionUsed = false; + + if (exchActions.activate() && evtMngr.isRecordable(EVT_CLUSTER_ACTIVATED)) { + assert !exchActions.deactivate() : exchActions; + + collectionUsed = true; + + evts.add(new ClusterActivationEvent(locNode, "Cluster activated.", EVT_CLUSTER_ACTIVATED, bltNodes)); + } + + if (exchActions.deactivate() && evtMngr.isRecordable(EVT_CLUSTER_DEACTIVATED)) { + assert !exchActions.activate() : exchActions; + + collectionUsed = true; + + evts.add(new ClusterActivationEvent(locNode, "Cluster deactivated.", EVT_CLUSTER_DEACTIVATED, bltNodes)); + } + + if (exchActions.changedClusterState() && evtMngr.isRecordable(EVT_CLUSTER_STATE_CHANGED)) { + StateChangeRequest req = exchActions.stateChangeRequest(); + + if (collectionUsed && bltNodes != null) + bltNodes = new ArrayList<>(bltNodes); + + evts.add(new ClusterStateChangeEvent(req.prevState(), req.state(), bltNodes, locNode, "Cluster state changed.")); + } + + A.notEmpty(evts, "events " + exchActions); + + ctx.pools().getSystemExecutorService() + .submit(() -> evts.forEach(e -> ctx.event().record(e))); + } + + if (baselineChanging) { + ctx.pools().getStripedExecutorService().execute(new Runnable() { + @Override public void run() { + if (ctx.event().isRecordable(EventType.EVT_BASELINE_CHANGED)) { + ctx.event().record(new BaselineChangedEvent( + ctx.discovery().localNode(), + "Baseline changed.", + EventType.EVT_BASELINE_CHANGED, + ctx.cluster().get().currentBaselineTopology() + )); + } + } + }); + } + } + /** * */ diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/IGridClusterStateProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/IGridClusterStateProcessor.java index 74cfd508c90a3..f4f3d622096bd 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/IGridClusterStateProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/IGridClusterStateProcessor.java @@ -29,6 +29,7 @@ import org.apache.ignite.internal.managers.discovery.DiscoCache; import org.apache.ignite.internal.processors.GridProcessor; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; +import org.apache.ignite.internal.processors.cache.ExchangeActions; import org.apache.ignite.internal.processors.cache.StateChangeRequest; import org.apache.ignite.lang.IgniteFuture; import org.jetbrains.annotations.Nullable; @@ -130,9 +131,9 @@ IgniteInternalFuture changeGlobalState( void onStateChangeError(Map errs, StateChangeRequest req); /** - * @param req State change request. + * @param exchangeActions Exchange actions. */ - void onStateChangeExchangeDone(StateChangeRequest req); + void onStateChangeExchangeDone(ExchangeActions exchangeActions); /** * @param blt New baseline topology. diff --git a/modules/core/src/main/resources/META-INF/classnames.properties b/modules/core/src/main/resources/META-INF/classnames.properties index fc27ad5fe8322..295c5422f93ed 100644 --- a/modules/core/src/main/resources/META-INF/classnames.properties +++ b/modules/core/src/main/resources/META-INF/classnames.properties @@ -371,7 +371,6 @@ org.apache.ignite.internal.commandline.cache.reset_lost_partitions.CacheResetLos org.apache.ignite.internal.compute.ComputeTaskCancelledCheckedException org.apache.ignite.internal.compute.ComputeTaskTimeoutCheckedException org.apache.ignite.internal.direct.DirectMessageReader$1 -org.apache.ignite.internal.direct.DirectMessageWriter$1 org.apache.ignite.internal.dto.IgniteDataTransferObject org.apache.ignite.internal.events.DiscoveryCustomEvent org.apache.ignite.internal.events.ManagementTaskEvent @@ -443,6 +442,8 @@ org.apache.ignite.internal.management.cache.CacheDistributionCommandArg org.apache.ignite.internal.management.cache.CacheEvictionConfiguration org.apache.ignite.internal.management.cache.CacheFilterEnum org.apache.ignite.internal.management.cache.CacheFindGarbageCommandArg +org.apache.ignite.internal.management.cache.CacheIdleVerifyCancelTask +org.apache.ignite.internal.management.cache.CacheIdleVerifyCancelTask$CacheIdleVerifyCancelJob org.apache.ignite.internal.management.cache.CacheIdleVerifyCommandArg org.apache.ignite.internal.management.cache.CacheIdleVerifyDumpCommandArg org.apache.ignite.internal.management.cache.CacheIndexesForceRebuildCommandArg @@ -618,7 +619,6 @@ org.apache.ignite.internal.management.meta.MetadataInfoTask$MetadataListJob org.apache.ignite.internal.management.meta.MetadataListResult org.apache.ignite.internal.management.meta.MetadataMarshalled org.apache.ignite.internal.management.meta.MetadataRemoveTask -org.apache.ignite.internal.management.meta.MetadataRemoveTask$DropAllThinSessionsJob org.apache.ignite.internal.management.meta.MetadataRemoveTask$MetadataRemoveJob org.apache.ignite.internal.management.meta.MetadataUpdateTask org.apache.ignite.internal.management.meta.MetadataUpdateTask$MetadataUpdateJob @@ -856,11 +856,11 @@ org.apache.ignite.internal.processors.cache.ClientCacheChangeDiscoveryMessage org.apache.ignite.internal.processors.cache.ClientCacheChangeDummyDiscoveryMessage org.apache.ignite.internal.processors.cache.ClusterCachesInfo$1$1 org.apache.ignite.internal.processors.cache.DynamicCacheChangeBatch -org.apache.ignite.internal.processors.cache.DynamicCacheChangeFailureMessage org.apache.ignite.internal.processors.cache.DynamicCacheChangeRequest org.apache.ignite.internal.processors.cache.EntryProcessorResourceInjectorProxy org.apache.ignite.internal.processors.cache.ExchangeActions$1 org.apache.ignite.internal.processors.cache.ExchangeActions$2 +org.apache.ignite.internal.processors.cache.ExchangeFailureMessage org.apache.ignite.internal.processors.cache.FetchActiveTxOwnerTraceClosure org.apache.ignite.internal.processors.cache.GatewayProtectedCacheProxy org.apache.ignite.internal.processors.cache.GridCacheAdapter @@ -889,6 +889,7 @@ org.apache.ignite.internal.processors.cache.GridCacheAdapter$AsyncOpRetryFuture$ org.apache.ignite.internal.processors.cache.GridCacheAdapter$AsyncOpRetryFuture$1$1 org.apache.ignite.internal.processors.cache.GridCacheAdapter$AtomicReadRepairEntryProcessor org.apache.ignite.internal.processors.cache.GridCacheAdapter$BulkOperation +org.apache.ignite.internal.processors.cache.GridCacheAdapter$CX1ContextAware org.apache.ignite.internal.processors.cache.GridCacheAdapter$ClearTask org.apache.ignite.internal.processors.cache.GridCacheAdapter$GlobalClearAllJob org.apache.ignite.internal.processors.cache.GridCacheAdapter$GlobalClearAllNearJob @@ -910,20 +911,21 @@ org.apache.ignite.internal.processors.cache.GridCacheAdapter$TopologyVersionAwar org.apache.ignite.internal.processors.cache.GridCacheAdapter$UpdateGetAllTimeStatClosure org.apache.ignite.internal.processors.cache.GridCacheAdapter$UpdateGetAndRemoveTimeStatClosure org.apache.ignite.internal.processors.cache.GridCacheAdapter$UpdateGetTimeStatClosure -org.apache.ignite.internal.processors.cache.GridCacheAdapter$UpdatePutAllTimeStatClosure org.apache.ignite.internal.processors.cache.GridCacheAdapter$UpdatePutAllConflictTimeStatClosure +org.apache.ignite.internal.processors.cache.GridCacheAdapter$UpdatePutAllTimeStatClosure org.apache.ignite.internal.processors.cache.GridCacheAdapter$UpdatePutAndGetTimeStatClosure org.apache.ignite.internal.processors.cache.GridCacheAdapter$UpdatePutTimeStatClosure -org.apache.ignite.internal.processors.cache.GridCacheAdapter$UpdateRemoveAllTimeStatClosure org.apache.ignite.internal.processors.cache.GridCacheAdapter$UpdateRemoveAllConflictTimeStatClosure +org.apache.ignite.internal.processors.cache.GridCacheAdapter$UpdateRemoveAllTimeStatClosure org.apache.ignite.internal.processors.cache.GridCacheAdapter$UpdateRemoveTimeStatClosure org.apache.ignite.internal.processors.cache.GridCacheAdapter$UpdateTimeStatClosure org.apache.ignite.internal.processors.cache.GridCacheAttributes org.apache.ignite.internal.processors.cache.GridCacheConcurrentMapImpl$1 org.apache.ignite.internal.processors.cache.GridCacheConcurrentMapImpl$2 org.apache.ignite.internal.processors.cache.GridCacheContext +org.apache.ignite.internal.processors.cache.GridCacheContext$3 org.apache.ignite.internal.processors.cache.GridCacheContext$4 -org.apache.ignite.internal.processors.cache.GridCacheContext$5 +org.apache.ignite.internal.processors.cache.GridCacheContext$SessionContextProviderImpl org.apache.ignite.internal.processors.cache.GridCacheDefaultAffinityKeyMapper org.apache.ignite.internal.processors.cache.GridCacheDefaultAffinityKeyMapper$1 org.apache.ignite.internal.processors.cache.GridCacheDeploymentManager$3 @@ -959,8 +961,8 @@ org.apache.ignite.internal.processors.cache.GridCachePartitionExchangeManager$2$ org.apache.ignite.internal.processors.cache.GridCachePartitionExchangeManager$3 org.apache.ignite.internal.processors.cache.GridCachePartitionExchangeManager$4 org.apache.ignite.internal.processors.cache.GridCachePartitionExchangeManager$5 -org.apache.ignite.internal.processors.cache.GridCachePartitionExchangeManager$7 -org.apache.ignite.internal.processors.cache.GridCachePartitionExchangeManager$9 +org.apache.ignite.internal.processors.cache.GridCachePartitionExchangeManager$6 +org.apache.ignite.internal.processors.cache.GridCachePartitionExchangeManager$8 org.apache.ignite.internal.processors.cache.GridCachePartitionExchangeManager$ExchangeFutureSet org.apache.ignite.internal.processors.cache.GridCachePartitionExchangeManager$ExchangeWorker$1 org.apache.ignite.internal.processors.cache.GridCachePartitionExchangeManager$MessageHandler @@ -1113,6 +1115,8 @@ org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtUnreservedPar org.apache.ignite.internal.processors.cache.distributed.dht.GridPartitionedSingleGetFuture$1 org.apache.ignite.internal.processors.cache.distributed.dht.IgniteClusterReadOnlyException org.apache.ignite.internal.processors.cache.distributed.dht.PartitionUpdateCountersMessage +org.apache.ignite.internal.processors.cache.distributed.dht.TransactionAttributesAwareRequest +org.apache.ignite.internal.processors.cache.distributed.dht.atomic.AtomicApplicationAttributesAwareRequest org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicAbstractUpdateRequest org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicCache org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicCache$10 @@ -1137,6 +1141,7 @@ org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomic org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicCache$28 org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicCache$29 org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicCache$3 +org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicCache$30 org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicCache$4 org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicCache$5 org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicCache$6 @@ -1150,13 +1155,16 @@ org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomic org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicUpdateRequest org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicUpdateResponse org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicAbstractSingleUpdateRequest +org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicAbstractUpdateFuture$1 org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicAbstractUpdateFuture$DhtLeftResult +org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicAbstractUpdateFuture$UpdateReplyClosureContextAware org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicAbstractUpdateRequest org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicCheckUpdateRequest org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicFullUpdateRequest org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicSingleUpdateFilterRequest org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicSingleUpdateInvokeRequest org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicSingleUpdateRequest +org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicUpdateFuture$1 org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicUpdateResponse org.apache.ignite.internal.processors.cache.distributed.dht.atomic.NearCacheUpdates org.apache.ignite.internal.processors.cache.distributed.dht.atomic.UpdateErrors @@ -1182,8 +1190,8 @@ org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPar org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionExchangeId org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionFullMap org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionMap -org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionSupplyMessage org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionSupplyErrorMessage +org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionSupplyMessage org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsAbstractMessage org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture$2 org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture$3 @@ -1216,6 +1224,7 @@ org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtInva org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtLocalPartition$2 org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionsReservation$1 +org.apache.ignite.internal.processors.cache.distributed.dht.topology.PartitionReservationManager$1 org.apache.ignite.internal.processors.cache.distributed.dht.topology.PartitionsEvictManager$EvictReason org.apache.ignite.internal.processors.cache.distributed.near.CacheVersionedValue org.apache.ignite.internal.processors.cache.distributed.near.GridNearAtomicCache @@ -1361,7 +1370,10 @@ org.apache.ignite.internal.processors.cache.persistence.wal.reader.IgniteWalIter org.apache.ignite.internal.processors.cache.persistence.wal.reader.StandaloneGridKernalContext$1 org.apache.ignite.internal.processors.cache.persistence.wal.reader.StandaloneWalRecordsIterator org.apache.ignite.internal.processors.cache.persistence.wal.reader.StrictBoundsCheckException +org.apache.ignite.internal.processors.cache.query.AbstractScanQueryIterator org.apache.ignite.internal.processors.cache.query.CacheQuery$1 +org.apache.ignite.internal.processors.cache.query.CacheQuery$2 +org.apache.ignite.internal.processors.cache.query.CacheQuery$3 org.apache.ignite.internal.processors.cache.query.CacheQuery$ScanQueryFallbackClosableIterator org.apache.ignite.internal.processors.cache.query.CacheQueryEntry org.apache.ignite.internal.processors.cache.query.CacheQueryType @@ -1656,6 +1668,7 @@ org.apache.ignite.internal.processors.metastorage.persistence.DistributedMetaSto org.apache.ignite.internal.processors.metastorage.persistence.DistributedMetaStorageUpdateAckMessage org.apache.ignite.internal.processors.metastorage.persistence.DistributedMetaStorageUpdateMessage org.apache.ignite.internal.processors.metastorage.persistence.DistributedMetaStorageVersion +org.apache.ignite.internal.processors.odbc.ClientConnectionNodeRecoveryException org.apache.ignite.internal.processors.odbc.ClientMessage org.apache.ignite.internal.processors.odbc.jdbc.JdbcRequestHandler$1 org.apache.ignite.internal.processors.odbc.jdbc.JdbcStatementType diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/ClusterActivationFailureTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/ClusterActivationFailureTest.java new file mode 100644 index 0000000000000..0ee7698e694e1 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/ClusterActivationFailureTest.java @@ -0,0 +1,148 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache; + +import java.util.ArrayList; +import java.util.Collection; +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.IgniteException; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.internal.GridKernalContext; +import org.apache.ignite.internal.processors.cluster.IgniteChangeGlobalStateSupport; +import org.apache.ignite.internal.util.typedef.X; +import org.apache.ignite.plugin.AbstractTestPluginProvider; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import static org.apache.ignite.cluster.ClusterState.ACTIVE; +import static org.apache.ignite.cluster.ClusterState.INACTIVE; + +/** */ +@RunWith(Parameterized.class) +public class ClusterActivationFailureTest extends GridCommonAbstractTest { + /** */ + private static final int TEST_NODES_CNT = 3; + + /** */ + private final TestPluginProvider plugin = new TestPluginProvider(); + + /** */ + @Parameterized.Parameter() + public int activationInitiatorIdx; + + /** */ + @Parameterized.Parameters(name = "activationInitiatorIdx={0}") + public static Iterable data() { + Collection data = new ArrayList<>(); + + for (int activationInitiatorIdx = 0; activationInitiatorIdx < TEST_NODES_CNT; activationInitiatorIdx++) + data.add(new Object[] {activationInitiatorIdx}); + + return data; + } + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + super.afterTest(); + + stopAllGrids(); + + cleanPersistenceDir(); + } + + /** {@inheritDoc} */ + @Override protected void beforeTest() throws Exception { + super.beforeTest(); + + cleanPersistenceDir(); + } + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception { + return super.getConfiguration(igniteInstanceName) + .setClusterStateOnStart(INACTIVE) + .setCacheConfiguration(new CacheConfiguration<>(DEFAULT_CACHE_NAME)) + .setPluginProviders(plugin); + } + + /** */ + @Test + public void testErrorOnActivation() throws Exception { + startGrids(TEST_NODES_CNT); + + plugin.markActivationBroken(true); + + try { + grid(activationInitiatorIdx).cluster().state(ACTIVE); + + fail(); + } + catch (IgniteException e) { + X.hasCause(e, "expected activation exception", IgniteCheckedException.class); + } + + Ignite cli = startClientGrid(TEST_NODES_CNT); + + assertEquals(INACTIVE, cli.cluster().state()); + + plugin.markActivationBroken(false); + + grid(activationInitiatorIdx).cluster().state(ACTIVE); + assertEquals(ACTIVE, cli.cluster().state()); + + cli.cache(DEFAULT_CACHE_NAME).put(0, 0); + assertEquals(0, cli.cache(DEFAULT_CACHE_NAME).get(0)); + + cli.cluster().state(INACTIVE); + assertEquals(INACTIVE, grid(activationInitiatorIdx).cluster().state()); + cli.cluster().state(ACTIVE); + assertEquals(ACTIVE, grid(activationInitiatorIdx).cluster().state()); + } + + /** */ + private static final class TestPluginProvider extends AbstractTestPluginProvider implements IgniteChangeGlobalStateSupport { + /** */ + private volatile boolean isActivationBroken; + + /** */ + public void markActivationBroken(boolean isActivationBroken) { + this.isActivationBroken = isActivationBroken; + } + + /** {@inheritDoc} */ + @Override public String name() { + return "test-plugin"; + } + + /** {@inheritDoc} */ + @Override public void onActivate(GridKernalContext ignored) throws IgniteCheckedException { + if (isActivationBroken) + throw new IgniteCheckedException("expected activation exception"); + } + + /** {@inheritDoc} */ + @Override public void onDeActivate(GridKernalContext ignored) { + // No-op. + } + } +} + diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite5.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite5.java index a05aa7f33aa69..ec0a84ab3578f 100644 --- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite5.java +++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite5.java @@ -25,6 +25,7 @@ import org.apache.ignite.internal.processors.cache.CacheNearReaderUpdateTest; import org.apache.ignite.internal.processors.cache.CacheRebalancingSelfTest; import org.apache.ignite.internal.processors.cache.CacheSerializableTransactionsTest; +import org.apache.ignite.internal.processors.cache.ClusterActivationFailureTest; import org.apache.ignite.internal.processors.cache.ClusterReadOnlyModeTest; import org.apache.ignite.internal.processors.cache.ClusterStateClientPartitionedSelfTest; import org.apache.ignite.internal.processors.cache.ClusterStateClientReplicatedSelfTest; @@ -103,6 +104,7 @@ public static List> suite(Collection ignoredTests) { GridTestUtils.addTestIfNeeded(suite, ClusterStateThinClientReplicatedSelfTest.class, ignoredTests); GridTestUtils.addTestIfNeeded(suite, ClusterStateNoRebalanceReplicatedTest.class, ignoredTests); GridTestUtils.addTestIfNeeded(suite, ClusterReadOnlyModeTest.class, ignoredTests); + GridTestUtils.addTestIfNeeded(suite, ClusterActivationFailureTest.class, ignoredTests); GridTestUtils.addTestIfNeeded(suite, CacheCreateDestroyClusterReadOnlyModeTest.class, ignoredTests); GridTestUtils.addTestIfNeeded(suite, IgniteCachePartitionLossPolicySelfTest.class, ignoredTests); GridTestUtils.addTestIfNeeded(suite, IgniteCacheGroupsPartitionLossPolicySelfTest.class, ignoredTests);