Skip to content

Commit

Permalink
IGNITE-22732 Transaction aware SQL (#11438)
Browse files Browse the repository at this point in the history
  • Loading branch information
nizhikov authored Sep 20, 2024
1 parent 2fff615 commit 00a0d40
Show file tree
Hide file tree
Showing 62 changed files with 3,870 additions and 936 deletions.
5 changes: 4 additions & 1 deletion .idea/inspectionProfiles/Project_Default.xml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,9 @@
import org.apache.ignite.events.SqlQueryExecutionEvent;
import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.processors.GridProcessorAdapter;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal;
import org.apache.ignite.internal.processors.cache.query.IgniteQueryErrorCode;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.processors.failure.FailureProcessor;
import org.apache.ignite.internal.processors.query.GridQueryFieldMetadata;
import org.apache.ignite.internal.processors.query.IgniteSQLException;
Expand Down Expand Up @@ -120,6 +122,7 @@
import org.jetbrains.annotations.Nullable;

import static org.apache.ignite.IgniteSystemProperties.getLong;
import static org.apache.ignite.configuration.TransactionConfiguration.TX_AWARE_QUERIES_SUPPORTED_MODES;
import static org.apache.ignite.events.EventType.EVT_SQL_QUERY_EXECUTION;

/** */
Expand Down Expand Up @@ -486,6 +489,8 @@ private <T> List<T> parseAndProcessQuery(
String sql,
Object... params
) throws IgniteSQLException {
ensureTransactionModeSupported(qryCtx);

SchemaPlus schema = schemaHolder.schema(schemaName);

assert schema != null : "Schema not found: " + schemaName;
Expand Down Expand Up @@ -588,7 +593,32 @@ private Object contextKey(QueryContext qryCtx) {

SqlFieldsQuery sqlFieldsQry = qryCtx.unwrap(SqlFieldsQuery.class);

return sqlFieldsQry != null ? F.asList(sqlFieldsQry.isLocal(), sqlFieldsQry.isEnforceJoinOrder()) : null;
return sqlFieldsQry != null
? F.asList(sqlFieldsQry.isLocal(), sqlFieldsQry.isEnforceJoinOrder(), queryTransactionVersion(qryCtx) == null)
: null;
}

/** */
private static GridCacheVersion queryTransactionVersion(@Nullable QueryContext qryCtx) {
return qryCtx == null ? null : qryCtx.unwrap(GridCacheVersion.class);
}

/** */
private void ensureTransactionModeSupported(@Nullable QueryContext qryCtx) {
if (!ctx.config().getTransactionConfiguration().isTxAwareQueriesEnabled())
return;

GridCacheVersion ver = queryTransactionVersion(qryCtx);

if (ver == null)
return;

final GridNearTxLocal userTx = ctx.cache().context().tm().tx(ver);

if (TX_AWARE_QUERIES_SUPPORTED_MODES.contains(userTx.isolation()))
return;

throw new IllegalStateException("Transaction isolation mode not supported for SQL queries: " + userTx.isolation());
}

/** */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,14 +66,14 @@ protected AbstractIndexScan(
/** {@inheritDoc} */
@Override public synchronized Iterator<Row> iterator() {
if (ranges == null)
return new IteratorImpl(idx.find(null, null, true, true, indexQueryContext()));
return new IteratorImpl(indexCursor(null, null, true, true));

IgniteClosure<RangeCondition<Row>, IteratorImpl> clo = range -> {
IdxRow lower = range.lower() == null ? null : row2indexRow(range.lower());
IdxRow upper = range.upper() == null ? null : row2indexRow(range.upper());

return new IteratorImpl(
idx.find(lower, upper, range.lowerInclude(), range.upperInclude(), indexQueryContext()));
indexCursor(lower, upper, range.lowerInclude(), range.upperInclude()));
};

if (!ranges.multiBounds()) {
Expand All @@ -88,6 +88,11 @@ protected AbstractIndexScan(
return F.flat(F.iterator(ranges, clo, true));
}

/** */
protected GridCursor<IdxRow> indexCursor(IdxRow lower, IdxRow upper, boolean lowerInclude, boolean upperInclude) {
return idx.find(lower, upper, lowerInclude, upperInclude, indexQueryContext());
}

/** */
protected abstract IdxRow row2indexRow(Row bound);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -361,6 +361,7 @@ private ExecutionContext<?> baseInboxContext(UUID nodeId, UUID qryId, long fragm
NoOpMemoryTracker.INSTANCE,
NoOpIoTracker.INSTANCE,
0,
ImmutableMap.of());
ImmutableMap.of(),
null);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,24 +18,37 @@
package org.apache.ignite.internal.processors.query.calcite.exec;

import java.lang.reflect.Type;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import java.util.function.Function;
import org.apache.calcite.DataContext;
import org.apache.calcite.linq4j.QueryProvider;
import org.apache.calcite.schema.SchemaPlus;
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.CacheObject;
import org.apache.ignite.internal.processors.cache.KeyCacheObject;
import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
import org.apache.ignite.internal.processors.cache.persistence.CacheDataRowAdapter;
import org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry;
import org.apache.ignite.internal.processors.query.calcite.exec.exp.ExpressionFactory;
import org.apache.ignite.internal.processors.query.calcite.exec.exp.ExpressionFactoryImpl;
import org.apache.ignite.internal.processors.query.calcite.exec.tracker.ExecutionNodeMemoryTracker;
import org.apache.ignite.internal.processors.query.calcite.exec.tracker.IoTracker;
import org.apache.ignite.internal.processors.query.calcite.exec.tracker.MemoryTracker;
import org.apache.ignite.internal.processors.query.calcite.exec.tracker.RowTracker;
import org.apache.ignite.internal.processors.query.calcite.message.QueryTxEntry;
import org.apache.ignite.internal.processors.query.calcite.metadata.ColocationGroup;
import org.apache.ignite.internal.processors.query.calcite.metadata.FragmentDescription;
import org.apache.ignite.internal.processors.query.calcite.prepare.AbstractQueryContext;
Expand All @@ -45,8 +58,12 @@
import org.apache.ignite.internal.processors.query.calcite.util.Commons;
import org.apache.ignite.internal.processors.query.calcite.util.TypeUtils;
import org.apache.ignite.internal.util.lang.RunnableX;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteBiTuple;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;

import static org.apache.ignite.internal.processors.query.calcite.util.Commons.checkRange;

Expand Down Expand Up @@ -102,6 +119,9 @@ public class ExecutionContext<Row> extends AbstractQueryContext implements DataC
/** */
private final long timeout;

/** */
private final Collection<QueryTxEntry> qryTxEntries;

/** */
private final long startTs;

Expand All @@ -127,7 +147,8 @@ public ExecutionContext(
MemoryTracker qryMemoryTracker,
IoTracker ioTracker,
long timeout,
Map<String, Object> params
Map<String, Object> params,
@Nullable Collection<QueryTxEntry> qryTxEntries
) {
super(qctx);

Expand All @@ -142,6 +163,7 @@ public ExecutionContext(
this.ioTracker = ioTracker;
this.params = params;
this.timeout = timeout;
this.qryTxEntries = qryTxEntries;

startTs = U.currentTimeMillis();

Expand Down Expand Up @@ -289,6 +311,84 @@ public void setCorrelated(@NotNull Object value, int id) {
correlations[id] = value;
}

/**
* @return Transaction write map.
*/
public Collection<QueryTxEntry> getQryTxEntries() {
return qryTxEntries;
}

/** */
public static Collection<QueryTxEntry> transactionChanges(
Collection<IgniteTxEntry> writeEntries
) {
if (F.isEmpty(writeEntries))
return null;

Collection<QueryTxEntry> res = new ArrayList<>();

for (IgniteTxEntry e : writeEntries) {
CacheObject val = e.value();

if (!F.isEmpty(e.entryProcessors()))
val = e.applyEntryProcessors(val);

res.add(new QueryTxEntry(e.cacheId(), e.key(), val, e.explicitVersion()));
}

return res;
}

/**
* @param cacheId Cache id.
* @param parts Partitions set.
* @param mapper Mapper to specific data type.
* @return First, set of object changed in transaction, second, list of transaction data in required format.
* @param <R> Required type.
*/
public <R> IgniteBiTuple<Set<KeyCacheObject>, List<R>> transactionChanges(
int cacheId,
int[] parts,
Function<CacheDataRow, R> mapper
) {
if (F.isEmpty(qryTxEntries))
return F.t(Collections.emptySet(), Collections.emptyList());

// Expecting parts are sorted or almost sorted and amount of transaction entries are relatively small.
if (parts != null && !F.isSorted(parts))
Arrays.sort(parts);

Set<KeyCacheObject> changedKeys = new HashSet<>(qryTxEntries.size());
List<R> newAndUpdatedRows = new ArrayList<>(qryTxEntries.size());

for (QueryTxEntry e : qryTxEntries) {
int part = e.key().partition();

assert part != -1;

if (e.cacheId() != cacheId)
continue;

if (parts != null && Arrays.binarySearch(parts, part) < 0)
continue;

changedKeys.add(e.key());

CacheObject val = e.value();

if (val != null) { // Mix only updated or inserted entries. In case val == null entry removed.
newAndUpdatedRows.add(mapper.apply(new CacheDataRowAdapter(
e.key(),
val,
e.version(),
CU.EXPIRE_TIME_ETERNAL // Expire time calculated on commit, can use eternal here.
)));
}
}

return F.t(changedKeys, newAndUpdatedRows);
}

/**
* Executes a query task.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import org.apache.ignite.internal.processors.cache.CacheObjectValueContext;
import org.apache.ignite.internal.processors.cache.GridCachePartitionExchangeManager;
import org.apache.ignite.internal.processors.cache.QueryCursorImpl;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal;
import org.apache.ignite.internal.processors.cache.query.CacheQueryType;
import org.apache.ignite.internal.processors.cache.query.GridCacheQueryType;
import org.apache.ignite.internal.processors.cache.query.IgniteQueryErrorCode;
Expand Down Expand Up @@ -608,6 +609,8 @@ private ListFieldsQueryCursor<?> mapAndExecutePlan(

MemoryTracker qryMemoryTracker = qry.createMemoryTracker(memoryTracker, cfg.getQueryMemoryQuota());

final GridNearTxLocal userTx = Commons.queryTransaction(qry.context(), ctx.cache().context());

ExecutionContext<Row> ectx = new ExecutionContext<>(
qry.context(),
taskExecutor(),
Expand All @@ -620,7 +623,8 @@ private ListFieldsQueryCursor<?> mapAndExecutePlan(
qryMemoryTracker,
createIoTracker(locNodeId, qry.localQueryId()),
timeout,
qryParams);
qryParams,
userTx == null ? null : ExecutionContext.transactionChanges(userTx.writeEntries()));

Node<Row> node = new LogicalRelImplementor<>(ectx, partitionService(), mailboxRegistry(),
exchangeService(), failureProcessor()).go(fragment.root());
Expand Down Expand Up @@ -659,7 +663,8 @@ private ListFieldsQueryCursor<?> mapAndExecutePlan(
fragmentsPerNode.get(nodeId).intValue(),
qry.parameters(),
parametersMarshalled,
timeout
timeout,
ectx.getQryTxEntries()
);

messageService().send(nodeId, req);
Expand Down Expand Up @@ -867,7 +872,8 @@ private void onMessage(UUID nodeId, final QueryStartRequest msg) {
qry.createMemoryTracker(memoryTracker, cfg.getQueryMemoryQuota()),
createIoTracker(nodeId, msg.originatingQryId()),
msg.timeout(),
Commons.parametersMap(msg.parameters())
Commons.parametersMap(msg.parameters()),
msg.queryTransactionEntries()
);

executeFragment(qry, (FragmentPlan)qryPlan, ectx);
Expand Down
Loading

0 comments on commit 00a0d40

Please sign in to comment.