From 975ad4ff233bb7228ca433b8e7305409929affe4 Mon Sep 17 00:00:00 2001 From: Xiangpeng Hu <65238551+HxpSerein@users.noreply.github.com> Date: Sun, 29 Sep 2024 01:23:48 +0800 Subject: [PATCH] support spark task (#4102) --- .../console/core/bean/SparkTaskItem.java | 32 ++++ .../console/core/entity/FlinkApplication.java | 4 +- .../core/enums/DistributedTaskEnum.java | 12 +- .../core/service/DistributedTaskService.java | 18 +-- .../SparkApplicationActionServiceImpl.java | 32 +++- .../impl/DistributedTaskServiceImpl.java | 137 ++++++++++++------ .../core/watcher/FlinkAppHttpWatcher.java | 18 ++- .../core/watcher/SparkAppHttpWatcher.java | 28 ++-- .../service/DistributedTaskServiceTest.java | 27 +++- 9 files changed, 225 insertions(+), 83 deletions(-) create mode 100644 streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/bean/SparkTaskItem.java diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/bean/SparkTaskItem.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/bean/SparkTaskItem.java new file mode 100644 index 0000000000..c681a7f8c8 --- /dev/null +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/bean/SparkTaskItem.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.streampark.console.core.bean; + +import lombok.Data; + +import java.io.Serializable; + +@Data +public class SparkTaskItem implements Serializable { + + /** appId */ + private Long appId; + + private Boolean autoStart; + +} diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkApplication.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkApplication.java index 382f71fef1..3b9c86ffa9 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkApplication.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkApplication.java @@ -26,6 +26,7 @@ import org.apache.streampark.common.enums.FlinkK8sRestExposedType; import org.apache.streampark.common.enums.StorageType; import org.apache.streampark.common.fs.FsOperator; +import org.apache.streampark.console.base.mybatis.entity.BaseEntity; import org.apache.streampark.console.base.util.JacksonUtils; import org.apache.streampark.console.core.bean.AppControl; import org.apache.streampark.console.core.bean.Dependency; @@ -53,7 +54,6 @@ import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; -import java.io.Serializable; import java.util.Collections; import java.util.Date; import java.util.HashMap; @@ -64,7 +64,7 @@ @Data @TableName("t_flink_app") @Slf4j -public class FlinkApplication implements Serializable { +public class FlinkApplication extends BaseEntity { @TableId(type = IdType.INPUT) private Long id; diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/enums/DistributedTaskEnum.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/enums/DistributedTaskEnum.java index e27a9ce950..f397a64fba 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/enums/DistributedTaskEnum.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/enums/DistributedTaskEnum.java @@ -45,7 +45,17 @@ public enum DistributedTaskEnum { /** * Forces the given application to stop. */ - ABORT(4); + ABORT(4), + + /** + * Stop the given application. + */ + STOP(5), + + /** + * Forces the given application to stop. + */ + FORCED_STOP(6); private final int value; diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/DistributedTaskService.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/DistributedTaskService.java index a1730f5bd8..0a60c4c1d2 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/DistributedTaskService.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/DistributedTaskService.java @@ -17,13 +17,12 @@ package org.apache.streampark.console.core.service; +import org.apache.streampark.console.base.mybatis.entity.BaseEntity; import org.apache.streampark.console.core.entity.DistributedTask; -import org.apache.streampark.console.core.entity.FlinkApplication; import org.apache.streampark.console.core.enums.DistributedTaskEnum; import com.baomidou.mybatisplus.extension.service.IService; -import java.util.List; import java.util.Set; /** @@ -40,16 +39,9 @@ public interface DistributedTaskService extends IService { /** * This interface is responsible for polling the database to retrieve task records and execute the corresponding operations. - * @param DistributedTask DistributedTask + * @param distributedTask distributedTask */ - void executeDistributedTask(DistributedTask DistributedTask) throws Exception; - - /** - * Through this interface, the watcher obtains the list of tasks that need to be monitored. - * @param applications List - * @return List List of tasks that need to be monitored - */ - List getMonitoredTaskList(List applications); + void executeDistributedTask(DistributedTask distributedTask) throws Exception; /** * This interface handles task redistribution when server nodes are added. @@ -74,9 +66,9 @@ public interface DistributedTaskService extends IService { /** * Save Distributed Task. * - * @param appParam Application + * @param appParam It may be one of the following values: FlinkApplication, SparkApplication * @param autoStart boolean * @param action It may be one of the following values: START, RESTART, REVOKE, CANCEL, ABORT */ - public void saveDistributedTask(FlinkApplication appParam, boolean autoStart, DistributedTaskEnum action); + public void saveDistributedTask(BaseEntity appParam, boolean autoStart, DistributedTaskEnum action); } diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/SparkApplicationActionServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/SparkApplicationActionServiceImpl.java index 4812ebbc1f..40912e3be6 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/SparkApplicationActionServiceImpl.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/SparkApplicationActionServiceImpl.java @@ -39,11 +39,13 @@ import org.apache.streampark.console.core.entity.SparkEnv; import org.apache.streampark.console.core.entity.SparkSql; import org.apache.streampark.console.core.enums.ConfigFileTypeEnum; +import org.apache.streampark.console.core.enums.DistributedTaskEnum; import org.apache.streampark.console.core.enums.ReleaseStateEnum; import org.apache.streampark.console.core.enums.SparkAppStateEnum; import org.apache.streampark.console.core.enums.SparkOperationEnum; import org.apache.streampark.console.core.enums.SparkOptionStateEnum; import org.apache.streampark.console.core.mapper.SparkApplicationMapper; +import org.apache.streampark.console.core.service.DistributedTaskService; import org.apache.streampark.console.core.service.ResourceService; import org.apache.streampark.console.core.service.SparkEnvService; import org.apache.streampark.console.core.service.SparkSqlService; @@ -129,6 +131,9 @@ public class SparkApplicationActionServiceImpl @Autowired private ResourceService resourceService; + @Autowired + private DistributedTaskService distributedTaskService; + private final Map> startJobFutureMap = new ConcurrentHashMap<>(); private final Map> cancelJobFutureMap = new ConcurrentHashMap<>(); @@ -136,6 +141,11 @@ public class SparkApplicationActionServiceImpl @Override public void revoke(Long appId) throws ApplicationException { SparkApplication application = getById(appId); + // For HA purposes, if the task is not processed locally, save the Distribution task and return + if (!distributedTaskService.isLocalProcessing(appId)) { + distributedTaskService.saveDistributedTask(application, false, DistributedTaskEnum.REVOKE); + return; + } ApiAlertException.throwIfNull( application, String.format("The application id=%s not found, revoke failed.", appId)); @@ -161,15 +171,25 @@ public void revoke(Long appId) throws ApplicationException { @Override public void restart(SparkApplication appParam) throws Exception { + // For HA purposes, if the task is not processed locally, save the Distribution task and return + if (!distributedTaskService.isLocalProcessing(appParam.getId())) { + distributedTaskService.saveDistributedTask(appParam, false, DistributedTaskEnum.RESTART); + return; + } this.stop(appParam); this.start(appParam, false); } @Override public void forcedStop(Long id) { + SparkApplication application = this.baseMapper.selectApp(id); + // For HA purposes, if the task is not processed locally, save the Distribution task and return + if (!distributedTaskService.isLocalProcessing(id)) { + distributedTaskService.saveDistributedTask(application, false, DistributedTaskEnum.FORCED_STOP); + return; + } CompletableFuture startFuture = startJobFutureMap.remove(id); CompletableFuture stopFuture = cancelJobFutureMap.remove(id); - SparkApplication application = this.baseMapper.selectApp(id); if (startFuture != null) { startFuture.cancel(true); } @@ -183,6 +203,11 @@ public void forcedStop(Long id) { @Override public void stop(SparkApplication appParam) throws Exception { + // For HA purposes, if the task is not processed locally, save the Distribution task and return + if (!distributedTaskService.isLocalProcessing(appParam.getId())) { + distributedTaskService.saveDistributedTask(appParam, false, DistributedTaskEnum.STOP); + return; + } SparkAppHttpWatcher.setOptionState(appParam.getId(), SparkOptionStateEnum.STOPPING); SparkApplication application = getById(appParam.getId()); application.setState(SparkAppStateEnum.STOPPING.getValue()); @@ -245,6 +270,11 @@ public void stop(SparkApplication appParam) throws Exception { @Override public void start(SparkApplication appParam, boolean auto) throws Exception { + // For HA purposes, if the task is not processed locally, save the Distribution task and return + if (!distributedTaskService.isLocalProcessing(appParam.getId())) { + distributedTaskService.saveDistributedTask(appParam, false, DistributedTaskEnum.START); + return; + } // 1) check application final SparkApplication application = getById(appParam.getId()); AssertUtils.notNull(application); diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/DistributedTaskServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/DistributedTaskServiceImpl.java index 68fdf6b5ac..dfff3440ac 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/DistributedTaskServiceImpl.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/DistributedTaskServiceImpl.java @@ -17,16 +17,20 @@ package org.apache.streampark.console.core.service.impl; +import org.apache.streampark.console.base.mybatis.entity.BaseEntity; import org.apache.streampark.console.base.util.ConsistentHash; import org.apache.streampark.console.base.util.JacksonUtils; import org.apache.streampark.console.core.bean.FlinkTaskItem; +import org.apache.streampark.console.core.bean.SparkTaskItem; import org.apache.streampark.console.core.entity.DistributedTask; import org.apache.streampark.console.core.entity.FlinkApplication; +import org.apache.streampark.console.core.entity.SparkApplication; import org.apache.streampark.console.core.enums.DistributedTaskEnum; import org.apache.streampark.console.core.enums.EngineTypeEnum; import org.apache.streampark.console.core.mapper.DistributedTaskMapper; import org.apache.streampark.console.core.service.DistributedTaskService; import org.apache.streampark.console.core.service.application.FlinkApplicationActionService; +import org.apache.streampark.console.core.service.application.SparkApplicationActionService; import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl; import com.fasterxml.jackson.core.JsonProcessingException; @@ -43,7 +47,6 @@ import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Executor; -import java.util.stream.Collectors; @Slf4j @Service @@ -57,7 +60,10 @@ public class DistributedTaskServiceImpl extends ServiceImpl - * @return List List of tasks that need to be monitored - */ - @Override - public List getMonitoredTaskList(List applications) { - return applications.stream() - .filter(application -> isLocalProcessing(application.getId())) - .collect(Collectors.toList()); - } - /** * This interface handles task redistribution when server nodes are added. * @@ -192,17 +210,25 @@ public boolean isLocalProcessing(Long appId) { * @param action It may be one of the following values: START, RESTART, REVOKE, CANCEL, ABORT */ @Override - public void saveDistributedTask(FlinkApplication appParam, boolean autoStart, DistributedTaskEnum action) { + public void saveDistributedTask(BaseEntity appParam, boolean autoStart, DistributedTaskEnum action) { try { - DistributedTask DistributedTask = getDistributedTaskByApp(appParam, autoStart, action); - this.save(DistributedTask); + DistributedTask distributedTask; + if (appParam instanceof FlinkApplication) { + distributedTask = getDistributedTaskByFlinkApp((FlinkApplication) appParam, autoStart, action); + } else if (appParam instanceof SparkApplication) { + distributedTask = getDistributedTaskBySparkApp((SparkApplication) appParam, autoStart, action); + } else { + log.error("Unsupported application type: {}", appParam.getClass().getName()); + return; + } + this.save(distributedTask); } catch (JsonProcessingException e) { log.error("Failed to save Distributed task: {}", e.getMessage()); } } - public DistributedTask getDistributedTaskByApp(FlinkApplication appParam, boolean autoStart, - DistributedTaskEnum action) throws JsonProcessingException { + public DistributedTask getDistributedTaskByFlinkApp(FlinkApplication appParam, boolean autoStart, + DistributedTaskEnum action) throws JsonProcessingException { FlinkTaskItem flinkTaskItem = new FlinkTaskItem(); flinkTaskItem.setAppId(appParam.getId()); flinkTaskItem.setAutoStart(autoStart); @@ -221,8 +247,25 @@ public DistributedTask getDistributedTaskByApp(FlinkApplication appParam, boolea return distributedTask; } - public FlinkTaskItem getFlinkTaskItem(DistributedTask DistributedTask) throws JsonProcessingException { - return JacksonUtils.read(DistributedTask.getProperties(), FlinkTaskItem.class); + public DistributedTask getDistributedTaskBySparkApp(SparkApplication appParam, boolean autoStart, + DistributedTaskEnum action) throws JsonProcessingException { + SparkTaskItem sparkTaskItem = new SparkTaskItem(); + sparkTaskItem.setAppId(appParam.getId()); + sparkTaskItem.setAutoStart(autoStart); + + DistributedTask distributedTask = new DistributedTask(); + distributedTask.setAction(action); + distributedTask.setEngineType(EngineTypeEnum.SPARK); + distributedTask.setProperties(JacksonUtils.write(sparkTaskItem)); + return distributedTask; + } + + public FlinkTaskItem getFlinkTaskItem(DistributedTask distributedTask) throws JsonProcessingException { + return JacksonUtils.read(distributedTask.getProperties(), FlinkTaskItem.class); + } + + public SparkTaskItem getSparkTaskItem(DistributedTask distributedTask) throws JsonProcessingException { + return JacksonUtils.read(distributedTask.getProperties(), SparkTaskItem.class); } public FlinkApplication getAppByFlinkTaskItem(FlinkTaskItem flinkTaskItem) { @@ -238,6 +281,12 @@ public FlinkApplication getAppByFlinkTaskItem(FlinkTaskItem flinkTaskItem) { return appParam; } + public SparkApplication getAppBySparkTaskItem(SparkTaskItem sparkTaskItem) { + SparkApplication appParam = new SparkApplication(); + appParam.setId(sparkTaskItem.getAppId()); + return appParam; + } + public long getConsistentHashSize() { return consistentHash.getSize(); } diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/watcher/FlinkAppHttpWatcher.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/watcher/FlinkAppHttpWatcher.java index 7b19f853ab..642e09b102 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/watcher/FlinkAppHttpWatcher.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/watcher/FlinkAppHttpWatcher.java @@ -180,15 +180,19 @@ public class FlinkAppHttpWatcher { @PostConstruct public void init() { WATCHING_APPS.clear(); - List applications = distributedTaskService.getMonitoredTaskList(applicationManageService.list( + List applications = applicationManageService.list( new LambdaQueryWrapper() .eq(FlinkApplication::getTracking, 1) - .notIn(FlinkApplication::getDeployMode, FlinkDeployMode.getKubernetesMode()))); - applications.forEach( - (app) -> { - WATCHING_APPS.put(app.getId(), app); - STARTING_CACHE.put(app.getId(), DEFAULT_FLAG_BYTE); - }); + .notIn(FlinkApplication::getDeployMode, FlinkDeployMode.getKubernetesMode())) + .stream() + .filter(application -> distributedTaskService.isLocalProcessing(application.getId())) + .collect(Collectors.toList()); + + applications.forEach(app -> { + Long appId = app.getId(); + WATCHING_APPS.put(appId, app); + STARTING_CACHE.put(appId, DEFAULT_FLAG_BYTE); + }); } @PreDestroy diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/watcher/SparkAppHttpWatcher.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/watcher/SparkAppHttpWatcher.java index b2c1ed083c..696864687a 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/watcher/SparkAppHttpWatcher.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/watcher/SparkAppHttpWatcher.java @@ -28,6 +28,7 @@ import org.apache.streampark.console.core.metrics.spark.Job; import org.apache.streampark.console.core.metrics.spark.SparkApplicationSummary; import org.apache.streampark.console.core.metrics.yarn.YarnAppInfo; +import org.apache.streampark.console.core.service.DistributedTaskService; import org.apache.streampark.console.core.service.SparkEnvService; import org.apache.streampark.console.core.service.alert.AlertService; import org.apache.streampark.console.core.service.application.SparkApplicationActionService; @@ -64,6 +65,7 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Executor; import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; @Slf4j @Component @@ -84,6 +86,9 @@ public class SparkAppHttpWatcher { @Autowired private SparkEnvService sparkEnvService; + @Autowired + private DistributedTaskService distributedTaskService; + @Autowired private AlertService alertService; @@ -134,16 +139,19 @@ public class SparkAppHttpWatcher { @PostConstruct public void init() { WATCHING_APPS.clear(); - List applications = - applicationManageService.list( - new LambdaQueryWrapper() - .eq(SparkApplication::getTracking, 1) - .ne(SparkApplication::getState, SparkAppStateEnum.LOST.getValue())); - applications.forEach( - (app) -> { - WATCHING_APPS.put(app.getId(), app); - STARTING_CACHE.put(app.getId(), DEFAULT_FLAG_BYTE); - }); + List applications = applicationManageService.list( + new LambdaQueryWrapper() + .eq(SparkApplication::getTracking, 1) + .ne(SparkApplication::getState, SparkAppStateEnum.LOST.getValue())) + .stream() + .filter(application -> distributedTaskService.isLocalProcessing(application.getId())) + .collect(Collectors.toList()); + + applications.forEach(app -> { + Long appId = app.getId(); + WATCHING_APPS.put(appId, app); + STARTING_CACHE.put(appId, DEFAULT_FLAG_BYTE); + }); } @PreDestroy diff --git a/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/DistributedTaskServiceTest.java b/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/DistributedTaskServiceTest.java index 58a5a15f93..5e1b3adf87 100644 --- a/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/DistributedTaskServiceTest.java +++ b/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/DistributedTaskServiceTest.java @@ -18,8 +18,10 @@ package org.apache.streampark.console.core.service; import org.apache.streampark.console.core.bean.FlinkTaskItem; +import org.apache.streampark.console.core.bean.SparkTaskItem; import org.apache.streampark.console.core.entity.DistributedTask; import org.apache.streampark.console.core.entity.FlinkApplication; +import org.apache.streampark.console.core.entity.SparkApplication; import org.apache.streampark.console.core.enums.DistributedTaskEnum; import org.apache.streampark.console.core.service.impl.DistributedTaskServiceImpl; @@ -57,17 +59,32 @@ void testIsLocalProcessing() { } @Test - void testGetTaskAndApp() { + void testFlinkTaskAndApp() { FlinkApplication application = new FlinkApplication(); application.setId(0L); try { - DistributedTask DistributedTask = - distributionTaskService.getDistributedTaskByApp(application, false, DistributedTaskEnum.START); - FlinkTaskItem flinkTaskItem = distributionTaskService.getFlinkTaskItem(DistributedTask); + DistributedTask distributedTask = + distributionTaskService.getDistributedTaskByFlinkApp(application, false, DistributedTaskEnum.START); + FlinkTaskItem flinkTaskItem = distributionTaskService.getFlinkTaskItem(distributedTask); FlinkApplication newApplication = distributionTaskService.getAppByFlinkTaskItem(flinkTaskItem); assert (application.equals(newApplication)); } catch (JacksonException e) { - log.error("testGetTaskAndApp failed:", e); + log.error("testFlinkTaskAndApp failed:", e); + } + } + + @Test + void testSparkTaskAndApp() { + SparkApplication application = new SparkApplication(); + application.setId(0L); + try { + DistributedTask distributedTask = + distributionTaskService.getDistributedTaskBySparkApp(application, false, DistributedTaskEnum.START); + SparkTaskItem sparkTaskItem = distributionTaskService.getSparkTaskItem(distributedTask); + SparkApplication newApplication = distributionTaskService.getAppBySparkTaskItem(sparkTaskItem); + assert (application.equals(newApplication)); + } catch (JacksonException e) { + log.error("testSparkTaskAndApp failed:", e); } }