From 0c9de6574fb5ffd07edc3040c7a59796c836351e Mon Sep 17 00:00:00 2001 From: morrySnow Date: Fri, 24 Jan 2025 14:56:28 +0800 Subject: [PATCH] [fix](Nereids) Use the schema saved during planning as the schema of the original target table (#47337) Related PR: #47033 #45045 Problem Summary: because schema change does not involve recreating the table object, but rather rebuilding the full schema. So, we should use the schema saved during planning as the schema of the original target table. --- fe/.idea/vcs.xml | 20 +++---------------- .../doris/nereids/StatementContext.java | 7 +++++++ .../PlanPatternGeneratorAnalyzer.java | 4 +--- .../rules/analysis/CollectRelation.java | 14 ++++++++++--- .../insert/InsertIntoTableCommand.java | 16 ++++++--------- 5 files changed, 28 insertions(+), 33 deletions(-) diff --git a/fe/.idea/vcs.xml b/fe/.idea/vcs.xml index dc3fa65f524b7d..8c0f59e92e6c5b 100644 --- a/fe/.idea/vcs.xml +++ b/fe/.idea/vcs.xml @@ -1,20 +1,4 @@ - - + + + \ No newline at end of file diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/StatementContext.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/StatementContext.java index e33ce2bbfe98bb..b82c48a7fda3fb 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/StatementContext.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/StatementContext.java @@ -18,6 +18,7 @@ package org.apache.doris.nereids; import org.apache.doris.analysis.StatementBase; +import org.apache.doris.catalog.Column; import org.apache.doris.catalog.TableIf; import org.apache.doris.catalog.View; import org.apache.doris.catalog.constraint.TableIdentifier; @@ -176,6 +177,8 @@ public enum TableFrom { private final Map, TableIf> insertTargetTables = Maps.newHashMap(); // save view's def and sql mode to avoid them change before lock private final Map, Pair> viewInfos = Maps.newHashMap(); + // save insert into schema to avoid schema changed between two read locks + private final List insertTargetSchema = new ArrayList<>(); // for create view support in nereids // key is the start and end position of the sql substring that needs to be replaced, @@ -277,6 +280,10 @@ public Map, TableIf> getTables() { return tables; } + public List getInsertTargetSchema() { + return insertTargetSchema; + } + public void setTables(Map, TableIf> tables) { this.tables.clear(); this.tables.putAll(tables); diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/pattern/generator/PlanPatternGeneratorAnalyzer.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/pattern/generator/PlanPatternGeneratorAnalyzer.java index 99d7c308dacf0d..23e7b5eca762ac 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/pattern/generator/PlanPatternGeneratorAnalyzer.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/pattern/generator/PlanPatternGeneratorAnalyzer.java @@ -19,7 +19,6 @@ import org.apache.doris.nereids.pattern.generator.javaast.ClassDeclaration; -import java.lang.reflect.Modifier; import java.util.List; import java.util.Map; import java.util.Optional; @@ -45,8 +44,7 @@ public String generatePatterns(String className, String parentClassName, boolean Map> planClassMap = analyzer.getParentClassMap().entrySet().stream() .filter(kv -> kv.getValue().contains("org.apache.doris.nereids.trees.plans.Plan")) .filter(kv -> !kv.getKey().name.equals("GroupPlan")) - .filter(kv -> !Modifier.isAbstract(kv.getKey().modifiers.mod) - && kv.getKey() instanceof ClassDeclaration) + .filter(kv -> kv.getKey() instanceof ClassDeclaration) .collect(Collectors.toMap(kv -> (ClassDeclaration) kv.getKey(), kv -> kv.getValue())); List generators = planClassMap.entrySet() diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/CollectRelation.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/CollectRelation.java index 01adc549e3686d..b955283db364b1 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/CollectRelation.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/CollectRelation.java @@ -41,6 +41,7 @@ import org.apache.doris.nereids.trees.plans.logical.LogicalCTE; import org.apache.doris.nereids.trees.plans.logical.LogicalPlan; import org.apache.doris.nereids.trees.plans.logical.LogicalSubQueryAlias; +import org.apache.doris.nereids.trees.plans.logical.UnboundLogicalSink; import org.apache.doris.nereids.util.RelationUtil; import com.google.common.collect.ImmutableList; @@ -75,8 +76,8 @@ public List buildRules() { unboundRelation() .thenApply(this::collectFromUnboundRelation) .toRule(RuleType.COLLECT_TABLE_FROM_RELATION), - unboundTableSink() - .thenApply(this::collectFromUnboundTableSink) + unboundLogicalSink() + .thenApply(this::collectFromUnboundSink) .toRule(RuleType.COLLECT_TABLE_FROM_SINK), any().whenNot(UnboundRelation.class::isInstance) .whenNot(UnboundTableSink.class::isInstance) @@ -124,7 +125,7 @@ private Plan collectFromAny(MatchingContext ctx) { return null; } - private Plan collectFromUnboundTableSink(MatchingContext> ctx) { + private Plan collectFromUnboundSink(MatchingContext> ctx) { List nameParts = ctx.root.getNameParts(); switch (nameParts.size()) { case 1: @@ -182,6 +183,13 @@ private void collectFromUnboundRelation(CascadesContext cascadesContext, if (tableFrom == TableFrom.QUERY) { collectMTMVCandidates(table, cascadesContext); } + if (tableFrom == TableFrom.INSERT_TARGET) { + if (!cascadesContext.getStatementContext().getInsertTargetSchema().isEmpty()) { + LOG.warn("collect insert target table '{}' more than once.", tableQualifier); + } + cascadesContext.getStatementContext().getInsertTargetSchema().clear(); + cascadesContext.getStatementContext().getInsertTargetSchema().addAll(table.getFullSchema()); + } if (table instanceof View) { parseAndCollectFromView(tableQualifier, (View) table, cascadesContext); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertIntoTableCommand.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertIntoTableCommand.java index a7c8c3550ec88b..3e3240c700d7a7 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertIntoTableCommand.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertIntoTableCommand.java @@ -27,7 +27,6 @@ import org.apache.doris.common.ErrorReport; import org.apache.doris.common.profile.ProfileManager.ProfileType; import org.apache.doris.common.util.DebugUtil; -import org.apache.doris.common.util.MetaLockUtils; import org.apache.doris.datasource.hive.HMSExternalTable; import org.apache.doris.datasource.iceberg.IcebergExternalTable; import org.apache.doris.datasource.jdbc.JdbcExternalTable; @@ -62,12 +61,10 @@ import com.google.common.base.Preconditions; import com.google.common.base.Throwables; -import com.google.common.collect.Lists; import org.apache.commons.collections.CollectionUtils; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -import java.util.Comparator; import java.util.List; import java.util.Objects; import java.util.Optional; @@ -175,9 +172,7 @@ public AbstractInsertExecutor initPlan(ConnectContext ctx, StmtExecutor executor } // lock after plan and check does table's schema changed to ensure we lock table order by id. TableIf newestTargetTableIf = RelationUtil.getTable(qualifiedTargetTableName, ctx.getEnv()); - List targetTables = Lists.newArrayList(targetTableIf, newestTargetTableIf); - targetTables.sort(Comparator.comparing(TableIf::getId)); - MetaLockUtils.readLockTables(targetTables); + newestTargetTableIf.readLock(); try { if (targetTableIf.getId() != newestTargetTableIf.getId()) { LOG.warn("insert plan failed {} times. query id is {}. table id changed from {} to {}", @@ -185,10 +180,11 @@ public AbstractInsertExecutor initPlan(ConnectContext ctx, StmtExecutor executor targetTableIf.getId(), newestTargetTableIf.getId()); continue; } - if (!targetTableIf.getFullSchema().equals(newestTargetTableIf.getFullSchema())) { + // Use the schema saved during planning as the schema of the original target table. + if (!ctx.getStatementContext().getInsertTargetSchema().equals(newestTargetTableIf.getFullSchema())) { LOG.warn("insert plan failed {} times. query id is {}. table schema changed from {} to {}", retryTimes, DebugUtil.printId(ctx.queryId()), - targetTableIf.getFullSchema(), newestTargetTableIf.getFullSchema()); + ctx.getStatementContext().getInsertTargetSchema(), newestTargetTableIf.getFullSchema()); continue; } if (!insertExecutor.isEmptyInsert()) { @@ -198,9 +194,9 @@ public AbstractInsertExecutor initPlan(ConnectContext ctx, StmtExecutor executor buildResult.physicalSink ); } - MetaLockUtils.readUnlockTables(targetTables); + newestTargetTableIf.readUnlock(); } catch (Throwable e) { - MetaLockUtils.readUnlockTables(targetTables); + newestTargetTableIf.readUnlock(); // the abortTxn in onFail need to acquire table write lock if (insertExecutor != null) { insertExecutor.onFail(e);