diff --git a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/TableDdlIntegrationTest.java b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/TableDdlIntegrationTest.java
index 98ce8275f2111..d5ccf319bcaaf 100644
--- a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/TableDdlIntegrationTest.java
+++ b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/TableDdlIntegrationTest.java
@@ -34,6 +34,8 @@
 import org.apache.ignite.cache.CacheWriteSynchronizationMode;
 import org.apache.ignite.cache.QueryEntity;
 import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.DataRegionConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.internal.processors.cache.IgniteInternalCache;
 import org.apache.ignite.internal.processors.query.IgniteSQLException;
 import org.apache.ignite.internal.processors.query.QueryUtils;
@@ -57,6 +59,25 @@
 
 /** */
 public class TableDdlIntegrationTest extends AbstractDdlIntegrationTest {
+    /** */
+    private boolean persistence = true;
+
+    /** */
+    private CacheConfiguration[] cacheConfigurations;
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
+
+        for (DataRegionConfiguration drCfg : cfg.getDataStorageConfiguration().getDataRegionConfigurations())
+            drCfg.setPersistenceEnabled(persistence);
+
+        if (!F.isEmpty(cacheConfigurations))
+            cfg.setCacheConfiguration(cacheConfigurations);
+
+        return cfg;
+    }
+
     /**
      * Creates table with two columns, where the first column is PK,
      * and verifies created cache.
@@ -995,6 +1016,51 @@ public void testPrimaryKeyInlineSize() {
             "MY_TABLE",
             5 + 3);
     }
+    /** */
+    @Test
+    public void testNonPersistentRejoinsWithDynamicTablesOverPredefinedCaches() throws Exception {
+        stopAllGrids();
+
+        try {
+            persistence = false;
+
+            CacheConfiguration cacheCfg = new CacheConfiguration<>("TEST_CACHE")
+                .setBackups(1)
+                .setCacheMode(CacheMode.PARTITIONED)
+                .setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL)
+                .setWriteSynchronizationMode(CacheWriteSynchronizationMode.PRIMARY_SYNC);
+
+            cacheConfigurations = new CacheConfiguration[] {cacheCfg};
+
+            client = startGrids(3);
+
+            sql("CREATE TABLE IF NOT EXISTS TEST_TBL(ID INTEGER PRIMARY KEY, VAL VARCHAR) WITH \"CACHE_NAME=TEST_CACHE\"");
+
+            assertEquals(0, sql("SELECT * FROM TEST_TBL").size());
+
+            int testGrid = 2;
+
+            stopGrid(testGrid);
+            startGrid(testGrid);
+
+            awaitPartitionMapExchange();
+
+            for (int i = 0; i < 100; ++i)
+                assertEquals(1, sql("INSERT INTO TEST_TBL VALUES(" + (i + 1) + ", '" + (i + 1000) + "')").size());
+
+            assertEquals(100, grid(testGrid).cache("TEST_CACHE").size());
+
+            assertEquals(100, sql("SELECT * FROM TEST_TBL").size());
+        }
+        finally {
+            persistence = true;
+
+            afterTestsStopped();
+
+            beforeTestsStarted();
+        }
+    }
+
     /** */
     private void checkPkInlineSize(String ddl, String tableName, int expectedSize) {
         sql(ddl);
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java
index a2c04e8e5cab8..f3d1a0dd7780b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java
@@ -39,6 +39,7 @@
 import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.stream.Collectors;
+import javax.cache.configuration.CacheEntryListenerConfiguration;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.IgniteSystemProperties;
@@ -1616,37 +1617,35 @@ else if (!GridFunc.eqNotOrdered(desc.schema().entities(), locQryEntities))
                 cfg.getNearConfiguration() != null);
         }
 
-        updateRegisteredCachesIfNeeded(patchesToApply, cachesToSave, hasSchemaPatchConflict);
+        if (!hasSchemaPatchConflict)
+            updateRegisteredCaches(patchesToApply, cachesToSave);
     }
 
     /**
-     * Merging config or resaving it if it needed.
+     * Merges config and resaves it if needed.
      *
      * @param patchesToApply Patches which need to apply.
      * @param cachesToSave Caches which need to resave.
-     * @param hasSchemaPatchConflict {@code true} if we have conflict during making patch.
-     */
-    private void updateRegisteredCachesIfNeeded(Map<DynamicCacheDescriptor, QuerySchemaPatch> patchesToApply,
-        Collection<DynamicCacheDescriptor> cachesToSave, boolean hasSchemaPatchConflict) {
-        //Skip merge of config if least one conflict was found.
-        if (!hasSchemaPatchConflict) {
-            boolean isClusterActive = ctx.state().clusterState().active();
-
-            //Merge of config for cluster only for inactive grid.
-            if (!isClusterActive && !patchesToApply.isEmpty()) {
-                for (Map.Entry<DynamicCacheDescriptor, QuerySchemaPatch> entry : patchesToApply.entrySet()) {
-                    if (entry.getKey().applySchemaPatch(entry.getValue()))
-                        saveCacheConfiguration(entry.getKey());
-                }
+     */
+    private void updateRegisteredCaches(
+        Map<DynamicCacheDescriptor, QuerySchemaPatch> patchesToApply,
+        Collection<DynamicCacheDescriptor> cachesToSave
+    ) {
+        // Store config only if cluster is inactive.
+        boolean isClusterActive = ctx.state().clusterState().active();
 
-                for (DynamicCacheDescriptor descriptor : cachesToSave)
-                    saveCacheConfiguration(descriptor);
-            }
-            else if (patchesToApply.isEmpty()) {
-                for (DynamicCacheDescriptor descriptor : cachesToSave)
-                    saveCacheConfiguration(descriptor);
+        // Merge of config for cluster only for inactive grid.
+        if (!patchesToApply.isEmpty()) {
+            for (Map.Entry<DynamicCacheDescriptor, QuerySchemaPatch> entry : patchesToApply.entrySet()) {
+                if (entry.getKey().applySchemaPatch(entry.getValue()) && !isClusterActive)
+                    saveCacheConfiguration(entry.getKey());
             }
         }
+
+        if (isClusterActive) {
+            for (DynamicCacheDescriptor descriptor : cachesToSave)
+                saveCacheConfiguration(descriptor);
+        }
     }
 
     /**
@@ -1840,7 +1839,7 @@ private void initStartCachesForLocalJoin(boolean firstNode, boolean reconnect) {
                 nearCfg = locCfg.cacheData().config().getNearConfiguration();
 
                 DynamicCacheDescriptor desc0 = new DynamicCacheDescriptor(ctx,
-                    locCfg.cacheData().config(),
+                    mergeConfigurations(locCfg.cacheData().config(), cfg),
                     desc.cacheType(),
                     desc.groupDescriptor(),
                     desc.template(),
@@ -1881,6 +1880,23 @@ private void initStartCachesForLocalJoin(boolean firstNode, boolean reconnect) {
             }
         }
 
+    /**
+     * Merges local and received cache configurations.
+     *
+     * @param loc Local cache configuration.
+     * @param received Cache configuration received from the cluster.
+     * @see #registerReceivedCaches
+     * @see #updateRegisteredCaches
+     * @see DynamicCacheDescriptor#makeSchemaPatch(Collection)
+     * @see CacheConfiguration#writeReplace()
+     */
+    private <K, V> CacheConfiguration<K, V> mergeConfigurations(CacheConfiguration<K, V> loc, CacheConfiguration<K, V> received) {
+        for (CacheEntryListenerConfiguration<K, V> lsnrCfg : loc.getCacheEntryListenerConfigurations())
+            received.addCacheEntryListenerConfiguration(lsnrCfg);
+
+        return received;
+    }
+
     /**
      * @param msg Message.
      */
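Note (not part of the patch): the sketch below illustrates the merge rule that mergeConfigurations applies when a rejoining node adopts the cluster-received cache configuration: the received configuration wins, but listener configurations registered only locally are re-attached to it. It is a minimal standalone example; the class and listener names (MergeConfigurationsSketch, CreatedListener) are illustrative and do not exist in the Ignite codebase.

    import java.io.Serializable;
    import javax.cache.configuration.CacheEntryListenerConfiguration;
    import javax.cache.configuration.FactoryBuilder;
    import javax.cache.configuration.MutableCacheEntryListenerConfiguration;
    import javax.cache.event.CacheEntryCreatedListener;
    import javax.cache.event.CacheEntryEvent;
    import javax.cache.event.CacheEntryListenerException;
    import org.apache.ignite.configuration.CacheConfiguration;

    public class MergeConfigurationsSketch {
        /** Listener registered only in the local configuration. */
        static class CreatedListener implements CacheEntryCreatedListener<Integer, String>, Serializable {
            @Override public void onCreated(Iterable<CacheEntryEvent<? extends Integer, ? extends String>> evts)
                throws CacheEntryListenerException {
                evts.forEach(evt -> System.out.println("created: " + evt.getKey()));
            }
        }

        public static void main(String[] args) {
            // Local configuration: carries a listener registered programmatically on this node only.
            CacheConfiguration<Integer, String> loc = new CacheConfiguration<>("TEST_CACHE");

            CacheEntryListenerConfiguration<Integer, String> lsnrCfg =
                new MutableCacheEntryListenerConfiguration<>(
                    FactoryBuilder.factoryOf(new CreatedListener()), null, false, false);

            loc.addCacheEntryListenerConfiguration(lsnrCfg);

            // Configuration received from the cluster: carries the merged SQL schema but no local listeners.
            CacheConfiguration<Integer, String> received = new CacheConfiguration<>("TEST_CACHE");

            // Keep the received configuration and re-attach the locally registered listeners to it,
            // mirroring what the patch does for the descriptor created on local join.
            for (CacheEntryListenerConfiguration<Integer, String> cfg : loc.getCacheEntryListenerConfigurations())
                received.addCacheEntryListenerConfiguration(cfg);

            System.out.println("Merged listener configurations: " + received.getCacheEntryListenerConfigurations());
        }
    }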