Skip to content

Commit

Permalink
refactor: optimization in query execution streams to reduce multiple …
Browse files Browse the repository at this point in the history
…data copy
  • Loading branch information
tglman committed Feb 14, 2025
1 parent 07a0805 commit 94c8409
Show file tree
Hide file tree
Showing 52 changed files with 190 additions and 91 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ public OResult next(OCommandContext ctx) {
public void close(OCommandContext ctx) {}

@Override
public boolean isTermination() {
public boolean isTermination(OCommandContext ctx) {
return false;
}
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ public OExecutionStream internalStart(OCommandContext ctx) throws OTimeoutExcept
}

private OExecutionStream fetchNextResults(OResult res, OCommandContext ctx) {
return OExecutionStream.resultIterator(unroll(res, ctx).iterator());
return OExecutionStream.resultCollection(unroll(res, ctx));
}

protected abstract Collection<OResult> unroll(final OResult doc, final OCommandContext iContext);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ public AggregateProjectionCalculationStep(
@Override
public OExecutionStream internalStart(OCommandContext ctx) throws OTimeoutException {
List<OResult> finalResults = executeAggregation(ctx);
return OExecutionStream.resultIterator(finalResults.iterator());
return OExecutionStream.resultCollection(finalResults);
}

private List<OResult> executeAggregation(OCommandContext ctx) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ public OExecutionStream internalStart(OCommandContext ctx) throws OTimeoutExcept
ctx.setVariable(loopVariable.getStringValue(), iterator.next());
OScriptExecutionPlan plan = initPlan(ctx);
OExecutionStream result = plan.start(ctx);
if (result.isTermination()) {
if (result.isTermination(ctx)) {
return result;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
import com.orientechnologies.orient.core.command.OCommandContext;
import com.orientechnologies.orient.core.sql.executor.resultset.OExecutionStream;
import com.orientechnologies.orient.core.sql.parser.OIdentifier;
import com.orientechnologies.orient.core.sql.parser.OLocalResultSet;
import com.orientechnologies.orient.core.sql.parser.OStatement;
import java.util.ArrayList;
import java.util.Collections;
Expand Down Expand Up @@ -44,15 +43,16 @@ public OExecutionStream internalStart(OCommandContext ctx) throws OTimeoutExcept
}

private void calculate(OCommandContext ctx) {
ctx.setVariable(varName.getStringValue(), toList(new OLocalResultSet(subExecutionPlan, ctx)));
ctx.setVariable(varName.getStringValue(), toList(subExecutionPlan, ctx));
}

private List<OResult> toList(OLocalResultSet oLocalResultSet) {
private List<OResult> toList(OInternalExecutionPlan plan, OCommandContext ctx) {
OExecutionStream stream = plan.start(ctx);
List<OResult> result = new ArrayList<>();
while (oLocalResultSet.hasNext()) {
result.add(oLocalResultSet.next());
while (stream.hasNext(ctx)) {
result.add(stream.next(ctx));
}
oLocalResultSet.close();
stream.close(ctx);
return result;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
import com.orientechnologies.orient.core.command.OCommandContext;
import com.orientechnologies.orient.core.sql.executor.resultset.OExecutionStream;
import com.orientechnologies.orient.core.sql.parser.OProjectionItem;
import java.util.Collections;

public class GuaranteeEmptyCountStep extends AbstractExecutionStep {

Expand All @@ -26,7 +25,7 @@ public OExecutionStream internalStart(OCommandContext ctx) throws OTimeoutExcept
} else {
OResultInternal result = new OResultInternal();
result.setProperty(item.getProjectionAliasAsString(), 0L);
return OExecutionStream.resultIterator(Collections.singleton((OResult) result).iterator());
return OExecutionStream.singleton(result);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
import com.orientechnologies.orient.core.exception.OCommandExecutionException;
import com.orientechnologies.orient.core.sql.executor.resultset.OExecutionStream;
import com.orientechnologies.orient.core.sql.parser.OIdentifier;
import com.orientechnologies.orient.core.sql.parser.OLocalResultSet;
import com.orientechnologies.orient.core.sql.parser.OStatement;
import java.util.ArrayList;
import java.util.List;
Expand Down Expand Up @@ -34,17 +33,17 @@ private OResultInternal calculate(OResultInternal result, OCommandContext ctx) {
} else {
subExecutionPlan = query.resolvePlan(true, subCtx);
}
result.setMetadata(
varName.getStringValue(), toList(new OLocalResultSet(subExecutionPlan, ctx)));
result.setMetadata(varName.getStringValue(), toList(subExecutionPlan, ctx));
return result;
}

private List<OResult> toList(OLocalResultSet oLocalResultSet) {
private List<OResult> toList(OInternalExecutionPlan plan, OCommandContext ctx) {
OExecutionStream stream = plan.start(ctx);
List<OResult> result = new ArrayList<>();
while (oLocalResultSet.hasNext()) {
result.add(oLocalResultSet.next());
while (stream.hasNext(ctx)) {
result.add(stream.next(ctx));
}
oLocalResultSet.close();
stream.close(ctx);
return result;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@ && matchesRid(iCommandContext, targetRid, startResult)) {
}
}
iCommandContext.setVariable("$currentMatch", previousMatch);
return OExecutionStream.resultIterator(result.iterator());
return OExecutionStream.resultCollection(result);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ public OExecutionStream internalStart(OCommandContext ctx) throws OTimeoutExcept
List<OResult> matchedNodes =
(List<OResult>) ctx.getVariable(MatchPrefetchStep.PREFETCHED_MATCH_ALIAS_PREFIX + alias);
if (matchedNodes != null) {
data = OExecutionStream.resultIterator(matchedNodes.iterator());
data = OExecutionStream.resultCollection(matchedNodes);
} else {
data = executionPlan.start(ctx);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ && matchesCondition(converted, sub.getFilter(), iCommandContext)) {
}

iCommandContext.setCurrent(oldCurrent);
return OExecutionStream.resultIterator(result.iterator());
return OExecutionStream.resultCollection(result);
}

private boolean matchesCondition(OResultInternal x, OMatchFilter filter, OCommandContext ctx) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@

import com.orientechnologies.common.concur.OTimeoutException;
import com.orientechnologies.orient.core.command.OCommandContext;
import com.orientechnologies.orient.core.sql.executor.resultset.OEdgeTraverserExcutionStream;
import com.orientechnologies.orient.core.sql.executor.resultset.OExecutionStream;
import com.orientechnologies.orient.core.sql.executor.resultset.OResultSetEdgeTraverser;
import com.orientechnologies.orient.core.sql.parser.OFieldMatchPathItem;
import com.orientechnologies.orient.core.sql.parser.OMultiMatchPathItem;

Expand All @@ -24,7 +24,7 @@ public OExecutionStream internalStart(OCommandContext ctx) throws OTimeoutExcept

public OExecutionStream createNextResultSet(OResult lastUpstreamRecord, OCommandContext ctx) {
MatchEdgeTraverser trav = createTraverser(lastUpstreamRecord);
return new OResultSetEdgeTraverser(trav);
return new OEdgeTraverserExcutionStream(trav);
}

protected MatchEdgeTraverser createTraverser(OResult lastUpstreamRecord) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ public OExecutionStream start(OCommandContext ctx) {
for (int i = 0; i < steps.size(); i++) {
OExecutionStepInternal step = steps.get(i);
OExecutionStream lastResult = step.start(ctx);
if (lastResult.isTermination()) {
if (lastResult.isTermination(ctx)) {
if (idempotent) {
return lastResult;
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ public OExecutionStream internalStart(OCommandContext ctx) throws OTimeoutExcept
} else {
results = Collections.emptyList();
}
return OExecutionStream.resultIterator(results.iterator());
return OExecutionStream.resultCollection(results);
}

private List<OResult> init(OExecutionStepInternal p, OCommandContext ctx) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ public OExecutionStream internalStart(OCommandContext ctx) throws OTimeoutExcept
}
OScriptExecutionPlan plan = initPlan(body, ctx);
OExecutionStream result = plan.start(ctx);
if (result.isTermination()) {
if (result.isTermination(ctx)) {
return result;
}
break;
Expand All @@ -55,7 +55,7 @@ public OExecutionStream internalStart(OCommandContext ctx) throws OTimeoutExcept
if (elseBody != null && elseBody.size() > 0) {
OScriptExecutionPlan plan = initPlan(elseBody, ctx);
OExecutionStream result = plan.start(ctx);
if (result.isTermination()) {
if (result.isTermination(ctx)) {
return result;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ public OExecutionStream internalStart(OCommandContext ctx) throws OTimeoutExcept
}

private OExecutionStream fetchNextResults(OResult res, OCommandContext ctx) {
return OExecutionStream.resultIterator(unwind(res, unwindFields, ctx).iterator());
return OExecutionStream.resultCollection(unwind(res, unwindFields, ctx));
}

private Collection<OResult> unwind(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,8 @@ public OResult next(OCommandContext ctx) {
}

@Override
public boolean isTermination() {
return set.isTermination();
public boolean isTermination(OCommandContext ctx) {
return set.isTermination(ctx);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,11 @@
import com.orientechnologies.orient.core.sql.executor.MatchEdgeTraverser;
import com.orientechnologies.orient.core.sql.executor.OResult;

public final class OResultSetEdgeTraverser implements OExecutionStream {
public final class OEdgeTraverserExcutionStream implements OExecutionStream {
private final MatchEdgeTraverser trav;
private OResult nextResult;

public OResultSetEdgeTraverser(MatchEdgeTraverser trav) {
public OEdgeTraverserExcutionStream(MatchEdgeTraverser trav) {
this.trav = trav;
}

Expand Down Expand Up @@ -44,7 +44,7 @@ private void fetchNext(OCommandContext ctx) {
}

@Override
public boolean isTermination() {
public boolean isTermination(OCommandContext ctx) {
return false;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,12 @@ public OResult next(OCommandContext ctx) {
public void close(OCommandContext ctx) {}

@Override
public boolean isTermination() {
public boolean isTermination(OCommandContext ctx) {
return false;
}

@Override
public boolean isFullInMemory(OCommandContext ctx) {
return true;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import com.orientechnologies.orient.core.sql.executor.OExecutionStep;
import com.orientechnologies.orient.core.sql.executor.OResult;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Spliterator;
Expand All @@ -21,10 +22,15 @@ public interface OExecutionStream {

/**
* Flag used to terminate scripts early in the execution, used by the return statement via
* the terminate execution stream;
* @return
* the terminate execution stream
*
* The only implementation that tour true is {@link OTerminationExecutionStream} created with {@link OExecutionStream#terminate()} the
* other implementation return false if they provide content themselves, or delegate the method if are wrapper (independently of the modifications) of another stream.
*
* @param ctx context of the query
* @return true if this is last execution stream, false if there are more
*/
boolean isTermination();
boolean isTermination(OCommandContext ctx);

public static OExecutionStream produce(OProduceResult producer) {
return new OProduceExecutionStream(producer);
Expand Down Expand Up @@ -52,7 +58,7 @@ public default OExecutionStream flatMap(OFlatMapResult map) {
}

public default OExecutionStream interruptable() {
return new OInterruptResultSet(this);
return new OInterruptExecutionStream(this);
}

public default OExecutionStream limit(long limit) {
Expand All @@ -71,6 +77,10 @@ public static OExecutionStream resultIterator(Iterator<OResult> iterator) {
return new OResultIteratorExecutionStream(iterator);
}

public static OExecutionStream resultCollection(Collection<OResult> iterator) {
return new OResultCollectionExecutionStream(iterator);
}

public default OCostMeasureExecutionStream profile(OExecutionStep step) {
return new OCostMeasureExecutionStream(this, step);
}
Expand All @@ -87,6 +97,20 @@ public static OExecutionStream singleton(OResult result) {
return new OSingletonExecutionStream(result);
}

/**
* Check if the current stream has all the data in memory, without much computation
* need to get the final result.
*
* Only implementation with all the content inside can return true, wrappers with no computation can just delegate, if
* the wrapper do a logic computation should return false.
*
* @param ctx the current query context
* @return true if the data is all in memory
*/
public default boolean isFullInMemory(OCommandContext ctx) {
return false;
}

public interface OnClose {
void close(OCommandContext ctx);
}
Expand All @@ -96,16 +120,16 @@ public default OExecutionStream onClose(OnClose onClose) {
}

public static OExecutionStream collectAll(OExecutionStream from, OCommandContext ctx) {
if (!from.hasNext(ctx)) {
if (!from.hasNext(ctx) || from.isFullInMemory(ctx)) {
return from;
}
List<OResult> result = new ArrayList<>();
while (from.hasNext(ctx)) {
result.add(from.next(ctx));
}
from.close(ctx);
OExecutionStream fullStream = OExecutionStream.resultIterator(result.iterator());
if (from.isTermination()) {
OExecutionStream fullStream = OExecutionStream.resultCollection(result);
if (from.isTermination(ctx)) {
fullStream = fullStream.terminate();
}
return fullStream;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,12 @@ private void fail() {
}

@Override
public boolean isTermination() {
return internal.isTermination();
public boolean isFullInMemory(OCommandContext ctx) {
return internal.isFullInMemory(ctx);
}

@Override
public boolean isTermination(OCommandContext ctx) {
return internal.isTermination(ctx);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ private void fetchNextItem(OCommandContext ctx) {
}

@Override
public boolean isTermination() {
return prevResult.isTermination();
public boolean isTermination(OCommandContext ctx) {
return prevResult.isTermination(ctx);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ public void close(OCommandContext ctx) {
}

@Override
public boolean isTermination() {
return currentResultSet.isTermination();
public boolean isTermination(OCommandContext ctx) {
return currentResultSet.isTermination(ctx);
}
}
Loading

0 comments on commit 94c8409

Please sign in to comment.