Skip to content

Commit

Permalink
branch-3.0: [fix](Nereids) Use the schema saved during planning as the schema of the original target table #47337 (#47402)
Browse files Browse the repository at this point in the history

Cherry-picked from #47337

Co-authored-by: morrySnow <[email protected]>
  • Loading branch information
github-actions[bot] and morrySnow authored Jan 26, 2025
1 parent d23584f commit 0f897ed
Show file tree
Hide file tree
Showing 4 changed files with 25 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.doris.nereids;

import org.apache.doris.analysis.StatementBase;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.catalog.View;
import org.apache.doris.catalog.constraint.TableIdentifier;
Expand Down Expand Up @@ -176,6 +177,8 @@ public enum TableFrom {
private final Map<List<String>, TableIf> insertTargetTables = Maps.newHashMap();
// save view's def and sql mode to avoid them change before lock
private final Map<List<String>, Pair<String, Long>> viewInfos = Maps.newHashMap();
// save insert into schema to avoid schema changed between two read locks
private final List<Column> insertTargetSchema = new ArrayList<>();

// for create view support in nereids
// key is the start and end position of the sql substring that needs to be replaced,
Expand Down Expand Up @@ -277,6 +280,10 @@ public Map<List<String>, TableIf> getTables() {
return tables;
}

/**
 * Returns the schema of the insert target table captured during planning.
 * The returned list is the live internal list: callers mutate it in place
 * (clear / addAll) to record the schema, and later compare it against the
 * table's newest schema to detect changes between the two read locks.
 */
public List<Column> getInsertTargetSchema() {
    return this.insertTargetSchema;
}

public void setTables(Map<List<String>, TableIf> tables) {
this.tables.clear();
this.tables.putAll(tables);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@

import org.apache.doris.nereids.pattern.generator.javaast.ClassDeclaration;

import java.lang.reflect.Modifier;
import java.util.List;
import java.util.Map;
import java.util.Optional;
Expand All @@ -45,8 +44,7 @@ public String generatePatterns(String className, String parentClassName, boolean
Map<ClassDeclaration, Set<String>> planClassMap = analyzer.getParentClassMap().entrySet().stream()
.filter(kv -> kv.getValue().contains("org.apache.doris.nereids.trees.plans.Plan"))
.filter(kv -> !kv.getKey().name.equals("GroupPlan"))
.filter(kv -> !Modifier.isAbstract(kv.getKey().modifiers.mod)
&& kv.getKey() instanceof ClassDeclaration)
.filter(kv -> kv.getKey() instanceof ClassDeclaration)
.collect(Collectors.toMap(kv -> (ClassDeclaration) kv.getKey(), kv -> kv.getValue()));

List<PlanPatternGenerator> generators = planClassMap.entrySet()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import org.apache.doris.nereids.trees.plans.logical.LogicalCTE;
import org.apache.doris.nereids.trees.plans.logical.LogicalPlan;
import org.apache.doris.nereids.trees.plans.logical.LogicalSubQueryAlias;
import org.apache.doris.nereids.trees.plans.logical.UnboundLogicalSink;
import org.apache.doris.nereids.util.RelationUtil;

import com.google.common.collect.ImmutableList;
Expand Down Expand Up @@ -75,8 +76,8 @@ public List<Rule> buildRules() {
unboundRelation()
.thenApply(this::collectFromUnboundRelation)
.toRule(RuleType.COLLECT_TABLE_FROM_RELATION),
unboundTableSink()
.thenApply(this::collectFromUnboundTableSink)
unboundLogicalSink()
.thenApply(this::collectFromUnboundSink)
.toRule(RuleType.COLLECT_TABLE_FROM_SINK),
any().whenNot(UnboundRelation.class::isInstance)
.whenNot(UnboundTableSink.class::isInstance)
Expand Down Expand Up @@ -124,7 +125,7 @@ private Plan collectFromAny(MatchingContext<Plan> ctx) {
return null;
}

private Plan collectFromUnboundTableSink(MatchingContext<UnboundTableSink<Plan>> ctx) {
private Plan collectFromUnboundSink(MatchingContext<UnboundLogicalSink<Plan>> ctx) {
List<String> nameParts = ctx.root.getNameParts();
switch (nameParts.size()) {
case 1:
Expand Down Expand Up @@ -182,6 +183,13 @@ private void collectFromUnboundRelation(CascadesContext cascadesContext,
if (tableFrom == TableFrom.QUERY) {
collectMTMVCandidates(table, cascadesContext);
}
if (tableFrom == TableFrom.INSERT_TARGET) {
if (!cascadesContext.getStatementContext().getInsertTargetSchema().isEmpty()) {
LOG.warn("collect insert target table '{}' more than once.", tableQualifier);
}
cascadesContext.getStatementContext().getInsertTargetSchema().clear();
cascadesContext.getStatementContext().getInsertTargetSchema().addAll(table.getFullSchema());
}
if (table instanceof View) {
parseAndCollectFromView(tableQualifier, (View) table, cascadesContext);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
import org.apache.doris.common.ErrorReport;
import org.apache.doris.common.profile.ProfileManager.ProfileType;
import org.apache.doris.common.util.DebugUtil;
import org.apache.doris.common.util.MetaLockUtils;
import org.apache.doris.datasource.hive.HMSExternalTable;
import org.apache.doris.datasource.iceberg.IcebergExternalTable;
import org.apache.doris.datasource.jdbc.JdbcExternalTable;
Expand Down Expand Up @@ -62,12 +61,10 @@

import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import com.google.common.collect.Lists;
import org.apache.commons.collections.CollectionUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.Comparator;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
Expand Down Expand Up @@ -175,20 +172,19 @@ public AbstractInsertExecutor initPlan(ConnectContext ctx, StmtExecutor executor
}
// lock after plan and check does table's schema changed to ensure we lock table order by id.
TableIf newestTargetTableIf = RelationUtil.getTable(qualifiedTargetTableName, ctx.getEnv());
List<TableIf> targetTables = Lists.newArrayList(targetTableIf, newestTargetTableIf);
targetTables.sort(Comparator.comparing(TableIf::getId));
MetaLockUtils.readLockTables(targetTables);
newestTargetTableIf.readLock();
try {
if (targetTableIf.getId() != newestTargetTableIf.getId()) {
LOG.warn("insert plan failed {} times. query id is {}. table id changed from {} to {}",
retryTimes, DebugUtil.printId(ctx.queryId()),
targetTableIf.getId(), newestTargetTableIf.getId());
continue;
}
if (!targetTableIf.getFullSchema().equals(newestTargetTableIf.getFullSchema())) {
// Use the schema saved during planning as the schema of the original target table.
if (!ctx.getStatementContext().getInsertTargetSchema().equals(newestTargetTableIf.getFullSchema())) {
LOG.warn("insert plan failed {} times. query id is {}. table schema changed from {} to {}",
retryTimes, DebugUtil.printId(ctx.queryId()),
targetTableIf.getFullSchema(), newestTargetTableIf.getFullSchema());
ctx.getStatementContext().getInsertTargetSchema(), newestTargetTableIf.getFullSchema());
continue;
}
if (!insertExecutor.isEmptyInsert()) {
Expand All @@ -198,9 +194,9 @@ public AbstractInsertExecutor initPlan(ConnectContext ctx, StmtExecutor executor
buildResult.physicalSink
);
}
MetaLockUtils.readUnlockTables(targetTables);
newestTargetTableIf.readUnlock();
} catch (Throwable e) {
MetaLockUtils.readUnlockTables(targetTables);
newestTargetTableIf.readUnlock();
// the abortTxn in onFail need to acquire table write lock
if (insertExecutor != null) {
insertExecutor.onFail(e);
Expand Down

0 comments on commit 0f897ed

Please sign in to comment.