Skip to content

Commit

Permalink
[Enhancement] adjust MV's warehouse logic (StarRocks#45941)
Browse files Browse the repository at this point in the history
Signed-off-by: starrocks-xupeng <[email protected]>
  • Loading branch information
starrocks-xupeng authored May 21, 2024
1 parent bf99dde commit 91cc9f1
Show file tree
Hide file tree
Showing 7 changed files with 25 additions and 47 deletions.
21 changes: 0 additions & 21 deletions fe/fe-core/src/main/java/com/starrocks/scheduler/TaskBuilder.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
package com.starrocks.scheduler;

import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.starrocks.alter.OptimizeTask;
import com.starrocks.analysis.IntLiteral;
Expand All @@ -24,7 +23,6 @@
import com.starrocks.common.DdlException;
import com.starrocks.common.FeConstants;
import com.starrocks.common.util.DebugUtil;
import com.starrocks.common.util.PropertyAnalyzer;
import com.starrocks.common.util.TimeUtils;
import com.starrocks.load.pipe.PipeTaskDesc;
import com.starrocks.qe.ConnectContext;
Expand All @@ -38,9 +36,7 @@
import com.starrocks.sql.ast.SubmitTaskStmt;
import com.starrocks.sql.optimizer.Utils;
import com.starrocks.warehouse.Warehouse;
import org.apache.commons.collections.MapUtils;

import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;

Expand Down Expand Up @@ -102,25 +98,12 @@ public static Task buildTask(SubmitTaskStmt submitTaskStmt, ConnectContext conte
*/
private static void handleSpecialTaskProperties(Task task) {
Map<String, String> properties = task.getProperties();
if (MapUtils.isEmpty(properties)) {
return;
}

List<String> toRemove = Lists.newArrayList();
Map<String, String> toAdd = Maps.newHashMap();
for (Map.Entry<String, String> entry : properties.entrySet()) {
// warehouse: translate the warehouse into warehouse_id, in case it changed after renaming
if (entry.getKey().equalsIgnoreCase(SessionVariable.WAREHOUSE)) {
Warehouse wa = GlobalStateMgr.getCurrentState().getWarehouseMgr().getWarehouse(entry.getValue());
Preconditions.checkArgument(wa != null, "warehouse not exists: " + entry.getValue());

toRemove.add(entry.getKey());
toAdd.put(PropertyAnalyzer.PROPERTIES_WAREHOUSE_ID, String.valueOf(wa.getId()));
}
}

toRemove.forEach(properties::remove);
properties.putAll(toAdd);
}

public static String getAnalyzeMVStmt(String tableName) {
Expand Down Expand Up @@ -163,10 +146,6 @@ public static Task buildMvTask(MaterializedView materializedView, String dbName)
taskProperties.put(PartitionBasedMvRefreshProcessor.MV_ID,
String.valueOf(materializedView.getId()));
taskProperties.putAll(materializedView.getProperties());
// alter mv set warehouse
taskProperties.put(PropertyAnalyzer.PROPERTIES_WAREHOUSE_ID,
String.valueOf(materializedView.getWarehouseId()));

task.setProperties(taskProperties);
task.setDefinition(materializedView.getTaskDefinition());
task.setPostRun(getAnalyzeMVStmt(materializedView.getName()));
Expand Down
10 changes: 5 additions & 5 deletions fe/fe-core/src/main/java/com/starrocks/scheduler/TaskRun.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import com.starrocks.server.GlobalStateMgr;
import com.starrocks.sql.ast.SystemVariable;
import com.starrocks.sql.ast.UserIdentity;
import com.starrocks.warehouse.Warehouse;
import org.apache.commons.lang3.StringUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
Expand Down Expand Up @@ -174,12 +175,11 @@ public Map<String, String> refreshTaskProperties(ConnectContext ctx) {
}
MaterializedView materializedView = (MaterializedView) table;
Preconditions.checkState(materializedView != null);
newProperties = materializedView.getProperties();

// handle warehouse change
newProperties.put(PropertyAnalyzer.PROPERTIES_WAREHOUSE_ID,
String.valueOf(materializedView.getWarehouseId()));
newProperties.putAll(materializedView.getProperties());

Warehouse w = GlobalStateMgr.getCurrentState().getWarehouseMgr().getWarehouse(
materializedView.getWarehouseId());
newProperties.put(PropertyAnalyzer.PROPERTIES_WAREHOUSE, w.getName());
} catch (Exception e) {
LOG.warn("refresh task properties failed:", e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@

public class TaskRunBuilder {
private final Task task;
private Map<String, String> properties;
private Map<String, String> properties = new HashMap<>();
private ConnectContext connectContext;
private ExecuteOption executeOption = new ExecuteOption(false);

Expand Down Expand Up @@ -67,13 +67,18 @@ private Constants.TaskType getTaskType() {
}

private Map<String, String> mergeProperties() {
Map<String, String> result = new HashMap<>();
if (task.getProperties() == null && properties == null) {
return result;
}
if (task.getProperties() == null) {
return properties;
result.putAll(properties);
return result;
}
if (properties == null) {
return task.getProperties();
result.putAll(task.getProperties());
return result;
}
Map<String, String> result = new HashMap<>();
result.putAll(task.getProperties());
result.putAll(properties);
return result;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3536,13 +3536,6 @@ private void createTaskForMaterializedView(String dbName, MaterializedView mater
if (optHints != null) {
Map<String, String> taskProperties = task.getProperties();
taskProperties.putAll(optHints);
if (materializedView.getWarehouseId() != WarehouseManager.DEFAULT_WAREHOUSE_ID) {
taskProperties.put(PropertyAnalyzer.PROPERTIES_WAREHOUSE_ID,
String.valueOf(materializedView.getWarehouseId()));

LOG.debug("set warehouse {} in createTaskForMaterializedView",
materializedView.getWarehouseId());
}
}

TaskManager taskManager = GlobalStateMgr.getCurrentState().getTaskManager();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -174,19 +174,21 @@ public void analyzeTaskProperties(Map<String, String> properties) {
};
new MockUp<WarehouseManager>() {
@Mock
Warehouse getWarehouse(String name) {
public Warehouse getWarehouse(String name) {
return new DefaultWarehouse(123, name);
}
@Mock
public Warehouse getWarehouse(long id) {
return new DefaultWarehouse(123, "w1");
}
};

starRocksAssert.ddl("submit task t_warehouse properties('warehouse'='w1') as " +
"insert into tbl1 select * from tbl1");
Task task = tm.getTask("t_warehouse");
Assert.assertFalse(task.getProperties().toString(),
task.getProperties().containsKey(SessionVariable.WAREHOUSE));
Assert.assertTrue(task.getProperties().toString(),
task.getProperties().containsKey(PropertyAnalyzer.PROPERTIES_WAREHOUSE_ID));
Assert.assertEquals("('warehouse_id'='123')", task.getPropertiesString());
task.getProperties().containsKey(PropertyAnalyzer.PROPERTIES_WAREHOUSE));
Assert.assertEquals("('warehouse'='w1')", task.getPropertiesString());
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,9 +95,9 @@ protected Table getTable(String dbName, String tableName) {

protected TaskRun buildMVTaskRun(MaterializedView mv, String dbName) {
Task task = TaskBuilder.buildMvTask(mv, dbName);
TaskRun taskRun = TaskRunBuilder.newBuilder(task).build();
Map<String, String> testProperties = task.getProperties();
testProperties.put(TaskRun.IS_TEST, "true");
TaskRun taskRun = TaskRunBuilder.newBuilder(task).build();
return taskRun;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,14 +133,13 @@ protected void assertPlanContains(ExecPlan execPlan, String... explain) throws E
private static void initAndExecuteTaskRun(TaskRun taskRun,
String startPartition,
String endPartition) throws Exception {
Task task = taskRun.getTask();
Map<String, String> testProperties = task.getProperties();
Map<String, String> testProperties = taskRun.getProperties();
testProperties.put(TaskRun.IS_TEST, "true");
if (startPartition != null) {
task.getProperties().put(TaskRun.PARTITION_START, startPartition);
testProperties.put(TaskRun.PARTITION_START, startPartition);
}
if (endPartition != null) {
task.getProperties().put(TaskRun.PARTITION_END, endPartition);
testProperties.put(TaskRun.PARTITION_END, endPartition);
}
taskRun.initStatus(UUIDUtil.genUUID().toString(), System.currentTimeMillis());
taskRun.executeTaskRun();
Expand Down

0 comments on commit 91cc9f1

Please sign in to comment.