Skip to content

Commit

Permalink
[fix](Nereids) Use the schema saved during planning as the schema of …
Browse files Browse the repository at this point in the history
…the original target table (#47337)

Related PR: #47033 #45045

Problem Summary:

because schema change does not involve recreating the table object, but
rather rebuilding the full schema. So, we should use the schema saved
during planning as the schema of the original target table.
  • Loading branch information
morrySnow committed Jan 24, 2025
1 parent 110e0fc commit 0c9de65
Show file tree
Hide file tree
Showing 5 changed files with 28 additions and 33 deletions.
20 changes: 3 additions & 17 deletions fe/.idea/vcs.xml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.doris.nereids;

import org.apache.doris.analysis.StatementBase;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.catalog.View;
import org.apache.doris.catalog.constraint.TableIdentifier;
Expand Down Expand Up @@ -176,6 +177,8 @@ public enum TableFrom {
private final Map<List<String>, TableIf> insertTargetTables = Maps.newHashMap();
// save view's def and sql mode to avoid them change before lock
private final Map<List<String>, Pair<String, Long>> viewInfos = Maps.newHashMap();
// save insert into schema to avoid schema changed between two read locks
private final List<Column> insertTargetSchema = new ArrayList<>();

// for create view support in nereids
// key is the start and end position of the sql substring that needs to be replaced,
Expand Down Expand Up @@ -277,6 +280,10 @@ public Map<List<String>, TableIf> getTables() {
return tables;
}

public List<Column> getInsertTargetSchema() {
return insertTargetSchema;
}

public void setTables(Map<List<String>, TableIf> tables) {
this.tables.clear();
this.tables.putAll(tables);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@

import org.apache.doris.nereids.pattern.generator.javaast.ClassDeclaration;

import java.lang.reflect.Modifier;
import java.util.List;
import java.util.Map;
import java.util.Optional;
Expand All @@ -45,8 +44,7 @@ public String generatePatterns(String className, String parentClassName, boolean
Map<ClassDeclaration, Set<String>> planClassMap = analyzer.getParentClassMap().entrySet().stream()
.filter(kv -> kv.getValue().contains("org.apache.doris.nereids.trees.plans.Plan"))
.filter(kv -> !kv.getKey().name.equals("GroupPlan"))
.filter(kv -> !Modifier.isAbstract(kv.getKey().modifiers.mod)
&& kv.getKey() instanceof ClassDeclaration)
.filter(kv -> kv.getKey() instanceof ClassDeclaration)
.collect(Collectors.toMap(kv -> (ClassDeclaration) kv.getKey(), kv -> kv.getValue()));

List<PlanPatternGenerator> generators = planClassMap.entrySet()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import org.apache.doris.nereids.trees.plans.logical.LogicalCTE;
import org.apache.doris.nereids.trees.plans.logical.LogicalPlan;
import org.apache.doris.nereids.trees.plans.logical.LogicalSubQueryAlias;
import org.apache.doris.nereids.trees.plans.logical.UnboundLogicalSink;
import org.apache.doris.nereids.util.RelationUtil;

import com.google.common.collect.ImmutableList;
Expand Down Expand Up @@ -75,8 +76,8 @@ public List<Rule> buildRules() {
unboundRelation()
.thenApply(this::collectFromUnboundRelation)
.toRule(RuleType.COLLECT_TABLE_FROM_RELATION),
unboundTableSink()
.thenApply(this::collectFromUnboundTableSink)
unboundLogicalSink()
.thenApply(this::collectFromUnboundSink)
.toRule(RuleType.COLLECT_TABLE_FROM_SINK),
any().whenNot(UnboundRelation.class::isInstance)
.whenNot(UnboundTableSink.class::isInstance)
Expand Down Expand Up @@ -124,7 +125,7 @@ private Plan collectFromAny(MatchingContext<Plan> ctx) {
return null;
}

private Plan collectFromUnboundTableSink(MatchingContext<UnboundTableSink<Plan>> ctx) {
private Plan collectFromUnboundSink(MatchingContext<UnboundLogicalSink<Plan>> ctx) {
List<String> nameParts = ctx.root.getNameParts();
switch (nameParts.size()) {
case 1:
Expand Down Expand Up @@ -182,6 +183,13 @@ private void collectFromUnboundRelation(CascadesContext cascadesContext,
if (tableFrom == TableFrom.QUERY) {
collectMTMVCandidates(table, cascadesContext);
}
if (tableFrom == TableFrom.INSERT_TARGET) {
if (!cascadesContext.getStatementContext().getInsertTargetSchema().isEmpty()) {
LOG.warn("collect insert target table '{}' more than once.", tableQualifier);
}
cascadesContext.getStatementContext().getInsertTargetSchema().clear();
cascadesContext.getStatementContext().getInsertTargetSchema().addAll(table.getFullSchema());
}
if (table instanceof View) {
parseAndCollectFromView(tableQualifier, (View) table, cascadesContext);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
import org.apache.doris.common.ErrorReport;
import org.apache.doris.common.profile.ProfileManager.ProfileType;
import org.apache.doris.common.util.DebugUtil;
import org.apache.doris.common.util.MetaLockUtils;
import org.apache.doris.datasource.hive.HMSExternalTable;
import org.apache.doris.datasource.iceberg.IcebergExternalTable;
import org.apache.doris.datasource.jdbc.JdbcExternalTable;
Expand Down Expand Up @@ -62,12 +61,10 @@

import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import com.google.common.collect.Lists;
import org.apache.commons.collections.CollectionUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.Comparator;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
Expand Down Expand Up @@ -175,20 +172,19 @@ public AbstractInsertExecutor initPlan(ConnectContext ctx, StmtExecutor executor
}
// lock after plan and check does table's schema changed to ensure we lock table order by id.
TableIf newestTargetTableIf = RelationUtil.getTable(qualifiedTargetTableName, ctx.getEnv());
List<TableIf> targetTables = Lists.newArrayList(targetTableIf, newestTargetTableIf);
targetTables.sort(Comparator.comparing(TableIf::getId));
MetaLockUtils.readLockTables(targetTables);
newestTargetTableIf.readLock();
try {
if (targetTableIf.getId() != newestTargetTableIf.getId()) {
LOG.warn("insert plan failed {} times. query id is {}. table id changed from {} to {}",
retryTimes, DebugUtil.printId(ctx.queryId()),
targetTableIf.getId(), newestTargetTableIf.getId());
continue;
}
if (!targetTableIf.getFullSchema().equals(newestTargetTableIf.getFullSchema())) {
// Use the schema saved during planning as the schema of the original target table.
if (!ctx.getStatementContext().getInsertTargetSchema().equals(newestTargetTableIf.getFullSchema())) {
LOG.warn("insert plan failed {} times. query id is {}. table schema changed from {} to {}",
retryTimes, DebugUtil.printId(ctx.queryId()),
targetTableIf.getFullSchema(), newestTargetTableIf.getFullSchema());
ctx.getStatementContext().getInsertTargetSchema(), newestTargetTableIf.getFullSchema());
continue;
}
if (!insertExecutor.isEmptyInsert()) {
Expand All @@ -198,9 +194,9 @@ public AbstractInsertExecutor initPlan(ConnectContext ctx, StmtExecutor executor
buildResult.physicalSink
);
}
MetaLockUtils.readUnlockTables(targetTables);
newestTargetTableIf.readUnlock();
} catch (Throwable e) {
MetaLockUtils.readUnlockTables(targetTables);
newestTargetTableIf.readUnlock();
// the abortTxn in onFail need to acquire table write lock
if (insertExecutor != null) {
insertExecutor.onFail(e);
Expand Down

0 comments on commit 0c9de65

Please sign in to comment.