Skip to content

Commit

Permalink
IGNITE-22767 Fix index scan
Browse files Browse the repository at this point in the history
  • Loading branch information
nizhikov committed Jul 30, 2024
1 parent 1276223 commit f87e1a9
Show file tree
Hide file tree
Showing 2 changed files with 124 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -211,9 +211,11 @@ else if (checkExpired)
if (!F.isEmpty(ectx.getTxWriteEntries())) {
BPlusTree.TreeRowClosure<IndexRow, IndexRow> _rowFilter = rowFilter;

int[] parts = grp.partitions(ectx.localNodeId());

IgniteBiTuple<Set<KeyCacheObject>, List<CacheDataRow>> txChanges = transactionRows(
ectx.getTxWriteEntries(),
e -> true,
e -> F.contains(parts, e.key().partition()),
Function.identity()
);

Expand All @@ -225,12 +227,12 @@ else if (checkExpired)
long pageAddr,
int idx
) throws IgniteCheckedException {
if (!_rowFilter.apply(tree, io, pageAddr, idx))
if (_rowFilter != null && !_rowFilter.apply(tree, io, pageAddr, idx))
return false;

IndexRow row = tree.getRow(io, pageAddr, idx);

return txChanges.get1().remove(row.cacheDataRow().key());
return !txChanges.get1().remove(row.cacheDataRow().key());
}
};

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,12 @@
import java.util.List;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import java.util.stream.LongStream;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteSystemProperties;
import org.apache.ignite.cache.CacheAtomicityMode;
import org.apache.ignite.cache.QueryEntity;
import org.apache.ignite.cache.QueryIndex;
import org.apache.ignite.cache.query.SqlFieldsQuery;
import org.apache.ignite.calcite.CalciteQueryEngineConfiguration;
import org.apache.ignite.configuration.CacheConfiguration;
Expand All @@ -46,7 +48,6 @@
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC;
import static org.apache.ignite.transactions.TransactionIsolation.READ_COMMITTED;

/** */
Expand All @@ -71,6 +72,12 @@ public class TransactionIsolationTest extends GridCommonAbstractTest {
/** */
public static final User KYLE = new User(3, "Kyle Reese");

/** */
public static final int TX_TIMEOUT = 60_000;

/** */
public static final int TX_SIZE = 10;

/** */
@Parameterized.Parameter()
public String insert;
Expand Down Expand Up @@ -123,7 +130,7 @@ public static Collection<?> parameters() {
@Override protected void beforeTestsStarted() throws Exception {
super.beforeTestsStarted();

srv = startGrid();
srv = startGrids(3);
cli = startClientGrid("client");

LinkedHashMap<String, String> flds = new LinkedHashMap<>();
Expand All @@ -140,7 +147,19 @@ public static Collection<?> parameters() {
.setKeyType(Integer.class.getName())
.setValueType(User.class.getName())
.setKeyFieldName("id")
.setFields(flds))));
.setFields(flds)
.setIndexes(Collections.singleton(new QueryIndex()
.setName("IDX_FIO_USERS")
.setFieldNames(Collections.singleton("fio"), true).setInlineSize(Character.BYTES * 20))))));

cli.createCache(new CacheConfiguration<Integer, Integer>()
.setName("TBL")
.setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL)
.setQueryEntities(Collections.singleton(new QueryEntity()
.setTableName("TBL")
.setKeyType(Long.class.getName())
.setValueType(Long.class.getName()))));

}

/** {@inheritDoc} */
Expand All @@ -163,6 +182,88 @@ public static Collection<?> parameters() {
cli.cache(USERS).removeAll();
}

/** */
@Test
public void testIndexScan() {
delete(1);

assertEquals("Table must be empty", 0L, executeSql(cli, "SELECT COUNT(*) FROM USERS.USERS").get(0).get(0));

for (int i = 0; i < 5; i++) {
int start = i * 10;

for (int j = 0; j < 5; j++) {
int id = start + j + 1;

log.info("id = " + id);

insert(F.t(id, new User(id, "User" + j))); // Intentionally repeat FIO to make same indexed keys.
}
}

assertEquals(25L, executeSql(cli, "SELECT COUNT(*) FROM USERS.USERS").get(0).get(0));

try (Transaction tx = cli.transactions().txStart(txConcurrency, txIsolation, TX_TIMEOUT, TX_SIZE)) {
for (int i = 0; i < 5; i++) {
int start = i * 10 + 5;

for (int j = 0; j < 5; j++) {
int id = start + j + 1;

log.info("id = " + id);

insert(F.t(id, new User(id, "User" + j))); // Intentionally repeat FIO to make same indexed keys.

long expTblSz = 25L + i * 5 + j + 1;

assertEquals(expTblSz, executeSql(cli, "SELECT COUNT(*) FROM USERS.USERS").get(0).get(0));

List<List<?>> rows = executeSql(cli, "SELECT fio FROM USERS.USERS ORDER BY fio");

assertEquals(expTblSz, rows.size());

ensureSorted(rows, true);
}
}

for (int i = 0; i < 5; i++) {
int start = i * 10;

for (int j = 0; j < 5; j++) {
int id = start + j + 1;

log.info("id = " + id);

delete(id);

long expTblSz = 50L - (i * 5 + j + 1);

assertEquals(expTblSz, executeSql(cli, "SELECT COUNT(*) FROM USERS.USERS").get(0).get(0));

List<List<?>> rows = executeSql(cli, "SELECT fio FROM USERS.USERS ORDER BY fio DESC");

assertEquals(expTblSz, rows.size());

ensureSorted(rows, false);
}
}

tx.commit();
}

assertEquals(25L, executeSql(srv, "SELECT COUNT(*) FROM USERS.USERS").get(0).get(0));
}

/** */
private static void ensureSorted(List<List<?>> rows, boolean asc) {
for (int k = 1; k < rows.size(); k++) {
String fio0 = (String)rows.get(k - 1).get(0);
String fio1 = (String)rows.get(k).get(0);

assertTrue(asc ? (fio0.compareTo(fio1) <= 0) : (fio0.compareTo(fio1) >= 0));
}
}

/** */
@Test
public void testInsert() {
Expand Down Expand Up @@ -222,40 +323,36 @@ public void testDelete() {
/** */
@Test
public void testVisibility() {
IgniteCache<Integer, Integer> cache = srv.getOrCreateCache(new CacheConfiguration<Integer, Integer>()
.setName("TBL")
.setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL)
.setQueryEntities(Collections.singleton(new QueryEntity()
.setTableName("TBL")
.setKeyType(Integer.class.getName())
.setValueType(Integer.class.getName()))));

executeSql(srv, "DELETE FROM TBL.TBL");

try (Transaction tx = srv.transactions().txStart(PESSIMISTIC, READ_COMMITTED, 5_000, 10)) {
List<List<?>> sqlData = executeSql(srv, "SELECT COUNT(*) FROM TBL.TBL");
IgniteCache<Long, Long> cache = cli.cache("TBL");

assertEquals("Table must be empty", 0L, sqlData.get(0).get(0));
assertEquals("Table must be empty", 0L, executeSql(srv, "SELECT COUNT(*) FROM TBL.TBL").get(0).get(0));

cache.put(1, 2);
long cnt = 100;

assertEquals("Must see transaction related data", (Integer)2, cache.get(1));
LongStream.range(1, 1 + cnt).forEach(i -> {
try (Transaction tx = srv.transactions().txStart(txConcurrency, txIsolation, TX_TIMEOUT, TX_SIZE)) {
cache.put(i, i + 1);

sqlData = executeSql(srv, "SELECT COUNT(*) FROM TBL.TBL");
assertEquals("Must see transaction related data", (Long)(i + 1), cache.get(i));

assertEquals("Must see transaction related data", 1L, sqlData.get(0).get(0));
List<List<?>> sqlData = executeSql(srv, "SELECT COUNT(*) FROM TBL.TBL");

tx.commit();
}
assertEquals("Must count properly", i, sqlData.get(0).get(0));

tx.commit();
}
});

List<List<?>> sqlData = executeSql(srv, "SELECT COUNT(*) FROM TBL.TBL");

assertEquals("Must see committed data", 1L, sqlData.get(0).get(0));
assertEquals("Must see committed data", cnt, sqlData.get(0).get(0));
}

/** */
private void insideRollbackedTx(RunnableX test) {
try (Transaction tx = cli.transactions().txStart(txConcurrency, txIsolation, 1_000, 10)) {
try (Transaction tx = cli.transactions().txStart(txConcurrency, txIsolation, TX_TIMEOUT, TX_SIZE)) {
test.run();

tx.rollback();
Expand Down

0 comments on commit f87e1a9

Please sign in to comment.