Skip to content

Commit

Permalink
Merge branch 'master' into fix-dead-lock-tablet-stat
Browse files Browse the repository at this point in the history
  • Loading branch information
xy720 authored Jan 15, 2025
2 parents 04d1fef + b6a866b commit 6d1a367
Show file tree
Hide file tree
Showing 125 changed files with 550 additions and 233 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3285,6 +3285,14 @@ public static int metaServiceRpcRetryTimes() {
@ConfField(mutable = true, description = {"存算分离模式下commit阶段等锁超时时间,默认5s"})
public static int try_commit_lock_timeout_seconds = 5;

@ConfField(mutable = true, description = {"是否在事务提交时对所有表启用提交锁。设置为 true 时,所有表都会使用提交锁。"
+ "设置为 false 时,仅对 Merge-On-Write 表使用提交锁。默认值为 true。",
"Whether to enable commit lock for all tables during transaction commit."
+ "If true, commit lock will be applied to all tables."
+ "If false, commit lock will only be applied to Merge-On-Write tables."
+ "Default value is true." })
public static boolean enable_commit_lock_for_all_tables = true;

@ConfField(mutable = true, description = {"存算分离模式下是否开启大事务提交,默认false"})
public static boolean enable_cloud_txn_lazy_commit = false;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1166,7 +1166,21 @@ private void commitTransactionWithSubTxns(long dbId, List<Table> tableList, long
executeCommitTxnRequest(commitTxnRequest, transactionId, false, null);
}

// add some log and get commit lock, mainly used for mow tables
private List<Table> getTablesNeedCommitLock(List<Table> tableList) {
if (Config.enable_commit_lock_for_all_tables) {
// If enabled, lock all tables
return tableList.stream()
.sorted(Comparator.comparingLong(Table::getId))
.collect(Collectors.toList());
} else {
// If disabled, only lock MOW tables
return tableList.stream()
.filter(table -> table instanceof OlapTable && ((OlapTable) table).getEnableUniqueKeyMergeOnWrite())
.sorted(Comparator.comparingLong(Table::getId))
.collect(Collectors.toList());
}
}

private void beforeCommitTransaction(List<Table> tableList, long transactionId, long timeoutMillis)
throws UserException {
for (int i = 0; i < tableList.size(); i++) {
Expand All @@ -1180,29 +1194,21 @@ private void beforeCommitTransaction(List<Table> tableList, long transactionId,
}
}

// Get tables that require commit lock - only MOW tables need this:
// 1. Filter to keep only OlapTables with MOW enabled
// 2. Sort by table ID to maintain consistent locking order and prevent deadlocks
List<Table> mowTableList = tableList.stream()
.filter(table -> table instanceof OlapTable && ((OlapTable) table).getEnableUniqueKeyMergeOnWrite())
.sorted(Comparator.comparingLong(Table::getId))
.collect(Collectors.toList());
increaseWaitingLockCount(mowTableList);
if (!MetaLockUtils.tryCommitLockTables(mowTableList, timeoutMillis, TimeUnit.MILLISECONDS)) {
decreaseWaitingLockCount(mowTableList);
List<Table> tablesToLock = getTablesNeedCommitLock(tableList);
increaseWaitingLockCount(tablesToLock);
if (!MetaLockUtils.tryCommitLockTables(tablesToLock, timeoutMillis, TimeUnit.MILLISECONDS)) {
decreaseWaitingLockCount(tablesToLock);
// DELETE_BITMAP_LOCK_ERR will be retried on be
throw new UserException(InternalErrorCode.DELETE_BITMAP_LOCK_ERR,
"get table cloud commit lock timeout, tableList=("
+ StringUtils.join(mowTableList, ",") + ")");
+ StringUtils.join(tablesToLock, ",") + ")");
}
}

private void afterCommitTransaction(List<Table> tableList) {
List<Table> mowTableList = tableList.stream()
.filter(table -> table instanceof OlapTable && ((OlapTable) table).getEnableUniqueKeyMergeOnWrite())
.collect(Collectors.toList());
decreaseWaitingLockCount(mowTableList);
MetaLockUtils.commitUnlockTables(mowTableList);
List<Table> tablesToUnlock = getTablesNeedCommitLock(tableList);
decreaseWaitingLockCount(tablesToUnlock);
MetaLockUtils.commitUnlockTables(tablesToUnlock);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -957,7 +957,13 @@ public PlanFragment visitPhysicalHashAggregate(
// 1. generate slot reference for each group expression
List<SlotReference> groupSlots = collectGroupBySlots(groupByExpressions, outputExpressions);
ArrayList<Expr> execGroupingExpressions = groupByExpressions.stream()
.map(e -> ExpressionTranslator.translate(e, context))
.map(e -> {
Expr result = ExpressionTranslator.translate(e, context);
if (result == null) {
throw new RuntimeException("translate " + e + " failed");
}
return result;
})
.collect(Collectors.toCollection(ArrayList::new));
// 2. collect agg expressions and generate agg function to slot reference map
List<Slot> aggFunctionOutput = Lists.newArrayList();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;

/**
* create project under aggregate to enable CSE
Expand Down Expand Up @@ -102,8 +103,38 @@ public Plan visitPhysicalHashAggregate(PhysicalHashAggregate<? extends Plan> agg
}

if (aggregate.child() instanceof PhysicalProject) {
List<NamedExpression> newProjections = Lists.newArrayList();
// do column prune
// case 1:
// original plan
// agg(groupKey[C+1, abs(C+1)]
// -->project(A+B as C)
//
// "A+B as C" should be reserved
// new plan
// agg(groupKey=[D, abs(D)])
// -->project(A+B as C, C+1 as D)
// case 2:
// original plan
// agg(groupKey[A+1, abs(A+1)], output[sum(B)])
// --> project(A, B)
// "A+1" is extracted, we have
// plan1:
// agg(groupKey[X, abs(X)], output[sum(B)])
// --> project(A, B, A+1 as X)
// then column prune(A should be pruned, because it is not used directly by AGG)
// we have plan2:
// agg(groupKey[X, abs(X)], output[sum(B)])
// -->project(B, A+1 as X)
PhysicalProject<? extends Plan> project = (PhysicalProject<? extends Plan>) aggregate.child();
List<NamedExpression> newProjections = Lists.newArrayList(project.getProjects());
Set<Slot> newInputSlots = aggOutputReplaced.stream()
.flatMap(expr -> expr.getInputSlots().stream())
.collect(Collectors.toSet());
for (NamedExpression expr : project.getProjects()) {
if (!(expr instanceof SlotReference) || newInputSlots.contains(expr)) {
newProjections.add(expr);
}
}
newProjections.addAll(cseCandidates.values());
project = project.withProjectionsAndChild(newProjections, (Plan) project.child());
aggregate = (PhysicalHashAggregate<? extends Plan>) aggregate
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,10 @@
import org.apache.doris.nereids.rules.rewrite.PushDownFilterThroughSort;
import org.apache.doris.nereids.rules.rewrite.PushDownFilterThroughWindow;
import org.apache.doris.nereids.rules.rewrite.PushDownJoinOtherCondition;
import org.apache.doris.nereids.rules.rewrite.PushDownLimitDistinctThroughJoin;
import org.apache.doris.nereids.rules.rewrite.PushDownProjectThroughLimit;
import org.apache.doris.nereids.rules.rewrite.PushDownTopNDistinctThroughJoin;
import org.apache.doris.nereids.rules.rewrite.PushDownTopNThroughJoin;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableList.Builder;
Expand All @@ -132,6 +135,9 @@ public class RuleSet {
.add(PushDownProjectThroughInnerOuterJoin.INSTANCE)
.add(PushDownProjectThroughSemiJoin.INSTANCE)
.add(TransposeAggSemiJoinProject.INSTANCE)
.addAll(new PushDownTopNThroughJoin().buildRules())
.addAll(new PushDownLimitDistinctThroughJoin().buildRules())
.addAll(new PushDownTopNDistinctThroughJoin().buildRules())
.build();

public static final List<RuleFactory> PUSH_DOWN_FILTERS = ImmutableList.of(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.apache.doris.nereids.rules.expression.rules.SimplifyComparisonPredicate;
import org.apache.doris.nereids.rules.expression.rules.SimplifyInPredicate;
import org.apache.doris.nereids.rules.expression.rules.SimplifyRange;
import org.apache.doris.nereids.rules.expression.rules.SimplifySelfComparison;
import org.apache.doris.nereids.rules.expression.rules.TopnToMax;

import com.google.common.collect.ImmutableList;
Expand All @@ -46,6 +47,7 @@ public class ExpressionOptimization extends ExpressionRewrite {
SimplifyComparisonPredicate.INSTANCE,
SimplifyInPredicate.INSTANCE,
SimplifyRange.INSTANCE,
SimplifySelfComparison.INSTANCE,
DateFunctionRewrite.INSTANCE,
ArrayContainToArrayOverlap.INSTANCE,
CaseWhenToIf.INSTANCE,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ public enum ExpressionRuleType {
SIMPLIFY_IN_PREDICATE,
SIMPLIFY_NOT_EXPR,
SIMPLIFY_RANGE,
SIMPLIFY_SELF_COMPARISON,
SUPPORT_JAVA_DATE_FORMATTER,
TOPN_TO_MAX;

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.nereids.rules.expression.rules;

import org.apache.doris.nereids.rules.expression.ExpressionPatternMatcher;
import org.apache.doris.nereids.rules.expression.ExpressionPatternRuleFactory;
import org.apache.doris.nereids.rules.expression.ExpressionRuleType;
import org.apache.doris.nereids.trees.expressions.ComparisonPredicate;
import org.apache.doris.nereids.trees.expressions.EqualTo;
import org.apache.doris.nereids.trees.expressions.Expression;
import org.apache.doris.nereids.trees.expressions.GreaterThan;
import org.apache.doris.nereids.trees.expressions.GreaterThanEqual;
import org.apache.doris.nereids.trees.expressions.LessThan;
import org.apache.doris.nereids.trees.expressions.LessThanEqual;
import org.apache.doris.nereids.trees.expressions.NullSafeEqual;
import org.apache.doris.nereids.trees.expressions.literal.BooleanLiteral;
import org.apache.doris.nereids.util.ExpressionUtils;

import com.google.common.collect.ImmutableList;

import java.util.List;

/**
* simplify with self comparison
* such as: a = a --> TRUE
* a + b > a + b --> FALSE
*/
public class SimplifySelfComparison implements ExpressionPatternRuleFactory {
public static SimplifySelfComparison INSTANCE = new SimplifySelfComparison();

@Override
public List<ExpressionPatternMatcher<? extends Expression>> buildRules() {
return ImmutableList.of(
matchesType(ComparisonPredicate.class)
.then(this::rewrite)
.toRule(ExpressionRuleType.SIMPLIFY_SELF_COMPARISON)
);
}

private Expression rewrite(ComparisonPredicate comparison) {
Expression left = comparison.left();
if (left.equals(comparison.right())) {
if (comparison instanceof EqualTo
|| comparison instanceof GreaterThanEqual
|| comparison instanceof LessThanEqual) {
return ExpressionUtils.trueOrNull(left);
}
if (comparison instanceof NullSafeEqual) {
return BooleanLiteral.TRUE;
}
if (comparison instanceof GreaterThan || comparison instanceof LessThan) {
return ExpressionUtils.falseOrNull(left);
}
}

return comparison;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.doris.nereids.trees.plans.logical.LogicalJoin;
import org.apache.doris.nereids.trees.plans.logical.LogicalLimit;
import org.apache.doris.nereids.trees.plans.logical.LogicalProject;
import org.apache.doris.qe.ConnectContext;

import com.google.common.collect.ImmutableList;

Expand All @@ -42,6 +43,10 @@ public List<Rule> buildRules() {
// limit -> distinct -> join
logicalLimit(logicalAggregate(logicalJoin())
.when(LogicalAggregate::isDistinct))
.when(limit ->
ConnectContext.get() != null
&& ConnectContext.get().getSessionVariable().topnOptLimitThreshold
>= limit.getLimit() + limit.getOffset())
.then(limit -> {
LogicalAggregate<LogicalJoin<Plan, Plan>> agg = limit.child();
LogicalJoin<Plan, Plan> join = agg.child();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.doris.nereids.trees.plans.logical.LogicalProject;
import org.apache.doris.nereids.trees.plans.logical.LogicalTopN;
import org.apache.doris.nereids.util.PlanUtils;
import org.apache.doris.qe.ConnectContext;

import com.google.common.collect.ImmutableList;

Expand All @@ -45,6 +46,10 @@ public List<Rule> buildRules() {
// topN -> join
logicalTopN(logicalAggregate(logicalJoin()).when(LogicalAggregate::isDistinct))
// TODO: complex order by
.when(topn ->
ConnectContext.get() != null
&& ConnectContext.get().getSessionVariable().topnOptLimitThreshold
>= topn.getLimit() + topn.getOffset())
.when(topN -> topN.getOrderKeys().stream().map(OrderKey::getExpr)
.allMatch(Slot.class::isInstance))
.then(topN -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.doris.nereids.trees.plans.logical.LogicalJoin;
import org.apache.doris.nereids.trees.plans.logical.LogicalProject;
import org.apache.doris.nereids.trees.plans.logical.LogicalTopN;
import org.apache.doris.qe.ConnectContext;

import com.google.common.collect.ImmutableList;

Expand All @@ -43,6 +44,10 @@ public List<Rule> buildRules() {
// topN -> join
logicalTopN(logicalJoin())
// TODO: complex orderby
.when(topn ->
ConnectContext.get() != null
&& ConnectContext.get().getSessionVariable().topnOptLimitThreshold
>= topn.getLimit() + topn.getOffset())
.when(topN -> topN.getOrderKeys().stream().map(OrderKey::getExpr)
.allMatch(Slot.class::isInstance))
.then(topN -> {
Expand Down Expand Up @@ -102,7 +107,6 @@ private Plan pushLimitThroughJoin(LogicalTopN<? extends Plan> topN, LogicalJoin<
}
return null;
case CROSS_JOIN:

if (join.left().getOutputSet().containsAll(orderbySlots)) {
return join.withChildren(
topN.withLimitChild(topN.getLimit() + topN.getOffset(), 0, join.left()),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -501,8 +501,8 @@ private Statistics computeOlapScan(OlapScan olapScan) {
// mv is selected, return its estimated stats
Optional<Statistics> optStats = cascadesContext.getStatementContext()
.getStatistics(((Relation) olapScan).getRelationId());
LOG.info("computeOlapScan optStats isPresent {}, tableRowCount is {}",
optStats.isPresent(), tableRowCount);
LOG.info("computeOlapScan optStats isPresent {}, tableRowCount is {}, table name is {}",
optStats.isPresent(), tableRowCount, olapTable.getQualifiedName());
if (optStats.isPresent()) {
double selectedPartitionsRowCount = getSelectedPartitionRowCount(olapScan, tableRowCount);
LOG.info("computeOlapScan optStats is {}, selectedPartitionsRowCount is {}", optStats.get(),
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.nereids.rules.expression.rules;

import org.apache.doris.nereids.rules.expression.ExpressionRewrite;
import org.apache.doris.nereids.rules.expression.ExpressionRewriteTestHelper;
import org.apache.doris.nereids.rules.expression.ExpressionRuleExecutor;

import com.google.common.collect.ImmutableList;
import org.junit.jupiter.api.Test;

class SimplifySelfComparisonTest extends ExpressionRewriteTestHelper {

@Test
void testRewrite() {
executor = new ExpressionRuleExecutor(ImmutableList.of(
ExpressionRewrite.bottomUp(SimplifySelfComparison.INSTANCE)
));

assertRewriteAfterTypeCoercion("TA + TB = TA + TB", "NOT ((TA + TB) IS NULL) OR NULL");
assertRewriteAfterTypeCoercion("TA + TB >= TA + TB", "NOT ((TA + TB) IS NULL) OR NULL");
assertRewriteAfterTypeCoercion("TA + TB <= TA + TB", "NOT ((TA + TB) IS NULL) OR NULL");
assertRewriteAfterTypeCoercion("TA + TB <=> TA + TB", "TRUE");
assertRewriteAfterTypeCoercion("TA + TB > TA + TB", "(TA + TB) IS NULL AND NULL");
assertRewriteAfterTypeCoercion("TA + TB < TA + TB", "(TA + TB) IS NULL AND NULL");
}

}
Loading

0 comments on commit 6d1a367

Please sign in to comment.