Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[fix](schema-change) Forbid modifying mv related columns #47271

Open
wants to merge 11 commits into
base: master
Choose a base branch
from
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import org.apache.doris.analysis.CancelStmt;
import org.apache.doris.analysis.ColumnPosition;
import org.apache.doris.analysis.CreateIndexClause;
import org.apache.doris.analysis.CreateMaterializedViewStmt;
import org.apache.doris.analysis.DropColumnClause;
import org.apache.doris.analysis.DropIndexClause;
import org.apache.doris.analysis.Expr;
Expand Down Expand Up @@ -718,8 +717,8 @@ private boolean processModifyColumn(ModifyColumnClause alterClause, OlapTable ol
for (Column column : schema) {
String columnName = column.getName();
if (column.isMaterializedViewColumn()) {
columnName = MaterializedIndexMeta.normalizeName(
CreateMaterializedViewStmt.mvColumnBreaker(columnName));
throw new DdlException("Can not modify column contained by mv, mv="
+ olapTable.getIndexNameById(entry.getKey()));
}
if (columnName.equalsIgnoreCase(modColumn.getName())) {
otherIndexIds.add(entry.getKey());
Expand All @@ -735,8 +734,8 @@ private boolean processModifyColumn(ModifyColumnClause alterClause, OlapTable ol
Column col = otherIndexSchema.get(i);
String columnName = col.getName();
if (col.isMaterializedViewColumn()) {
columnName = MaterializedIndexMeta.normalizeName(
CreateMaterializedViewStmt.mvColumnBreaker(columnName));
throw new DdlException("Can not modify column contained by mv, mv="
+ olapTable.getIndexNameById(otherIndexId));
}
if (!columnName.equalsIgnoreCase(modColumn.getName())) {
continue;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,14 @@
import org.apache.doris.nereids.trees.plans.logical.LogicalProject;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.OriginStatement;
import org.apache.doris.qe.SessionVariable;

import com.google.common.base.Strings;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Sets;

import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
Expand All @@ -49,6 +51,13 @@
* relation util
*/
public class RelationUtil {
private static final String SYNC_MV_PLANER_DISABLE_RULES = "OLAP_SCAN_PARTITION_PRUNE, PRUNE_EMPTY_PARTITION, "
+ "ELIMINATE_GROUP_BY_KEY_BY_UNIFORM, HAVING_TO_FILTER, ELIMINATE_GROUP_BY, SIMPLIFY_AGG_GROUP_BY, "
+ "MERGE_PERCENTILE_TO_ARRAY, VARIANT_SUB_PATH_PRUNING, INFER_PREDICATES, INFER_AGG_NOT_NULL, "
+ "INFER_SET_OPERATOR_DISTINCT, INFER_FILTER_NOT_NULL, INFER_JOIN_NOT_NULL, MAX_MIN_FILTER_PUSH_DOWN, "
+ "ELIMINATE_SORT, ELIMINATE_AGGREGATE, ELIMINATE_LIMIT, ELIMINATE_SEMI_JOIN, ELIMINATE_NOT_NULL, "
+ "ELIMINATE_JOIN_BY_UK, ELIMINATE_JOIN_BY_FK, ELIMINATE_GROUP_BY_KEY, ELIMINATE_GROUP_BY_KEY_BY_UNIFORM, "
+ "ELIMINATE_FILTER_GROUP_BY_KEY";

/**
* get table qualifier
Expand Down Expand Up @@ -137,23 +146,55 @@ public static Set<String> getMvUsedColumnNames(MaterializedIndexMeta meta) {
Optional<String> querySql = new NereidsParser().parseForSyncMv(createMvSql);
if (querySql.isPresent()) {
LogicalPlan unboundMvPlan = new NereidsParser().parseSingle(querySql.get());
StatementContext statementContext = new StatementContext(ConnectContext.get(),
ConnectContext connectContext = ConnectContext.get();
StatementContext statementContext = new StatementContext(connectContext,
new OriginStatement(querySql.get(), 0));
NereidsPlanner planner = new NereidsPlanner(statementContext);
if (statementContext.getConnectContext().getStatementContext() == null) {
statementContext.getConnectContext().setStatementContext(statementContext);
}
planner.planWithLock(unboundMvPlan, PhysicalProperties.ANY, ExplainCommand.ExplainLevel.REWRITTEN_PLAN);
LogicalPlan logicalPlan = (LogicalPlan) planner.getCascadesContext().getRewritePlan();

logicalPlan
.collect(plan -> plan instanceof LogicalProject
&& ((LogicalProject<?>) plan).child() instanceof LogicalCatalogRelation)
.stream().forEach(plan -> {
LogicalProject logicalProject = (LogicalProject) plan;
columns.addAll(logicalProject.getInputSlots().stream().map(Slot::getName)
.collect(Collectors.toList()));
});
Set<String> tempDisableRules = connectContext.getSessionVariable().getDisableNereidsRuleNames();
connectContext.getSessionVariable().setDisableNereidsRules(SYNC_MV_PLANER_DISABLE_RULES);
connectContext.getStatementContext().invalidCache(SessionVariable.DISABLE_NEREIDS_RULES);
LogicalPlan logicalPlan;
try {
// disable rbo sync mv rewrite
connectContext.getSessionVariable()
.setVarOnce(SessionVariable.ENABLE_SYNC_MV_COST_BASED_REWRITE, "true");
// disable constant fold
connectContext.getSessionVariable().setVarOnce(SessionVariable.DEBUG_SKIP_FOLD_CONSTANT, "true");
planner.planWithLock(unboundMvPlan, PhysicalProperties.ANY,
ExplainCommand.ExplainLevel.REWRITTEN_PLAN);
logicalPlan = (LogicalPlan) planner.getCascadesContext().getRewritePlan();
} finally {
// after operate, roll back the disable rules
connectContext.getSessionVariable().setDisableNereidsRules(String.join(",", tempDisableRules));
connectContext.getStatementContext().invalidCache(SessionVariable.DISABLE_NEREIDS_RULES);
}
Map<Boolean, List<Object>> partitionedPlan = logicalPlan
.collect(plan -> true)
.stream()
.collect(Collectors.partitioningBy(
plan -> plan instanceof LogicalProject
&& ((LogicalProject<?>) plan).child() instanceof LogicalCatalogRelation
));
List<Object> projects = partitionedPlan.get(true);
if (projects.isEmpty()) {
// for scan
partitionedPlan.get(false)
.stream()
.filter(plan -> plan instanceof LogicalCatalogRelation)
.map(plan -> (LogicalCatalogRelation) plan)
.forEach(plan -> columns.addAll(logicalPlan.getOutput().stream().map(Slot::getName).collect(
Collectors.toList())));
} else {
// for projects
projects
.stream()
.map(plan -> (LogicalProject<?>) plan)
.forEach(plan -> columns.addAll(plan.getInputSlots().stream().map(Slot::getName).collect(
Collectors.toList())));
}
} else {
throw new AnalysisException(String.format("can't parse %s ", createMvSql));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,28 +28,3 @@ mv_tbl_scalar_types_agg_1 AGG_KEYS mv_k2 bigint bigint Yes true \N true `k2`

-- !sql --

-- !master_sql --
tbl_scalar_types_agg AGG_KEYS k1 bigint bigint Yes true \N true
k2 bigint bigint Yes true \N true
c_bool boolean boolean Yes false \N REPLACE true
c_tinyint tinyint tinyint Yes false \N MIN true
c_smallint smallint smallint Yes false \N MAX true
c_int bigint bigint Yes false \N MAX true
c_bigint bigint bigint Yes false \N SUM true
c_largeint largeint largeint Yes false \N MIN true
c_float float float Yes false \N MIN true
c_double double double Yes false \N MAX true
c_decimal DECIMAL(20, 3) decimalv3(20,3) Yes false \N SUM true
c_decimalv3 DECIMAL(20, 3) decimalv3(20,3) Yes false \N SUM true
c_date DATE datev2 Yes false \N REPLACE true
c_datetime DATETIME datetimev2(0) Yes false \N REPLACE true
c_datev2 DATE datev2 Yes false \N REPLACE true
c_datetimev2 DATETIME datetimev2(0) Yes false \N REPLACE true
c_char char(15) char(15) Yes false \N REPLACE true
c_varchar varchar(100) varchar(100) Yes false \N REPLACE true
c_string text text Yes false \N REPLACE true

mv_tbl_scalar_types_agg_1 AGG_KEYS mv_k2 bigint bigint Yes true \N true `k2`
mv_k1 bigint bigint Yes true \N true `k1`
mva_MAX__`c_int` bigint bigint Yes false \N MAX true `c_int`

Original file line number Diff line number Diff line change
Expand Up @@ -62,42 +62,3 @@ mv_tbl_scalar_types_dup_1 DUP_KEYS mv_c_tinyint tinyint tinyint Yes true \N tru
-2106969609 true 10 29572 16738 1736115820 -957295886 -13319.206 -1.333603562816737E9 91224478600376111.942 69457425159617037.453 2022-09-06 2022-05-08T19:52:36 2022-04-05 2022-08-17T19:23:31 222.79.139.99 [email protected] Oxford Alley 77
-2102307005 true 10 -23674 24613 -1810828490 -47095409 -14686.167 2.072108685694799E9 39847820962230526.125 584354832299375.156 2022-03-27 2022-02-11T13:46:06 2022-12-25 2022-11-28T09:37:49 213.146.33.250 [email protected] Eagle Crest Terrace 84

-- !master_sql --
tbl_scalar_types_dup DUP_KEYS k1 bigint bigint Yes true \N true
c_bool boolean boolean Yes false \N NONE true
c_tinyint tinyint tinyint Yes false \N NONE true
c_smallint smallint smallint Yes false \N NONE true
c_int bigint bigint Yes false \N NONE true
c_bigint bigint bigint Yes false \N NONE true
c_largeint largeint largeint Yes false \N NONE true
c_float float float Yes false \N NONE true
c_double double double Yes false \N NONE true
c_decimal DECIMAL(20, 3) decimalv3(20,3) Yes false \N NONE true
c_decimalv3 DECIMAL(20, 3) decimalv3(20,3) Yes false \N NONE true
c_date DATE datev2 Yes false \N NONE true
c_datetime DATETIME datetimev2(0) Yes false \N NONE true
c_datev2 DATE datev2 Yes false \N NONE true
c_datetimev2 DATETIME datetimev2(0) Yes false \N NONE true
c_char char(15) char(15) Yes false \N NONE true
c_varchar varchar(100) varchar(100) Yes false \N NONE true
c_string text text Yes false \N NONE true

mv_tbl_scalar_types_dup_1 DUP_KEYS mv_c_tinyint tinyint tinyint Yes true \N true `c_tinyint`
mv_c_bool boolean boolean Yes true \N true `c_bool`
mv_k1 bigint bigint Yes true \N true `k1`
mv_c_smallint smallint smallint Yes false \N NONE true `c_smallint`
mv_c_int bigint bigint Yes false \N NONE true `c_int`
mv_c_bigint bigint bigint Yes false \N NONE true `c_bigint`
mv_c_largeint largeint largeint Yes false \N NONE true `c_largeint`
mv_c_float float float Yes false \N NONE true `c_float`
mv_c_double double double Yes false \N NONE true `c_double`
mv_c_decimal DECIMAL(20, 3) decimalv3(20,3) Yes false \N NONE true `c_decimal`
mv_c_decimalv3 DECIMAL(20, 3) decimalv3(20,3) Yes false \N NONE true `c_decimalv3`
mv_c_date DATE datev2 Yes false \N NONE true `c_date`
mv_c_datetime DATETIME datetimev2(0) Yes false \N NONE true `c_datetime`
mv_c_datev2 DATE datev2 Yes false \N NONE true `c_datev2`
mv_c_datetimev2 DATETIME datetimev2(0) Yes false \N NONE true `c_datetimev2`
mv_c_char character(255) character(255) Yes false \N NONE true `c_char`
mv_c_varchar varchar(65533) varchar(65533) Yes false \N NONE true `c_varchar`
mv_c_string text text Yes false \N NONE true `c_string`

Original file line number Diff line number Diff line change
Expand Up @@ -104,15 +104,8 @@ PROPERTIES (
heart_type = 1
;""")

sql """ ALTER TABLE rt_new MODIFY COLUMN event_id VARCHAR(51) NULL;"""
Thread.sleep(1000)

streamLoad {
table "rt_new"
set 'column_separator', ','
set 'columns', '`battery_id`,`create_time`,`imei`,`event_id`,`event_name`,`heart_type`'

file './test2'
time 10000 // limit inflight 10s
test {
sql """ ALTER TABLE rt_new MODIFY COLUMN event_id VARCHAR(51) NULL;"""
exception "Can not modify column contained by mv"
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -73,25 +73,8 @@ suite("schema_change_modify_mv_column_type_agg") {
qt_sql "SELECT * from ${testTable} order by 1, 2, 3 limit 10"
qt_sql "SELECT * from ${testTable} where c_tinyint = 10 order by 1, 2, 3 limit 10 "

sql """
ALTER table ${testTable} MODIFY COLUMN c_int BIGINT max;
"""
def getJobState = { tableName ->
def jobStateResult = sql """ SHOW ALTER TABLE COLUMN WHERE IndexName='${tableName}' ORDER BY createtime DESC LIMIT 1 """
return jobStateResult[0][9]
}
int max_try_time = 100
while (max_try_time--){
String result = getJobState(testTable)
if (result == "FINISHED") {
break
} else {
sleep(2000)
if (max_try_time < 1){
assertEquals(1,2)
}
}
test {
sql """ ALTER table ${testTable} MODIFY COLUMN c_int BIGINT max """
exception "Can not modify column contained by mv"
}
qt_master_sql """ desc ${testTable} all """
sql "INSERT INTO ${testTable} SELECT * from ${testTable}"
}
Original file line number Diff line number Diff line change
Expand Up @@ -73,28 +73,8 @@ suite("schema_change_modify_mv_column_type") {
sql "set topn_opt_limit_threshold = 100"
qt_sql "SELECT * from ${testTable} order by 1, 2, 3 limit 10"
qt_sql "SELECT * from ${testTable} where c_tinyint = 10 order by 1, 2, 3 limit 10 "

sql """
ALTER table ${testTable} MODIFY COLUMN c_int BIGINT;
"""
def getJobState = { tableName ->
def jobStateResult = sql """ SHOW ALTER TABLE COLUMN WHERE IndexName='${tableName}' ORDER BY createtime DESC LIMIT 1 """
return jobStateResult[0][9]
}
int max_try_time = 100
while (max_try_time--){
String result = getJobState(testTable)
if (result == "FINISHED") {
break
} else {
sleep(2000)
if (max_try_time < 1){
assertEquals(1,2)
}
}
test {
sql "ALTER table ${testTable} MODIFY COLUMN c_int BIGINT;"
exception "Can not modify column contained by mv"
}
// sync materialized view rewrite will fail when schema change, tmp disable, enable when fixed
sql """set enable_dml_materialized_view_rewrite = false;"""
qt_master_sql """ desc ${testTable} all """
sql "INSERT INTO ${testTable} SELECT * from ${testTable}"
}
Loading