Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ignite-24212 : non persistent sql schema rejoin v1 - add loc. lisnrs. #11818

Open
wants to merge 13 commits into
base: master
Choose a base branch
from
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@
import org.apache.ignite.cache.CacheWriteSynchronizationMode;
import org.apache.ignite.cache.QueryEntity;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.DataRegionConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.processors.cache.IgniteInternalCache;
import org.apache.ignite.internal.processors.query.IgniteSQLException;
import org.apache.ignite.internal.processors.query.QueryUtils;
Expand All @@ -57,6 +59,25 @@

/** */
public class TableDdlIntegrationTest extends AbstractDdlIntegrationTest {
/** */
private boolean persistence = true;

/** */
private CacheConfiguration<?, ?>[] cacheConfigurations;

/** {@inheritDoc} */
@Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);

for (DataRegionConfiguration drCfg : cfg.getDataStorageConfiguration().getDataRegionConfigurations())
drCfg.setPersistenceEnabled(persistence);

if (!F.isEmpty(cacheConfigurations))
cfg.setCacheConfiguration(cacheConfigurations);

return cfg;
}

/**
* Creates table with two columns, where the first column is PK,
* and verifies created cache.
Expand Down Expand Up @@ -995,6 +1016,51 @@ public void testPrimaryKeyInlineSize() {
"MY_TABLE", 5 + 3);
}

/** */
@Test
public void testNonPersistentRejoinsWithDynamicTablesOverPredefinedCaches() throws Exception {
stopAllGrids();

try {
persistence = false;

CacheConfiguration<?, ?> cacheCfg = new CacheConfiguration<>("TEST_CACHE")
.setBackups(1)
.setCacheMode(CacheMode.PARTITIONED)
.setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL)
.setWriteSynchronizationMode(CacheWriteSynchronizationMode.PRIMARY_SYNC);

cacheConfigurations = new CacheConfiguration<?, ?>[] {cacheCfg};

client = startGrids(3);

sql("CREATE TABLE IF NOT EXISTS TEST_TBL(ID INTEGER PRIMARY KEY, VAL VARCHAR) WITH \"CACHE_NAME=TEST_CACHE\"");

assertEquals(0, sql("SELECT * FROM TEST_TBL").size());

int testGrid = 2;

stopGrid(testGrid);
startGrid(testGrid);

awaitPartitionMapExchange();

for (int i = 0; i < 100; ++i)
assertEquals(1, sql("INSERT INTO TEST_TBL VALUES(" + (i + 1) + ", '" + (i + 1000) + "')").size());

assertEquals(100, grid(testGrid).cache("TEST_CACHE").size());

assertEquals(100, sql("SELECT * FROM TEST_TBL").size());
}
finally {
persistence = true;

afterTestsStopped();

beforeTestsStarted();
}
}

/** */
private void checkPkInlineSize(String ddl, String tableName, int expectedSize) {
sql(ddl);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import javax.cache.configuration.CacheEntryListenerConfiguration;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.IgniteSystemProperties;
Expand Down Expand Up @@ -1616,37 +1617,35 @@ else if (!GridFunc.eqNotOrdered(desc.schema().entities(), locQryEntities))
cfg.getNearConfiguration() != null);
}

updateRegisteredCachesIfNeeded(patchesToApply, cachesToSave, hasSchemaPatchConflict);
if (!hasSchemaPatchConflict)
updateRegisteredCaches(patchesToApply, cachesToSave);
}

/**
* Merging config or resaving it if it needed.
* Merging config, resaving it if it needed.
*
* @param patchesToApply Patches which need to apply.
* @param cachesToSave Caches which need to resave.
* @param hasSchemaPatchConflict {@code true} if we have conflict during making patch.
*/
private void updateRegisteredCachesIfNeeded(Map<DynamicCacheDescriptor, QuerySchemaPatch> patchesToApply,
Collection<DynamicCacheDescriptor> cachesToSave, boolean hasSchemaPatchConflict) {
//Skip merge of config if least one conflict was found.
if (!hasSchemaPatchConflict) {
boolean isClusterActive = ctx.state().clusterState().active();

//Merge of config for cluster only for inactive grid.
if (!isClusterActive && !patchesToApply.isEmpty()) {
for (Map.Entry<DynamicCacheDescriptor, QuerySchemaPatch> entry : patchesToApply.entrySet()) {
if (entry.getKey().applySchemaPatch(entry.getValue()))
saveCacheConfiguration(entry.getKey());
}
*/
private void updateRegisteredCaches(
Map<DynamicCacheDescriptor, QuerySchemaPatch> patchesToApply,
Collection<DynamicCacheDescriptor> cachesToSave
) {
// Store config only if cluster is nactive.
boolean isClusterActive = ctx.state().clusterState().active();

for (DynamicCacheDescriptor descriptor : cachesToSave)
saveCacheConfiguration(descriptor);
}
else if (patchesToApply.isEmpty()) {
for (DynamicCacheDescriptor descriptor : cachesToSave)
saveCacheConfiguration(descriptor);
//Merge of config for cluster only for inactive grid.
if (!patchesToApply.isEmpty()) {
for (Map.Entry<DynamicCacheDescriptor, QuerySchemaPatch> entry : patchesToApply.entrySet()) {
if (entry.getKey().applySchemaPatch(entry.getValue()) && !isClusterActive)
saveCacheConfiguration(entry.getKey());
}
}

if (isClusterActive) {
for (DynamicCacheDescriptor descriptor : cachesToSave)
saveCacheConfiguration(descriptor);
}
}

/**
Expand Down Expand Up @@ -1840,7 +1839,7 @@ private void initStartCachesForLocalJoin(boolean firstNode, boolean reconnect) {
nearCfg = locCfg.cacheData().config().getNearConfiguration();

DynamicCacheDescriptor desc0 = new DynamicCacheDescriptor(ctx,
locCfg.cacheData().config(),
mergeConfigurations(locCfg.cacheData().config(), cfg),
desc.cacheType(),
desc.groupDescriptor(),
desc.template(),
Expand Down Expand Up @@ -1881,6 +1880,23 @@ private void initStartCachesForLocalJoin(boolean firstNode, boolean reconnect) {
}
}

/**
* Merges local and received cache configurations.
*
* @param loc Local cache configuration.
* @param received Cache configuration received from the cluster.
* @see #registerReceivedCaches
* @see #updateRegisteredCaches
* @see DynamicCacheDescriptor#makeSchemaPatch(Collection)
* @see CacheConfiguration#writeReplace()
*/
private CacheConfiguration<?, ?> mergeConfigurations(CacheConfiguration<?, ?> loc, CacheConfiguration<?, ?> received) {
for (CacheEntryListenerConfiguration lsnrCfg : loc.getCacheEntryListenerConfigurations())
received.addCacheEntryListenerConfiguration(lsnrCfg);

return received;
}

/**
* @param msg Message.
*/
Expand Down