diff --git a/.idea/inspectionProfiles/Project_Default.xml b/.idea/inspectionProfiles/Project_Default.xml
index 0af10ada908d7..56dbd67f701bf 100644
--- a/.idea/inspectionProfiles/Project_Default.xml
+++ b/.idea/inspectionProfiles/Project_Default.xml
@@ -347,6 +347,9 @@
+
+
+
@@ -752,7 +755,7 @@
-
+
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessor.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessor.java
index 7f8ac3b8c9b4f..a21ba720ced04 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessor.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessor.java
@@ -58,7 +58,9 @@
import org.apache.ignite.events.SqlQueryExecutionEvent;
import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.processors.GridProcessorAdapter;
+import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal;
import org.apache.ignite.internal.processors.cache.query.IgniteQueryErrorCode;
+import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.processors.failure.FailureProcessor;
import org.apache.ignite.internal.processors.query.GridQueryFieldMetadata;
import org.apache.ignite.internal.processors.query.IgniteSQLException;
@@ -120,6 +122,7 @@
import org.jetbrains.annotations.Nullable;
import static org.apache.ignite.IgniteSystemProperties.getLong;
+import static org.apache.ignite.configuration.TransactionConfiguration.TX_AWARE_QUERIES_SUPPORTED_MODES;
import static org.apache.ignite.events.EventType.EVT_SQL_QUERY_EXECUTION;
/** */
@@ -486,6 +489,8 @@ private List parseAndProcessQuery(
String sql,
Object... params
) throws IgniteSQLException {
+ ensureTransactionModeSupported(qryCtx);
+
SchemaPlus schema = schemaHolder.schema(schemaName);
assert schema != null : "Schema not found: " + schemaName;
@@ -588,7 +593,32 @@ private Object contextKey(QueryContext qryCtx) {
SqlFieldsQuery sqlFieldsQry = qryCtx.unwrap(SqlFieldsQuery.class);
- return sqlFieldsQry != null ? F.asList(sqlFieldsQry.isLocal(), sqlFieldsQry.isEnforceJoinOrder()) : null;
+ return sqlFieldsQry != null
+ ? F.asList(sqlFieldsQry.isLocal(), sqlFieldsQry.isEnforceJoinOrder(), queryTransactionVersion(qryCtx) == null)
+ : null;
+ }
+
+ /** */
+ private static GridCacheVersion queryTransactionVersion(@Nullable QueryContext qryCtx) {
+ return qryCtx == null ? null : qryCtx.unwrap(GridCacheVersion.class);
+ }
+
+ /** */
+ private void ensureTransactionModeSupported(@Nullable QueryContext qryCtx) {
+ if (!ctx.config().getTransactionConfiguration().isTxAwareQueriesEnabled())
+ return;
+
+ GridCacheVersion ver = queryTransactionVersion(qryCtx);
+
+ if (ver == null)
+ return;
+
+ final GridNearTxLocal userTx = ctx.cache().context().tm().tx(ver);
+
+ if (TX_AWARE_QUERIES_SUPPORTED_MODES.contains(userTx.isolation()))
+ return;
+
+ throw new IllegalStateException("Transaction isolation mode not supported for SQL queries: " + userTx.isolation());
}
/** */
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/AbstractIndexScan.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/AbstractIndexScan.java
index 6e87a7b3e6fb3..28b40966d94f7 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/AbstractIndexScan.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/AbstractIndexScan.java
@@ -66,14 +66,14 @@ protected AbstractIndexScan(
/** {@inheritDoc} */
@Override public synchronized Iterator iterator() {
if (ranges == null)
- return new IteratorImpl(idx.find(null, null, true, true, indexQueryContext()));
+ return new IteratorImpl(indexCursor(null, null, true, true));
IgniteClosure, IteratorImpl> clo = range -> {
IdxRow lower = range.lower() == null ? null : row2indexRow(range.lower());
IdxRow upper = range.upper() == null ? null : row2indexRow(range.upper());
return new IteratorImpl(
- idx.find(lower, upper, range.lowerInclude(), range.upperInclude(), indexQueryContext()));
+ indexCursor(lower, upper, range.lowerInclude(), range.upperInclude()));
};
if (!ranges.multiBounds()) {
@@ -88,6 +88,11 @@ protected AbstractIndexScan(
return F.flat(F.iterator(ranges, clo, true));
}
+ /** */
+ protected GridCursor indexCursor(IdxRow lower, IdxRow upper, boolean lowerInclude, boolean upperInclude) {
+ return idx.find(lower, upper, lowerInclude, upperInclude, indexQueryContext());
+ }
+
/** */
protected abstract IdxRow row2indexRow(Row bound);
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExchangeServiceImpl.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExchangeServiceImpl.java
index b10b69f221305..616ef7143091d 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExchangeServiceImpl.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExchangeServiceImpl.java
@@ -361,6 +361,7 @@ private ExecutionContext> baseInboxContext(UUID nodeId, UUID qryId, long fragm
NoOpMemoryTracker.INSTANCE,
NoOpIoTracker.INSTANCE,
0,
- ImmutableMap.of());
+ ImmutableMap.of(),
+ null);
}
}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionContext.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionContext.java
index 8627eb8e37cdf..2c8068e0316a6 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionContext.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionContext.java
@@ -18,24 +18,37 @@
package org.apache.ignite.internal.processors.query.calcite.exec;
import java.lang.reflect.Type;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
+import java.util.Set;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
+import java.util.function.Function;
import org.apache.calcite.DataContext;
import org.apache.calcite.linq4j.QueryProvider;
import org.apache.calcite.schema.SchemaPlus;
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.CacheObject;
+import org.apache.ignite.internal.processors.cache.KeyCacheObject;
+import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
+import org.apache.ignite.internal.processors.cache.persistence.CacheDataRowAdapter;
+import org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry;
import org.apache.ignite.internal.processors.query.calcite.exec.exp.ExpressionFactory;
import org.apache.ignite.internal.processors.query.calcite.exec.exp.ExpressionFactoryImpl;
import org.apache.ignite.internal.processors.query.calcite.exec.tracker.ExecutionNodeMemoryTracker;
import org.apache.ignite.internal.processors.query.calcite.exec.tracker.IoTracker;
import org.apache.ignite.internal.processors.query.calcite.exec.tracker.MemoryTracker;
import org.apache.ignite.internal.processors.query.calcite.exec.tracker.RowTracker;
+import org.apache.ignite.internal.processors.query.calcite.message.QueryTxEntry;
import org.apache.ignite.internal.processors.query.calcite.metadata.ColocationGroup;
import org.apache.ignite.internal.processors.query.calcite.metadata.FragmentDescription;
import org.apache.ignite.internal.processors.query.calcite.prepare.AbstractQueryContext;
@@ -45,8 +58,12 @@
import org.apache.ignite.internal.processors.query.calcite.util.Commons;
import org.apache.ignite.internal.processors.query.calcite.util.TypeUtils;
import org.apache.ignite.internal.util.lang.RunnableX;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteBiTuple;
import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
import static org.apache.ignite.internal.processors.query.calcite.util.Commons.checkRange;
@@ -102,6 +119,9 @@ public class ExecutionContext extends AbstractQueryContext implements DataC
/** */
private final long timeout;
+ /** */
+ private final Collection qryTxEntries;
+
/** */
private final long startTs;
@@ -127,7 +147,8 @@ public ExecutionContext(
MemoryTracker qryMemoryTracker,
IoTracker ioTracker,
long timeout,
- Map params
+ Map params,
+ @Nullable Collection qryTxEntries
) {
super(qctx);
@@ -142,6 +163,7 @@ public ExecutionContext(
this.ioTracker = ioTracker;
this.params = params;
this.timeout = timeout;
+ this.qryTxEntries = qryTxEntries;
startTs = U.currentTimeMillis();
@@ -289,6 +311,84 @@ public void setCorrelated(@NotNull Object value, int id) {
correlations[id] = value;
}
+ /**
+ * @return Transaction write map.
+ */
+ public Collection getQryTxEntries() {
+ return qryTxEntries;
+ }
+
+ /** */
+ public static Collection transactionChanges(
+ Collection writeEntries
+ ) {
+ if (F.isEmpty(writeEntries))
+ return null;
+
+ Collection res = new ArrayList<>();
+
+ for (IgniteTxEntry e : writeEntries) {
+ CacheObject val = e.value();
+
+ if (!F.isEmpty(e.entryProcessors()))
+ val = e.applyEntryProcessors(val);
+
+ res.add(new QueryTxEntry(e.cacheId(), e.key(), val, e.explicitVersion()));
+ }
+
+ return res;
+ }
+
+ /**
+ * @param cacheId Cache id.
+ * @param parts Partitions set.
+ * @param mapper Mapper to specific data type.
+ * @return First, set of object changed in transaction, second, list of transaction data in required format.
+ * @param Required type.
+ */
+ public IgniteBiTuple, List> transactionChanges(
+ int cacheId,
+ int[] parts,
+ Function mapper
+ ) {
+ if (F.isEmpty(qryTxEntries))
+ return F.t(Collections.emptySet(), Collections.emptyList());
+
+ // Expecting parts are sorted or almost sorted and amount of transaction entries are relatively small.
+ if (parts != null && !F.isSorted(parts))
+ Arrays.sort(parts);
+
+ Set changedKeys = new HashSet<>(qryTxEntries.size());
+ List newAndUpdatedRows = new ArrayList<>(qryTxEntries.size());
+
+ for (QueryTxEntry e : qryTxEntries) {
+ int part = e.key().partition();
+
+ assert part != -1;
+
+ if (e.cacheId() != cacheId)
+ continue;
+
+ if (parts != null && Arrays.binarySearch(parts, part) < 0)
+ continue;
+
+ changedKeys.add(e.key());
+
+ CacheObject val = e.value();
+
+ if (val != null) { // Mix only updated or inserted entries. In case val == null entry removed.
+ newAndUpdatedRows.add(mapper.apply(new CacheDataRowAdapter(
+ e.key(),
+ val,
+ e.version(),
+ CU.EXPIRE_TIME_ETERNAL // Expire time calculated on commit, can use eternal here.
+ )));
+ }
+ }
+
+ return F.t(changedKeys, newAndUpdatedRows);
+ }
+
/**
* Executes a query task.
*
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionServiceImpl.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionServiceImpl.java
index bca13901aafd4..252358a870f52 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionServiceImpl.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionServiceImpl.java
@@ -47,6 +47,7 @@
import org.apache.ignite.internal.processors.cache.CacheObjectValueContext;
import org.apache.ignite.internal.processors.cache.GridCachePartitionExchangeManager;
import org.apache.ignite.internal.processors.cache.QueryCursorImpl;
+import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal;
import org.apache.ignite.internal.processors.cache.query.CacheQueryType;
import org.apache.ignite.internal.processors.cache.query.GridCacheQueryType;
import org.apache.ignite.internal.processors.cache.query.IgniteQueryErrorCode;
@@ -608,6 +609,8 @@ private ListFieldsQueryCursor> mapAndExecutePlan(
MemoryTracker qryMemoryTracker = qry.createMemoryTracker(memoryTracker, cfg.getQueryMemoryQuota());
+ final GridNearTxLocal userTx = Commons.queryTransaction(qry.context(), ctx.cache().context());
+
ExecutionContext ectx = new ExecutionContext<>(
qry.context(),
taskExecutor(),
@@ -620,7 +623,8 @@ private ListFieldsQueryCursor> mapAndExecutePlan(
qryMemoryTracker,
createIoTracker(locNodeId, qry.localQueryId()),
timeout,
- qryParams);
+ qryParams,
+ userTx == null ? null : ExecutionContext.transactionChanges(userTx.writeEntries()));
Node node = new LogicalRelImplementor<>(ectx, partitionService(), mailboxRegistry(),
exchangeService(), failureProcessor()).go(fragment.root());
@@ -659,7 +663,8 @@ private ListFieldsQueryCursor> mapAndExecutePlan(
fragmentsPerNode.get(nodeId).intValue(),
qry.parameters(),
parametersMarshalled,
- timeout
+ timeout,
+ ectx.getQryTxEntries()
);
messageService().send(nodeId, req);
@@ -867,7 +872,8 @@ private void onMessage(UUID nodeId, final QueryStartRequest msg) {
qry.createMemoryTracker(memoryTracker, cfg.getQueryMemoryQuota()),
createIoTracker(nodeId, msg.originatingQryId()),
msg.timeout(),
- Commons.parametersMap(msg.parameters())
+ Commons.parametersMap(msg.parameters()),
+ msg.queryTransactionEntries()
);
executeFragment(qry, (FragmentPlan)qryPlan, ectx);
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/IndexScan.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/IndexScan.java
index d69d7c413342f..e30974f50d9a6 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/IndexScan.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/IndexScan.java
@@ -21,6 +21,7 @@
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
+import java.util.Set;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.util.ImmutableBitSet;
import org.apache.calcite.util.ImmutableIntList;
@@ -31,15 +32,19 @@
import org.apache.ignite.internal.cache.query.index.sorted.IndexKeyType;
import org.apache.ignite.internal.cache.query.index.sorted.IndexPlainRowImpl;
import org.apache.ignite.internal.cache.query.index.sorted.IndexRow;
+import org.apache.ignite.internal.cache.query.index.sorted.IndexRowImpl;
import org.apache.ignite.internal.cache.query.index.sorted.InlineIndexRowHandler;
import org.apache.ignite.internal.cache.query.index.sorted.inline.IndexQueryContext;
import org.apache.ignite.internal.cache.query.index.sorted.inline.InlineIndex;
import org.apache.ignite.internal.cache.query.index.sorted.inline.InlineIndexKeyType;
+import org.apache.ignite.internal.cache.query.index.sorted.inline.InlineIndexTree;
+import org.apache.ignite.internal.cache.query.index.sorted.inline.SortedSegmentedIndexCursor;
import org.apache.ignite.internal.cache.query.index.sorted.inline.io.InlineIO;
import org.apache.ignite.internal.cache.query.index.sorted.keys.IndexKey;
import org.apache.ignite.internal.cache.query.index.sorted.keys.IndexKeyFactory;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.apache.ignite.internal.processors.cache.KeyCacheObject;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTopologyFuture;
import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtLocalPartition;
import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState;
@@ -54,6 +59,7 @@
import org.apache.ignite.internal.util.lang.GridCursor;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteBiTuple;
import org.apache.ignite.spi.indexing.IndexingQueryFilter;
import org.apache.ignite.spi.indexing.IndexingQueryFilterImpl;
import org.jetbrains.annotations.Nullable;
@@ -98,6 +104,12 @@ public class IndexScan extends AbstractIndexScan {
/** Types of key fields stored in index. */
private final Type[] fieldsStoreTypes;
+ /**
+ * First, set of keys changed (inserted, updated or removed) inside transaction: must be skiped during index scan.
+ * Second, list of rows inserted or updated inside transaction: must be mixed with the scan results.
+ */
+ private final IgniteBiTuple, List> txChanges;
+
/**
* @param ectx Execution context.
* @param desc Table descriptor.
@@ -159,6 +171,21 @@ protected IndexScan(
fieldsStoreTypes[i] = typeFactory.getResultClass(srcRowType.getFieldList().get(i).getType());
fieldIdxMapping = fieldToInlinedKeysMapping(srcRowType.getFieldCount());
+
+ if (!F.isEmpty(ectx.getQryTxEntries())) {
+ InlineIndexRowHandler rowHnd = idx.segment(0).rowHandler();
+
+ txChanges = ectx.transactionChanges(
+ cctx.cacheId(),
+ parts,
+ r -> new IndexRowImpl(rowHnd, r)
+ );
+
+ txChanges.get2().sort(this::compare);
+ }
+ else
+ txChanges = null;
+
}
/**
@@ -221,6 +248,32 @@ private int[] fieldToInlinedKeysMapping(int srcFieldsCnt) {
}
}
+ /** {@inheritDoc} */
+ @Override protected GridCursor indexCursor(IndexRow lower, IndexRow upper, boolean lowerInclude, boolean upperInclude) {
+ GridCursor idxCursor = super.indexCursor(lower, upper, lowerInclude, upperInclude);
+
+ if (txChanges == null)
+ return idxCursor;
+
+ // `txChanges` returns single thread data structures e.g. `HashSet`, `ArrayList`.
+ // It safe to use them in multiple `FilteredCursor` instances, because, multi range index scan will be flat to the single cursor.
+ // See AbstractIndexScan#iterator.
+ try {
+ return new SortedSegmentedIndexCursor(
+ new GridCursor[]{
+ // This call will change `txChanges.get1()` content.
+ // Removing found key from set more efficient so we break some rules here.
+ new KeyFilteringCursor<>(idxCursor, txChanges.get1(), r -> r.cacheDataRow().key()),
+ new SortedListRangeCursor<>(this::compare, txChanges.get2(), lower, upper, lowerInclude, upperInclude)
+ },
+ idx.indexDefinition()
+ );
+ }
+ catch (IgniteCheckedException e) {
+ throw new IgniteException(e);
+ }
+ }
+
/** {@inheritDoc} */
@Override protected IndexRow row2indexRow(Row bound) {
if (bound == null)
@@ -362,7 +415,7 @@ private synchronized void release() {
InlineIndexRowHandler rowHnd = idx.segment(0).rowHandler();
- InlineIndexRowFactory rowFactory = isInlineScan() ?
+ InlineIndexRowFactory rowFactory = (isInlineScan() && (txChanges == null || F.isEmpty(txChanges.get1()))) ?
new InlineIndexRowFactory(rowHnd.inlineIndexKeyTypes().toArray(new InlineIndexKeyType[0]), rowHnd) : null;
BPlusTree.TreeRowClosure rowFilter = isInlineScan() ? null : createNotExpiredRowFilter();
@@ -444,6 +497,8 @@ public static BPlusTree.TreeRowClosure createNotNullRowFilte
InlineIndexKeyType keyType = F.isEmpty(inlineKeyTypes) ? null : inlineKeyTypes.get(0);
return new BPlusTree.TreeRowClosure() {
+ private IndexRow idxRow;
+
/** {@inheritDoc} */
@Override public boolean apply(
BPlusTree tree,
@@ -454,11 +509,14 @@ public static BPlusTree.TreeRowClosure createNotNullRowFilte
if (!checkExpired && keyType != null && io instanceof InlineIO) {
Boolean keyIsNull = keyType.isNull(pageAddr, io.offset(idx), ((InlineIO)io).inlineSize());
- if (keyIsNull == Boolean.TRUE)
+ if (keyIsNull == Boolean.TRUE) {
+ idxRow = null;
+
return false;
+ }
}
- IndexRow idxRow = io.getLookupRow(tree, pageAddr, idx);
+ idxRow = io.getLookupRow(tree, pageAddr, idx);
if (checkExpired &&
idxRow.cacheDataRow().expireTime() > 0 &&
@@ -467,17 +525,35 @@ public static BPlusTree.TreeRowClosure createNotNullRowFilte
return idxRow.key(0).type() != IndexKeyType.NULL;
}
+
+ /** {@inheritDoc} */
+ @Override public IndexRow lastRow() {
+ return idxRow;
+ }
};
}
/** */
public static BPlusTree.TreeRowClosure createNotExpiredRowFilter() {
- return (tree, io, pageAddr, idx) -> {
- IndexRow idxRow = io.getLookupRow(tree, pageAddr, idx);
+ return new BPlusTree.TreeRowClosure() {
+ private IndexRow idxRow;
+
+ @Override public boolean apply(
+ BPlusTree tree,
+ BPlusIO io,
+ long pageAddr,
+ int idx
+ ) throws IgniteCheckedException {
+ idxRow = io.getLookupRow(tree, pageAddr, idx);
- // Skip expired.
- return !(idxRow.cacheDataRow().expireTime() > 0 &&
- idxRow.cacheDataRow().expireTime() <= U.currentTimeMillis());
+ // Skip expired.
+ return !(idxRow.cacheDataRow().expireTime() > 0 &&
+ idxRow.cacheDataRow().expireTime() <= U.currentTimeMillis());
+ }
+
+ @Override public IndexRow lastRow() {
+ return idxRow;
+ }
};
}
@@ -507,4 +583,14 @@ protected TreeIndexWrapper(InlineIndex idx) {
}
}
}
+
+ /** */
+ private int compare(IndexRow o1, IndexRow o2) {
+ try {
+ return InlineIndexTree.compareFullRows(o1, o2, 0, idx.segment(0).rowHandler(), idx.indexDefinition().rowComparator());
+ }
+ catch (IgniteCheckedException e) {
+ throw new IgniteException(e);
+ }
+ }
}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/KeyFilteringCursor.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/KeyFilteringCursor.java
new file mode 100644
index 0000000000000..4e0a0f44c50ad
--- /dev/null
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/KeyFilteringCursor.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.exec;
+
+import java.util.Set;
+import java.util.function.Function;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.processors.cache.KeyCacheObject;
+import org.apache.ignite.internal.util.lang.GridCursor;
+
+/**
+ * Cursor wrapper that skips all entires that maps to any of {@code skipKeys} key.
+ * Note, for the performance reasons content of {@code skipKeys} will be changed during iteration.
+ */
+class KeyFilteringCursor implements GridCursor {
+ /** Underlying cursor. */
+ private final GridCursor extends R> cursor;
+
+ /** Rows that must be skiped on {@link #cursor} iteration. */
+ private final Set skipKeys;
+
+ /** Mapper from row to {@link KeyCacheObject}. */
+ private final Function toKey;
+
+ /**
+ * @param cursor Sorted cursor.
+ * @param skipKeys Keys to skip. Content will be changed during iteration.
+ * @param toKey Mapper from row to {@link KeyCacheObject}.
+ */
+ KeyFilteringCursor(GridCursor extends R> cursor, Set skipKeys, Function toKey) {
+ this.cursor = cursor;
+ this.skipKeys = skipKeys;
+ this.toKey = toKey;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean next() throws IgniteCheckedException {
+ R cur;
+
+ do {
+ if (!cursor.next())
+ return false;
+
+ cur = cursor.get();
+
+ // Intentionally use of `Set#remove` here.
+ // We want perform as few `toKey` as possible.
+ // So we break some rules here to optimize work with the data provided by the underlying cursor.
+ } while (!skipKeys.isEmpty() && skipKeys.remove(toKey.apply(cur)));
+
+ return true;
+ }
+
+ /** {@inheritDoc} */
+ @Override public R get() throws IgniteCheckedException {
+ return cursor.get();
+ }
+}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/RuntimeSortedIndex.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/RuntimeSortedIndex.java
index ecb92c451c91f..c53a81e28af3a 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/RuntimeSortedIndex.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/RuntimeSortedIndex.java
@@ -18,7 +18,6 @@
import java.util.ArrayList;
import java.util.Comparator;
-import java.util.List;
import java.util.Objects;
import org.apache.calcite.rel.RelCollation;
import org.apache.calcite.rel.type.RelDataType;
@@ -26,7 +25,6 @@
import org.apache.ignite.internal.processors.query.calcite.exec.exp.RangeIterable;
import org.apache.ignite.internal.util.lang.GridCursor;
import org.apache.ignite.internal.util.typedef.F;
-import org.jetbrains.annotations.Nullable;
/**
* Runtime sorted index.
@@ -90,7 +88,7 @@ public RuntimeSortedIndex(
Row lowerRow = (lowerBound == null) ? null : lower;
Row upperRow = (upperBound == null) ? null : upper;
- return new Cursor(rows, lowerRow, upperRow, lowerInclude, upperInclude);
+ return new SortedListRangeCursor<>(comp, rows, lowerRow, upperRow, lowerInclude, upperInclude);
}
/**
@@ -104,83 +102,6 @@ public Iterable scan(
return new IndexScan(rowType, this, ranges);
}
- /**
- * Cursor to navigate through a sorted list with duplicates.
- */
- private class Cursor implements GridCursor {
- /** List of rows. */
- private final List rows;
-
- /** Upper bound. */
- private final Row upper;
-
- /** Include upper bound. */
- private final boolean includeUpper;
-
- /** Current row. */
- private Row row;
-
- /** Current index of list element. */
- private int idx;
-
- /**
- * @param rows List of rows.
- * @param lower Lower bound.
- * @param upper Upper bound.
- * @param lowerInclude {@code True} for inclusive lower bound.
- * @param upperInclude {@code True} for inclusive upper bound.
- */
- Cursor(List rows, @Nullable Row lower, @Nullable Row upper, boolean lowerInclude, boolean upperInclude) {
- this.rows = rows;
- this.upper = upper;
- this.includeUpper = upperInclude;
-
- idx = lower == null ? 0 : lowerBound(rows, lower, lowerInclude);
- }
-
- /**
- * Searches the lower bound (skipping duplicates) using a binary search.
- *
- * @param rows List of rows.
- * @param bound Lower bound.
- * @return Lower bound position in the list.
- */
- private int lowerBound(List rows, Row bound, boolean includeBound) {
- int low = 0, high = rows.size() - 1, idx = -1;
-
- while (low <= high) {
- int mid = (high - low) / 2 + low;
- int compRes = comp.compare(rows.get(mid), bound);
-
- if (compRes > 0)
- high = mid - 1;
- else if (compRes == 0 && includeBound) {
- idx = mid;
- high = mid - 1;
- }
- else
- low = mid + 1;
- }
-
- return idx == -1 ? low : idx;
- }
-
- /** {@inheritDoc} */
- @Override public boolean next() {
- if (idx == rows.size() || (upper != null && comp.compare(upper, rows.get(idx)) < (includeUpper ? 0 : 1)))
- return false;
-
- row = rows.get(idx++);
-
- return true;
- }
-
- /** {@inheritDoc} */
- @Override public Row get() {
- return row;
- }
- }
-
/**
*
*/
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/SortedListRangeCursor.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/SortedListRangeCursor.java
new file mode 100644
index 0000000000000..b56738b8edcb0
--- /dev/null
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/SortedListRangeCursor.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.exec;
+
+import java.util.Comparator;
+import java.util.List;
+import org.apache.ignite.internal.util.lang.GridCursor;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Cursor to navigate through a sorted list with duplicates.
+ */
+public class SortedListRangeCursor implements GridCursor {
+ /** */
+ private final Comparator comp;
+
+ /** List of rows. */
+ private final List rows;
+
+ /** Upper bound. */
+ private final Row upper;
+
+ /** Include upper bound. */
+ private final boolean includeUpper;
+
+ /** Current row. */
+ private Row row;
+
+ /** Current index of list element. */
+ private int idx;
+
+ /**
+ * @param comp Rows comparator.
+ * @param rows List of rows.
+ * @param lower Lower bound.
+ * @param upper Upper bound.
+ * @param lowerInclude {@code True} for inclusive lower bound.
+ * @param upperInclude {@code True} for inclusive upper bound.
+ */
+ public SortedListRangeCursor(
+ Comparator comp,
+ List rows,
+ @Nullable Row lower,
+ @Nullable Row upper,
+ boolean lowerInclude,
+ boolean upperInclude
+ ) {
+ this.comp = comp;
+ this.rows = rows;
+ this.upper = upper;
+ this.includeUpper = upperInclude;
+
+ idx = lower == null ? 0 : lowerBound(rows, lower, lowerInclude);
+ }
+
+ /**
+ * Searches the lower bound (skipping duplicates) using a binary search.
+ *
+ * @param rows List of rows.
+ * @param bound Lower bound.
+ * @return Lower bound position in the list.
+ */
+ private int lowerBound(List rows, Row bound, boolean includeBound) {
+ int low = 0, high = rows.size() - 1, idx = -1;
+
+ while (low <= high) {
+ int mid = (high - low) / 2 + low;
+ int compRes = comp.compare(rows.get(mid), bound);
+
+ if (compRes > 0)
+ high = mid - 1;
+ else if (compRes == 0 && includeBound) {
+ idx = mid;
+ high = mid - 1;
+ }
+ else
+ low = mid + 1;
+ }
+
+ return idx == -1 ? low : idx;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean next() {
+ // Intentionally use `-comp.compare(rows.get(idx), upper)` here.
+ // `InlineIndexTree#compareFullRows` that used here works correctly only for specific parameter order.
+ // First parameter must be tree row and the second search row. See implementation, for details.
+ if (idx == rows.size() || (upper != null && -comp.compare(rows.get(idx), upper) < (includeUpper ? 0 : 1)))
+ return false;
+
+ row = rows.get(idx++);
+
+ return true;
+ }
+
+ /** {@inheritDoc} */
+ @Override public Row get() {
+ return row;
+ }
+}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/TableScan.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/TableScan.java
index f70d08231694b..2b618cf04d7e9 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/TableScan.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/TableScan.java
@@ -24,23 +24,28 @@
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Queue;
+import java.util.Set;
+import java.util.function.Function;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.util.ImmutableBitSet;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.cluster.ClusterTopologyException;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.apache.ignite.internal.processors.cache.KeyCacheObject;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTopologyFuture;
import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtLocalPartition;
import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState;
import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionTopology;
import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
+import org.apache.ignite.internal.processors.cache.persistence.CacheSearchRow;
import org.apache.ignite.internal.processors.query.calcite.exec.RowHandler.RowFactory;
import org.apache.ignite.internal.processors.query.calcite.schema.CacheTableDescriptor;
import org.apache.ignite.internal.util.lang.GridCursor;
import org.apache.ignite.internal.util.lang.GridIteratorAdapter;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteBiTuple;
import org.jetbrains.annotations.Nullable;
/** */
@@ -187,6 +192,15 @@ private class IteratorImpl extends GridIteratorAdapter {
/** */
private GridCursor extends CacheDataRow> cur;
+ /**
+ * First, set of keys changed (inserted, updated or removed) inside transaction: must be skiped during index scan.
+ * Second, list of rows inserted or updated inside transaction: must be mixed with the scan results.
+ */
+ private IgniteBiTuple, List> txChanges;
+
+ /** */
+ private Iterator txIter = Collections.emptyIterator();
+
/** */
private Row next;
@@ -195,6 +209,16 @@ private IteratorImpl() {
assert reserved != null;
parts = new ArrayDeque<>(reserved);
+
+ if (!F.isEmpty(ectx.getQryTxEntries())) {
+ txChanges = ectx.transactionChanges(
+ cctx.cacheId(),
+ // All partitions scaned for replication cache.
+ // See TableScan#reserve.
+ cctx.isReplicated() ? null : TableScan.this.parts,
+ Function.identity()
+ );
+ }
}
/** {@inheritDoc} */
@@ -237,11 +261,25 @@ private void advance() throws IgniteCheckedException {
break;
cur = part.dataStore().cursor(cctx.cacheId());
+
+ if (txChanges != null) {
+ // This call will change `txChanges.get1()` content.
+ // Removing found key from set more efficient so we break some rules here.
+ if (!F.isEmpty(txChanges.get1()))
+ cur = new KeyFilteringCursor<>(cur, txChanges.get1(), CacheSearchRow::key);
+
+ txIter = F.iterator0(txChanges.get2(), true, e -> e.key().partition() == part.id());
+ }
}
- if (cur.next()) {
- CacheDataRow row = cur.get();
+ CacheDataRow row;
+
+ if (cur.next())
+ row = cur.get();
+ else
+ row = txIter.hasNext() ? txIter.next() : null;
+ if (row != null) {
if (row.expireTime() > 0 && row.expireTime() <= U.currentTimeMillis())
continue;
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/exp/ExpressionFactoryImpl.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/exp/ExpressionFactoryImpl.java
index 187d2739457d8..d6fb106667b12 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/exp/ExpressionFactoryImpl.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/exp/ExpressionFactoryImpl.java
@@ -62,6 +62,7 @@
import org.apache.calcite.rex.RexUtil;
import org.apache.calcite.sql.type.SqlTypeName;
import org.apache.calcite.sql.validate.SqlConformance;
+import org.apache.ignite.internal.binary.BinaryObjectImpl;
import org.apache.ignite.internal.processors.query.calcite.exec.ExecutionContext;
import org.apache.ignite.internal.processors.query.calcite.exec.RowHandler;
import org.apache.ignite.internal.processors.query.calcite.exec.RowHandler.RowFactory;
@@ -245,11 +246,26 @@ else if (c2 != HIGHEST_VALUE)
/** */
@SuppressWarnings("rawtypes")
private static int compare(Object o1, Object o2, int nullComparison) {
+ if (o1 instanceof BinaryObjectImpl)
+ return compareBinary(o1, o2, nullComparison);
+
final Comparable c1 = (Comparable)o1;
final Comparable c2 = (Comparable)o2;
return RelFieldCollation.compare(c1, c2, nullComparison);
}
+ /** */
+ private static int compareBinary(Object o1, Object o2, int nullComparison) {
+ if (o1 == o2)
+ return 0;
+ else if (o1 == null)
+ return nullComparison;
+ else if (o2 == null)
+ return -nullComparison;
+
+ return BinaryObjectImpl.compareForDml(o1, o2);
+ }
+
/** {@inheritDoc} */
@Override public Predicate predicate(RexNode filter, RelDataType rowType) {
return new PredicateImpl(scalar(filter, rowType));
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/ModifyNode.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/ModifyNode.java
index 4491028d0beb8..5a9f3dac4ef54 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/ModifyNode.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/ModifyNode.java
@@ -30,10 +30,13 @@
import org.apache.calcite.rel.type.RelDataType;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.apache.ignite.internal.processors.cache.GridCacheProxyImpl;
+import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal;
import org.apache.ignite.internal.processors.query.IgniteSQLException;
import org.apache.ignite.internal.processors.query.calcite.exec.ExecutionContext;
import org.apache.ignite.internal.processors.query.calcite.schema.CacheTableDescriptor;
import org.apache.ignite.internal.processors.query.calcite.schema.ModifyTuple;
+import org.apache.ignite.internal.processors.query.calcite.util.Commons;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.U;
@@ -195,8 +198,27 @@ private void flushTuples(boolean force) throws IgniteCheckedException {
this.tuples = new ArrayList<>(MODIFY_BATCH_SIZE);
GridCacheContext