From 03ae2acdd55df1489810f527773116acef75539f Mon Sep 17 00:00:00 2001 From: Nikolay Izhikov Date: Fri, 2 Aug 2024 13:34:13 +0300 Subject: [PATCH] IGNITE-22767 Fix index scan --- .../calcite/exec/ExecutionServiceImpl.java | 1 - .../query/calcite/exec/IndexScan.java | 2 +- .../tx/TransactionIsolationTest.java | 210 +++++++++++------- 3 files changed, 134 insertions(+), 79 deletions(-) diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionServiceImpl.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionServiceImpl.java index c88aa53ca2045..7458633445287 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionServiceImpl.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionServiceImpl.java @@ -660,7 +660,6 @@ private ListFieldsQueryCursor mapAndExecutePlan( qry.onResponse(nodeId, fragment.fragmentId(), ex); else { try { - // TODO: add tx state here. QueryStartRequest req = new QueryStartRequest( qry.id(), qry.localQueryId(), diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/IndexScan.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/IndexScan.java index 1854be2098db4..c0e2304f1d2b5 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/IndexScan.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/IndexScan.java @@ -255,7 +255,7 @@ private int[] fieldToInlinedKeysMapping(int srcFieldsCnt) { txChanges = transactionRows( ectx.getTxWriteEntries(), // TODO: Use set for partitions here. - e -> (cctx.cacheId() == e.cacheId()) && F.contains(parts, e.key().partition()), + e -> (cctx.cacheId() == e.cacheId()) && (parts == null || F.contains(parts, e.key().partition())), r -> new IndexRowImpl(rowHnd, r) ); diff --git a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/tx/TransactionIsolationTest.java b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/tx/TransactionIsolationTest.java index 53623ae794828..b25e72b447b57 100644 --- a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/tx/TransactionIsolationTest.java +++ b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/tx/TransactionIsolationTest.java @@ -29,6 +29,8 @@ import org.apache.ignite.IgniteCache; import org.apache.ignite.IgniteSystemProperties; import org.apache.ignite.cache.CacheAtomicityMode; +import org.apache.ignite.cache.CacheMode; +import org.apache.ignite.cache.CacheWriteSynchronizationMode; import org.apache.ignite.cache.QueryEntity; import org.apache.ignite.cache.QueryIndex; import org.apache.ignite.cache.query.SqlFieldsQuery; @@ -43,8 +45,10 @@ import org.apache.ignite.configuration.SqlConfiguration; import org.apache.ignite.internal.IgniteEx; import org.apache.ignite.internal.client.thin.TcpIgniteClient; +import org.apache.ignite.internal.processors.query.QueryUtils; import org.apache.ignite.internal.util.lang.RunnableX; import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.lang.IgniteBiTuple; import org.apache.ignite.testframework.junits.WithSystemProperty; import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; @@ -55,6 +59,7 @@ import org.junit.runner.RunWith; import org.junit.runners.Parameterized; +import static java.lang.String.format; import static org.apache.ignite.transactions.TransactionIsolation.READ_COMMITTED; import static org.junit.Assume.assumeFalse; @@ -62,9 +67,6 @@ @RunWith(Parameterized.class) @WithSystemProperty(key = IgniteSystemProperties.IGNITE_ALLOW_TX_AWARE_QUERIES, value = "true") public class TransactionIsolationTest extends GridCommonAbstractTest { - /** */ - public static final String USERS = "USERS"; - /** */ public static final String CACHE = "cache"; @@ -106,6 +108,10 @@ public class TransactionIsolationTest extends GridCommonAbstractTest { @Parameterized.Parameter(4) public boolean partitionAwareness; + /** */ + @Parameterized.Parameter(5) + public CacheMode mode; + /** */ private static IgniteEx srv; @@ -122,7 +128,7 @@ public class TransactionIsolationTest extends GridCommonAbstractTest { private TransactionIsolation txIsolation = READ_COMMITTED; /** @return Test parameters. */ - @Parameterized.Parameters(name = "insert={0},update={1},delete={2},thin={3}") + @Parameterized.Parameters(name = "insert={0},update={1},delete={2},thin={3},partitionAwareness={4},mode={5}") public static Collection parameters() { List params = new ArrayList<>(); @@ -131,10 +137,14 @@ public static Collection parameters() { for (String insert : apis) { for (String update : apis) { for (String delete : apis) { - params.add(new Object[]{insert, update, delete, false, false}); + for (CacheMode mode : CacheMode.values()) { + params.add(new Object[]{insert, update, delete, false, false, mode}); - for (boolean partitionAwareness : new boolean[]{false, true}) { - params.add(new Object[]{insert, update, delete, true, partitionAwareness}); +/* + for (boolean partitionAwareness : new boolean[]{false, true}) { + params.add(new Object[]{insert, update, delete, true, partitionAwareness, mode}); + } +*/ } } } @@ -159,43 +169,54 @@ public static Collection parameters() { .setAddresses(Config.SERVER) .setPartitionAwarenessEnabled(partitionAwareness)); - LinkedHashMap flds = new LinkedHashMap<>(); - - flds.put("id", Integer.class.getName()); - flds.put("userId", Integer.class.getName()); - flds.put("fio", String.class.getName()); - - cli.createCache(new CacheConfiguration<>() - .setName(USERS) - .setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL) - .setQueryEntities(Collections.singleton(new QueryEntity() - .setTableName(USERS) - .setKeyType(Integer.class.getName()) - .setValueType(User.class.getName()) - .setKeyFieldName("id") - .setFields(flds) - .setIndexes(Arrays.asList( - new QueryIndex() - .setName("IDX_FIO_USERS") - .setFieldNames(Collections.singleton("fio"), true).setInlineSize(Character.BYTES * 20), - new QueryIndex() - .setName("IDX_USER_ID") - .setFieldNames(Collections.singleton("userId"), true) - ))))); - - cli.createCache(new CacheConfiguration() - .setName("TBL") - .setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL) - .setQueryEntities(Collections.singleton(new QueryEntity() - .setTableName("TBL") - .setKeyType(Long.class.getName()) - .setValueType(Long.class.getName())))); - + for (CacheMode mode : CacheMode.values()) { + String users = tableName("USERS", mode); + String tbl = tableName("TBL", mode); + + LinkedHashMap flds = new LinkedHashMap<>(); + + flds.put("id", Integer.class.getName()); + flds.put("userId", Integer.class.getName()); + flds.put("fio", String.class.getName()); + + cli.createCache(new CacheConfiguration<>() + .setName(users) + .setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL) + .setCacheMode(mode) + .setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC) + .setSqlSchema(QueryUtils.DFLT_SCHEMA) + .setQueryEntities(Collections.singleton(new QueryEntity() + .setTableName(users) + .setKeyType(Integer.class.getName()) + .setValueType(User.class.getName()) + .setKeyFieldName("id") + .setFields(flds) + .setIndexes(Arrays.asList( + new QueryIndex() + .setName("IDX_FIO_" + users) + .setFieldNames(Collections.singleton("fio"), true).setInlineSize(Character.BYTES * 20), + new QueryIndex() + .setName("IDX_USERID_" + users) + .setFieldNames(Collections.singleton("userId"), true) + ))))); + + cli.createCache(new CacheConfiguration() + .setName(tbl) + .setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL) + .setCacheMode(this.mode) + .setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC) + .setSqlSchema(QueryUtils.DFLT_SCHEMA) + .setQueryEntities(Collections.singleton(new QueryEntity() + .setTableName(tbl) + .setKeyType(Long.class.getName()) + .setValueType(Long.class.getName())))); + } } /** {@inheritDoc} */ @Override protected void afterTestsStopped() throws Exception { super.afterTestsStopped(); + stopAllGrids(); } @@ -210,7 +231,7 @@ public static Collection parameters() { @Override protected void afterTest() throws Exception { super.afterTest(); - cli.cache(USERS).removeAll(); + cli.cache(users()).removeAll(); } /** */ @@ -220,7 +241,7 @@ public void testIndexScan() { delete(1); - assertEquals("Table must be empty", 0L, executeSql("SELECT COUNT(*) FROM USERS.USERS").get(0).get(0)); + assertEquals("Table must be empty", 0L, executeSql(format("SELECT COUNT(*) FROM %s", users())).get(0).get(0)); for (int i = 0; i < 5; i++) { int start = i * 10; @@ -232,13 +253,13 @@ public void testIndexScan() { } } - assertEquals(25L, executeSql("SELECT COUNT(*) FROM USERS.USERS").get(0).get(0)); + assertEquals(25L, executeSql(format("SELECT COUNT(*) FROM %s", users())).get(0).get(0)); insideTx(() -> { for (int i = 0; i < 5; i++) { int start = i * 10 + 5; - assertEquals(i * 10 + 1, executeSql("SELECT MIN(userid) FROM USERS.USERS WHERE userid > ?", i * 10).get(0).get(0)); + assertEquals(i * 10 + 1, executeSql(format("SELECT MIN(userid) FROM %s WHERE userid > ?", users()), i * 10).get(0).get(0)); for (int j = 0; j < 5; j++) { int id = start + j + 1; @@ -247,9 +268,9 @@ public void testIndexScan() { long expTblSz = 25L + i * 5 + j + 1; - assertEquals(expTblSz, executeSql("SELECT COUNT(*) FROM USERS.USERS").get(0).get(0)); + assertEquals(expTblSz, executeSql(format("SELECT COUNT(*) FROM %s", users())).get(0).get(0)); - List> rows = executeSql("SELECT fio FROM USERS.USERS ORDER BY fio"); + List> rows = executeSql(format("SELECT fio FROM %s ORDER BY fio", users())); assertEquals(expTblSz, rows.size()); @@ -257,7 +278,7 @@ public void testIndexScan() { assertEquals( id, - executeSql("SELECT MIN(userid) FROM USERS.USERS WHERE userid BETWEEN ? AND ?", id, 500).get(0).get(0) + executeSql(format("SELECT MIN(userid) FROM %s WHERE userid BETWEEN ? AND ?", users()), id, 500).get(0).get(0) ); } } @@ -272,9 +293,9 @@ public void testIndexScan() { long expTblSz = 50L - (i * 5 + j + 1); - assertEquals(expTblSz, executeSql("SELECT COUNT(*) FROM USERS.USERS").get(0).get(0)); + assertEquals(expTblSz, executeSql(format("SELECT COUNT(*) FROM %s", users())).get(0).get(0)); - List> rows = executeSql("SELECT fio FROM USERS.USERS ORDER BY fio DESC"); + List> rows = executeSql(format("SELECT fio FROM %s ORDER BY fio DESC", users())); assertEquals(expTblSz, rows.size()); @@ -283,7 +304,7 @@ public void testIndexScan() { } }, true); - assertEquals(25L, executeSql("SELECT COUNT(*) FROM USERS.USERS").get(0).get(0)); + assertEquals(25L, executeSql(format("SELECT COUNT(*) FROM %s", users())).get(0).get(0)); } /** */ @@ -301,9 +322,15 @@ private static void ensureSorted(List> rows, boolean asc) { public void testInsert() { assumeFalse("https://issues.apache.org/jira/browse/IGNITE-22874", thin); - insideTx(() -> { + Runnable checkDflts = () -> { assertNull(CACHE, select(4, CACHE)); assertNull(SQL, select(4, SQL)); + }; + + checkDflts.run(); + + insideTx(() -> { + checkDflts.run(); insert(F.t(4, JOHN)); @@ -311,8 +338,7 @@ public void testInsert() { assertEquals(SQL, JOHN, select(4, SQL)); }, false); - assertNull(CACHE, select(4, CACHE)); - assertNull(SQL, select(4, SQL)); + checkDflts.run(); } /** */ @@ -320,9 +346,15 @@ public void testInsert() { public void testUpdate() { assumeFalse("https://issues.apache.org/jira/browse/IGNITE-22874", thin); - insideTx(() -> { + Runnable checkDflts = () -> { assertEquals(JOHN, select(1, CACHE)); assertEquals(JOHN, select(1, SQL)); + }; + + checkDflts.run(); + + insideTx(() -> { + checkDflts.run(); update(F.t(1, SARAH)); @@ -335,8 +367,7 @@ public void testUpdate() { assertEquals(KYLE, select(1, SQL)); }, false); - assertEquals(JOHN, select(1, CACHE)); - assertEquals(JOHN, select(1, SQL)); + checkDflts.run(); } /** */ @@ -344,9 +375,15 @@ public void testUpdate() { public void testDelete() { assumeFalse("https://issues.apache.org/jira/browse/IGNITE-22874", thin); - insideTx(() -> { + Runnable checkDflts = () -> { assertEquals(JOHN, select(1, CACHE)); assertEquals(JOHN, select(1, SQL)); + }; + + checkDflts.run(); + + insideTx(() -> { + checkDflts.run(); delete(1); @@ -354,8 +391,7 @@ public void testDelete() { assertNull(select(1, SQL)); }, false); - assertEquals(JOHN, select(1, CACHE)); - assertEquals(JOHN, select(1, SQL)); + checkDflts.run(); } /** */ @@ -363,12 +399,12 @@ public void testDelete() { public void testVisibility() { assumeFalse("https://issues.apache.org/jira/browse/IGNITE-22874", thin); - executeSql("DELETE FROM TBL.TBL"); + executeSql(format("DELETE FROM %s", tbl())); - IgniteCache cache = cli.cache("TBL"); - ClientCache thinCache = thinCli.cache("TBL"); + IgniteCache cache = cli.cache(tbl()); + ClientCache thinCache = thinCli.cache(tbl()); - assertEquals("Table must be empty", 0L, executeSql("SELECT COUNT(*) FROM TBL.TBL").get(0).get(0)); + assertEquals("Table must be empty", 0L, executeSql(format("SELECT COUNT(*) FROM %s", tbl())).get(0).get(0)); long cnt = 100; @@ -384,12 +420,12 @@ public void testVisibility() { assertEquals("Must see transaction related data", (Long)(i + 1), cache.get(i)); } - List> sqlData = executeSql("SELECT COUNT(*) FROM TBL.TBL"); + List> sqlData = executeSql(format("SELECT COUNT(*) FROM %s", tbl())); assertEquals("Must count properly", i, sqlData.get(0).get(0)); }, true)); - List> sqlData = executeSql("SELECT COUNT(*) FROM TBL.TBL"); + List> sqlData = executeSql(format("SELECT COUNT(*) FROM %s", tbl())); assertEquals("Must see committed data", cnt, sqlData.get(0).get(0)); } @@ -423,10 +459,10 @@ private void insideTx(RunnableX test, boolean commit) { private User select(Integer id, String api) { if (api.equals(CACHE)) return thin - ? (User)thinCli.cache(USERS).get(id) - : (User)cli.cache(USERS).get(id); + ? (User)thinCli.cache(users()).get(id) + : (User)cli.cache(users()).get(id); else if (api.equals(SQL)) { - List> res = executeSql("SELECT _VAL FROM USERS.USERS WHERE _KEY = ?", id); + List> res = executeSql(format("SELECT _VAL FROM %s WHERE _KEY = ?", users()), id); assertNotNull(res); @@ -442,12 +478,12 @@ else if (api.equals(SQL)) { private void insert(IgniteBiTuple data) { if (insert.equals(CACHE)) { if (thin) - thinCli.cache(USERS).put(data.get1(), data.get2()); + thinCli.cache(users()).put(data.get1(), data.get2()); else - cli.cache(USERS).put(data.get1(), data.get2()); + cli.cache(users()).put(data.get1(), data.get2()); } else if (insert.equals(SQL)) { - executeSql("INSERT INTO USERS.USERS(id, userid, fio) VALUES(?, ?, ?)", + executeSql(format("INSERT INTO %s(id, userid, fio) VALUES(?, ?, ?)", users()), data.get1(), data.get2().userId, data.get2().fio); @@ -460,12 +496,12 @@ else if (insert.equals(SQL)) { private void update(IgniteBiTuple data) { if (update.equals(CACHE)) { if (thin) - thinCli.cache(USERS).put(data.get1(), data.get2()); + thinCli.cache(users()).put(data.get1(), data.get2()); else - cli.cache(USERS).put(data.get1(), data.get2()); + cli.cache(users()).put(data.get1(), data.get2()); } else if (update.equals(SQL)) { - executeSql("UPDATE USERS.USERS SET userid = ?, fio = ? WHERE id = ?", + executeSql(format("UPDATE %s SET userid = ?, fio = ? WHERE id = ?", users()), data.get2().userId, data.get2().fio, data.get1()); @@ -477,9 +513,9 @@ else if (update.equals(SQL)) { /** */ private void delete(int id) { if (delete.equals(CACHE)) - cli.cache(USERS).remove(id); + cli.cache(users()).remove(id); else if (delete.equals(SQL)) - executeSql("DELETE FROM USERS.USERS WHERE id = ?", id); + executeSql(format("DELETE FROM %s WHERE id = ?", users()), id); else fail("Unknown delete: " + delete); } @@ -487,10 +523,10 @@ else if (delete.equals(SQL)) /** */ public static class User { /** */ - final int userId; + private final int userId; /** */ - final String fio; + private final String fio; /** */ public User(int id, String fio) { @@ -510,6 +546,11 @@ public User(int id, String fio) { @Override public int hashCode() { return Objects.hash(userId, fio); } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(User.class, this); + } } /** */ @@ -530,4 +571,19 @@ public List> executeSql(String sqlText, Object... args) { ? thinCli.query(qry).getAll() : cli.cache(F.first(cli.cacheNames())).query(qry).getAll(); } + + /** */ + private String users() { + return tableName("USERS", mode); + } + + /** */ + private String tbl() { + return tableName("TBL", mode); + } + + /** */ + private static String tableName(String tbl, CacheMode mode) { + return tbl + "_" + mode; + } }