From b4165e2986eb1ed5950aa391c28b8d7e6f23ad01 Mon Sep 17 00:00:00 2001 From: benjobs Date: Fri, 27 Sep 2024 19:54:22 +0800 Subject: [PATCH 1/4] [Improve] StopRequest class-name rename to CancelRequest --- .../SparkApplicationActionServiceImpl.java | 28 +++++++++---------- .../streampark/spark/client/SparkClient.scala | 8 +++--- ...{StopRequest.scala => CancelRequest.scala} | 2 +- ...topResponse.scala => CancelResponse.scala} | 2 +- .../spark/client/SparkClientEndpoint.scala | 4 +-- .../spark/client/impl/YarnClient.scala | 20 ++++++------- .../spark/client/trait/SparkClientTrait.scala | 8 +++--- 7 files changed, 36 insertions(+), 36 deletions(-) rename streampark-spark/streampark-spark-client/streampark-spark-client-api/src/main/scala/org/apache/streampark/spark/client/bean/{StopRequest.scala => CancelRequest.scala} (97%) rename streampark-spark/streampark-spark-client/streampark-spark-client-api/src/main/scala/org/apache/streampark/spark/client/bean/{StopResponse.scala => CancelResponse.scala} (94%) diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/SparkApplicationActionServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/SparkApplicationActionServiceImpl.java index e6e89b479c..ecd8e05661 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/SparkApplicationActionServiceImpl.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/SparkApplicationActionServiceImpl.java @@ -58,8 +58,8 @@ import org.apache.streampark.flink.packer.pipeline.BuildResult; import org.apache.streampark.flink.packer.pipeline.ShadedBuildResponse; import org.apache.streampark.spark.client.SparkClient; -import org.apache.streampark.spark.client.bean.StopRequest; -import org.apache.streampark.spark.client.bean.StopResponse; +import org.apache.streampark.spark.client.bean.CancelRequest; +import org.apache.streampark.spark.client.bean.CancelResponse; import org.apache.streampark.spark.client.bean.SubmitRequest; import org.apache.streampark.spark.client.bean.SubmitResponse; @@ -130,9 +130,9 @@ public class SparkApplicationActionServiceImpl @Autowired private ResourceService resourceService; - private final Map> startFutureMap = new ConcurrentHashMap<>(); + private final Map> startJobFutureMap = new ConcurrentHashMap<>(); - private final Map> stopFutureMap = new ConcurrentHashMap<>(); + private final Map> cancelJobFutureMap = new ConcurrentHashMap<>(); @Override public void revoke(Long appId) throws ApplicationException { @@ -168,8 +168,8 @@ public void restart(SparkApplication appParam) throws Exception { @Override public void forcedStop(Long id) { - CompletableFuture startFuture = startFutureMap.remove(id); - CompletableFuture stopFuture = stopFutureMap.remove(id); + CompletableFuture startFuture = startJobFutureMap.remove(id); + CompletableFuture stopFuture = cancelJobFutureMap.remove(id); SparkApplication application = this.baseMapper.selectApp(id); if (startFuture != null) { startFuture.cancel(true); @@ -206,21 +206,21 @@ public void stop(SparkApplication appParam) throws Exception { Map stopProper = new HashMap<>(); - StopRequest stopRequest = - new StopRequest( + CancelRequest stopRequest = + new CancelRequest( application.getId(), sparkEnv.getSparkVersion(), SparkDeployMode.of(application.getDeployMode()), stopProper, application.getAppId()); - CompletableFuture stopFuture = - CompletableFuture.supplyAsync(() -> SparkClient.stop(stopRequest), executorService); + CompletableFuture stopFuture = + CompletableFuture.supplyAsync(() -> SparkClient.cancel(stopRequest), executorService); - stopFutureMap.put(application.getId(), stopFuture); + cancelJobFutureMap.put(application.getId(), stopFuture); stopFuture.whenComplete( (cancelResponse, throwable) -> { - stopFutureMap.remove(application.getId()); + cancelJobFutureMap.remove(application.getId()); if (throwable != null) { String exception = ExceptionUtils.stringifyException(throwable); applicationLog.setException(exception); @@ -331,11 +331,11 @@ public void start(SparkApplication appParam, boolean auto) throws Exception { CompletableFuture future = CompletableFuture .supplyAsync(() -> SparkClient.submit(submitRequest), executorService); - startFutureMap.put(application.getId(), future); + startJobFutureMap.put(application.getId(), future); future.whenComplete( (response, throwable) -> { // 1) remove Future - startFutureMap.remove(application.getId()); + startJobFutureMap.remove(application.getId()); // 2) exception if (throwable != null) { diff --git a/streampark-spark/streampark-spark-client/streampark-spark-client-api/src/main/scala/org/apache/streampark/spark/client/SparkClient.scala b/streampark-spark/streampark-spark-client/streampark-spark-client-api/src/main/scala/org/apache/streampark/spark/client/SparkClient.scala index 0abc5f4c92..32d09b8568 100644 --- a/streampark-spark/streampark-spark-client/streampark-spark-client-api/src/main/scala/org/apache/streampark/spark/client/SparkClient.scala +++ b/streampark-spark/streampark-spark-client/streampark-spark-client-api/src/main/scala/org/apache/streampark/spark/client/SparkClient.scala @@ -32,15 +32,15 @@ object SparkClient extends Logger { private[this] val SUBMIT_REQUEST = "org.apache.streampark.spark.client.bean.SubmitRequest" -> "submit" - private[this] val STOP_REQUEST = - "org.apache.streampark.spark.client.bean.StopRequest" -> "stop" + private[this] val CANCEL_REQUEST = + "org.apache.streampark.spark.client.bean.CancelRequest" -> "cancel" def submit(submitRequest: SubmitRequest): SubmitResponse = { proxy[SubmitResponse](submitRequest, submitRequest.sparkVersion, SUBMIT_REQUEST) } - def stop(stopRequest: StopRequest): StopResponse = { - proxy[StopResponse](stopRequest, stopRequest.sparkVersion, STOP_REQUEST) + def cancel(stopRequest: CancelRequest): CancelResponse = { + proxy[CancelResponse](stopRequest, stopRequest.sparkVersion, CANCEL_REQUEST) } private[this] def proxy[T: ClassTag]( diff --git a/streampark-spark/streampark-spark-client/streampark-spark-client-api/src/main/scala/org/apache/streampark/spark/client/bean/StopRequest.scala b/streampark-spark/streampark-spark-client/streampark-spark-client-api/src/main/scala/org/apache/streampark/spark/client/bean/CancelRequest.scala similarity index 97% rename from streampark-spark/streampark-spark-client/streampark-spark-client-api/src/main/scala/org/apache/streampark/spark/client/bean/StopRequest.scala rename to streampark-spark/streampark-spark-client/streampark-spark-client-api/src/main/scala/org/apache/streampark/spark/client/bean/CancelRequest.scala index 6440d9f43f..22a2b4e08b 100644 --- a/streampark-spark/streampark-spark-client/streampark-spark-client-api/src/main/scala/org/apache/streampark/spark/client/bean/StopRequest.scala +++ b/streampark-spark/streampark-spark-client/streampark-spark-client-api/src/main/scala/org/apache/streampark/spark/client/bean/CancelRequest.scala @@ -23,7 +23,7 @@ import org.apache.streampark.common.util.Implicits.JavaMap import javax.annotation.Nullable -case class StopRequest( +case class CancelRequest( id: Long, sparkVersion: SparkVersion, deployMode: SparkDeployMode, diff --git a/streampark-spark/streampark-spark-client/streampark-spark-client-api/src/main/scala/org/apache/streampark/spark/client/bean/StopResponse.scala b/streampark-spark/streampark-spark-client/streampark-spark-client-api/src/main/scala/org/apache/streampark/spark/client/bean/CancelResponse.scala similarity index 94% rename from streampark-spark/streampark-spark-client/streampark-spark-client-api/src/main/scala/org/apache/streampark/spark/client/bean/StopResponse.scala rename to streampark-spark/streampark-spark-client/streampark-spark-client-api/src/main/scala/org/apache/streampark/spark/client/bean/CancelResponse.scala index 42b4805341..7b22a362f1 100644 --- a/streampark-spark/streampark-spark-client/streampark-spark-client-api/src/main/scala/org/apache/streampark/spark/client/bean/StopResponse.scala +++ b/streampark-spark/streampark-spark-client/streampark-spark-client-api/src/main/scala/org/apache/streampark/spark/client/bean/CancelResponse.scala @@ -17,4 +17,4 @@ package org.apache.streampark.spark.client.bean -case class StopResponse(savePoint: String) +case class CancelResponse(savePoint: String) diff --git a/streampark-spark/streampark-spark-client/streampark-spark-client-core/src/main/scala/org/apache/streampark/spark/client/SparkClientEndpoint.scala b/streampark-spark/streampark-spark-client/streampark-spark-client-core/src/main/scala/org/apache/streampark/spark/client/SparkClientEndpoint.scala index 104dd9ff68..d68f935967 100644 --- a/streampark-spark/streampark-spark-client/streampark-spark-client-core/src/main/scala/org/apache/streampark/spark/client/SparkClientEndpoint.scala +++ b/streampark-spark/streampark-spark-client/streampark-spark-client-core/src/main/scala/org/apache/streampark/spark/client/SparkClientEndpoint.scala @@ -39,9 +39,9 @@ object SparkClientEndpoint { } } - def stop(stopRequest: StopRequest): StopResponse = { + def cancel(stopRequest: CancelRequest): CancelResponse = { clients.get(stopRequest.deployMode) match { - case Some(client) => client.stop(stopRequest) + case Some(client) => client.cancel(stopRequest) case _ => throw new UnsupportedOperationException( s"Unsupported ${stopRequest.deployMode} spark stop.") diff --git a/streampark-spark/streampark-spark-client/streampark-spark-client-core/src/main/scala/org/apache/streampark/spark/client/impl/YarnClient.scala b/streampark-spark/streampark-spark-client/streampark-spark-client-core/src/main/scala/org/apache/streampark/spark/client/impl/YarnClient.scala index 7eb1ed4b20..33e7b4d5f7 100644 --- a/streampark-spark/streampark-spark-client/streampark-spark-client-core/src/main/scala/org/apache/streampark/spark/client/impl/YarnClient.scala +++ b/streampark-spark/streampark-spark-client/streampark-spark-client-core/src/main/scala/org/apache/streampark/spark/client/impl/YarnClient.scala @@ -17,7 +17,7 @@ package org.apache.streampark.spark.client.impl -import org.apache.streampark.common.conf.ConfigKeys.{KEY_SPARK_YARN_AM_NODE_LABEL, KEY_SPARK_YARN_EXECUTOR_NODE_LABEL, KEY_SPARK_YARN_QUEUE, KEY_SPARK_YARN_QUEUE_LABEL, KEY_SPARK_YARN_QUEUE_NAME} +import org.apache.streampark.common.conf.ConfigKeys._ import org.apache.streampark.common.enums.SparkDeployMode import org.apache.streampark.common.util.{HadoopUtils, YarnUtils} import org.apache.streampark.common.util.Implicits._ @@ -37,22 +37,22 @@ object YarnClient extends SparkClientTrait { private lazy val sparkHandles = new ConcurrentHashMap[String, SparkAppHandle]() - override def doStop(stopRequest: StopRequest): StopResponse = { - val sparkAppHandle = sparkHandles.remove(stopRequest.appId) + override def doCancel(cancelRequest: CancelRequest): CancelResponse = { + val sparkAppHandle = sparkHandles.remove(cancelRequest.appId) if (sparkAppHandle != null) { Try(sparkAppHandle.kill()) match { case Success(_) => - logger.info(s"[StreamPark][Spark][YarnClient] spark job: ${stopRequest.appId} is stopped successfully.") - StopResponse(null) + logger.info(s"[StreamPark][Spark][YarnClient] spark job: ${cancelRequest.appId} is stopped successfully.") + CancelResponse(null) case Failure(e) => logger.error("[StreamPark][Spark][YarnClient] sparkAppHandle kill failed. Try kill by yarn", e) - yarnKill(stopRequest.appId) - StopResponse(null) + yarnKill(cancelRequest.appId) + CancelResponse(null) } } else { - logger.warn(s"[StreamPark][Spark][YarnClient] spark job: ${stopRequest.appId} is not existed. Try kill by yarn") - yarnKill(stopRequest.appId) - StopResponse(null) + logger.warn(s"[StreamPark][Spark][YarnClient] spark job: ${cancelRequest.appId} is not existed. Try kill by yarn") + yarnKill(cancelRequest.appId) + CancelResponse(null) } } diff --git a/streampark-spark/streampark-spark-client/streampark-spark-client-core/src/main/scala/org/apache/streampark/spark/client/trait/SparkClientTrait.scala b/streampark-spark/streampark-spark-client/streampark-spark-client-core/src/main/scala/org/apache/streampark/spark/client/trait/SparkClientTrait.scala index b13bdef1dc..746de6cd1a 100644 --- a/streampark-spark/streampark-spark-client/streampark-spark-client-core/src/main/scala/org/apache/streampark/spark/client/trait/SparkClientTrait.scala +++ b/streampark-spark/streampark-spark-client/streampark-spark-client-core/src/main/scala/org/apache/streampark/spark/client/trait/SparkClientTrait.scala @@ -60,24 +60,24 @@ trait SparkClientTrait extends Logger { def setConfig(submitRequest: SubmitRequest): Unit @throws[Exception] - def stop(stopRequest: StopRequest): StopResponse = { + def cancel(stopRequest: CancelRequest): CancelResponse = { logInfo( s""" - |----------------------------------------- spark job stop ---------------------------------- + |----------------------------------------- spark job cancel ---------------------------------- | userSparkHome : ${stopRequest.sparkVersion.sparkHome} | sparkVersion : ${stopRequest.sparkVersion.version} | appId : ${stopRequest.appId} |------------------------------------------------------------------------------------------- |""".stripMargin) - doStop(stopRequest) + doCancel(stopRequest) } @throws[Exception] def doSubmit(submitRequest: SubmitRequest): SubmitResponse @throws[Exception] - def doStop(stopRequest: StopRequest): StopResponse + def doCancel(cancelRequest: CancelRequest): CancelResponse private def prepareConfig(submitRequest: SubmitRequest): Unit = { // 1) filter illegal configuration key From 31b1caf233c79bf2318ccec366f9d699a0b4ef2e Mon Sep 17 00:00:00 2001 From: benjobs Date: Fri, 27 Sep 2024 21:44:37 +0800 Subject: [PATCH 2/4] [Improve] spark & flink app-id auto-increment Improvements --- .../streampark/common/conf/Workspace.scala | 2 - .../assembly/script/schema/mysql-schema.sql | 40 +++++++++------ .../assembly/script/schema/pgsql-schema.sql | 2 +- .../assembly/script/upgrade/mysql/2.2.0.sql | 18 ++++++- .../core/controller/ConfigController.java | 2 +- ...nkApplicationBuildPipelineController.java} | 4 +- ...r.java => FlinkApplicationController.java} | 6 +-- ...=> FlinkApplicationHistoryController.java} | 2 +- ...oller.java => FlinkCatalogController.java} | 6 +-- .../core/controller/ProxyController.java | 2 +- ...arkApplicationBuildPipelineController.java | 2 +- .../SparkApplicationController.java | 4 +- .../controller/SparkConfigController.java | 2 +- .../core/controller/SparkProxyController.java | 2 +- .../console/core/entity/Application.java | 46 +++++++++++++++++ .../console/core/entity/ApplicationLog.java | 18 +++++-- .../console/core/entity/FlinkApplication.java | 2 +- .../console/core/entity/SparkApplication.java | 6 +-- .../console/core/enums/EngineTypeEnum.java | 4 +- .../core/mapper/ApplicationMapper.java | 26 ++++++++++ .../console/core/runner/EnvInitializer.java | 1 - .../console/core/runner/QuickStartRunner.java | 2 +- ...gService.java => FlinkCatalogService.java} | 4 +- .../AppBuildPipeService.java | 2 +- .../ApplicationLogService.java | 2 +- .../application/ApplicationService.java | 28 +++++++++++ .../FlinkApplicationBackUpService.java | 2 +- .../FlinkApplicationConfigService.java | 2 +- .../SparkAppBuildPipeService.java | 2 +- .../SparkApplicationBackUpService.java | 2 +- .../SparkApplicationConfigService.java | 2 +- .../SparkApplicationLogService.java | 2 +- .../impl/AppBuildPipeServiceImpl.java | 10 ++-- .../impl/ApplicationLogServiceImpl.java | 4 +- .../impl/ApplicationServiceImpl.java | 49 +++++++++++++++++++ .../FlinkApplicationActionServiceImpl.java | 18 +++---- .../FlinkApplicationBackUpServiceImpl.java | 6 +-- .../FlinkApplicationConfigServiceImpl.java | 4 +- .../FlinkApplicationManageServiceImpl.java | 24 +++++++-- .../impl/SparkAppBuildPipeServiceImpl.java | 8 +-- .../SparkApplicationActionServiceImpl.java | 6 +-- .../SparkApplicationBackUpServiceImpl.java | 8 +-- .../SparkApplicationConfigServiceImpl.java | 4 +- .../impl/SparkApplicationLogServiceImpl.java | 4 +- .../SparkApplicationManageServiceImpl.java | 33 ++++++++----- .../service/impl/FlinkCatalogServiceImpl.java | 4 +- .../impl/FlinkSavepointServiceImpl.java | 8 +-- .../service/impl/FlinkSqlServiceImpl.java | 2 +- .../core/service/impl/ProxyServiceImpl.java | 4 +- .../service/impl/SparkSqlServiceImpl.java | 2 +- .../core/task/ApplicationBackUpCleanTask.java | 2 +- .../core/watcher/SparkAppHttpWatcher.java | 2 +- .../src/main/resources/db/data-h2.sql | 19 +++++-- .../src/main/resources/db/schema-h2.sql | 37 +++++++++----- .../mapper/core/ApplicationMapper.xml | 21 ++++++++ .../core/service/CatalogServiceTest.java | 2 +- .../core/service/SavepointServiceTest.java | 1 + 57 files changed, 393 insertions(+), 136 deletions(-) rename streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/{ApplicationBuildPipelineController.java => FlinkApplicationBuildPipelineController.java} (95%) rename streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/{ApplicationController.java => FlinkApplicationController.java} (98%) rename streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/{ApplicationHistoryController.java => FlinkApplicationHistoryController.java} (98%) rename streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/{CatalogController.java => FlinkCatalogController.java} (95%) create mode 100644 streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java create mode 100644 streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/mapper/ApplicationMapper.java rename streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/{CatalogService.java => FlinkCatalogService.java} (94%) rename streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/{ => application}/AppBuildPipeService.java (97%) rename streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/{ => application}/ApplicationLogService.java (96%) create mode 100644 streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/ApplicationService.java rename streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/{ => application}/FlinkApplicationBackUpService.java (98%) rename streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/{ => application}/FlinkApplicationConfigService.java (98%) rename streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/{ => application}/SparkAppBuildPipeService.java (97%) rename streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/{ => application}/SparkApplicationBackUpService.java (98%) rename streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/{ => application}/SparkApplicationConfigService.java (98%) rename streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/{ => application}/SparkApplicationLogService.java (96%) rename streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/{ => application}/impl/AppBuildPipeServiceImpl.java (98%) rename streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/{ => application}/impl/ApplicationLogServiceImpl.java (94%) create mode 100644 streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationServiceImpl.java rename streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/{ => application}/impl/FlinkApplicationBackUpServiceImpl.java (97%) rename streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/{ => application}/impl/FlinkApplicationConfigServiceImpl.java (98%) rename streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/{ => application}/impl/SparkAppBuildPipeServiceImpl.java (98%) rename streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/{ => application}/impl/SparkApplicationBackUpServiceImpl.java (97%) rename streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/{ => application}/impl/SparkApplicationConfigServiceImpl.java (98%) rename streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/{ => application}/impl/SparkApplicationLogServiceImpl.java (94%) create mode 100644 streampark-console/streampark-console-service/src/main/resources/mapper/core/ApplicationMapper.xml diff --git a/streampark-common/src/main/scala/org/apache/streampark/common/conf/Workspace.scala b/streampark-common/src/main/scala/org/apache/streampark/common/conf/Workspace.scala index c8ce1dd95c..cc9161f5e5 100644 --- a/streampark-common/src/main/scala/org/apache/streampark/common/conf/Workspace.scala +++ b/streampark-common/src/main/scala/org/apache/streampark/common/conf/Workspace.scala @@ -109,8 +109,6 @@ case class Workspace(storageType: StorageType) { lazy val APP_WORKSPACE = s"$WORKSPACE/workspace" - lazy val SPARK_APP_WORKSPACE = s"$WORKSPACE/spark-workspace" - lazy val APP_FLINK = s"$WORKSPACE/flink" lazy val APP_SPARK = s"$WORKSPACE/spark" diff --git a/streampark-console/streampark-console-service/src/main/assembly/script/schema/mysql-schema.sql b/streampark-console/streampark-console-service/src/main/assembly/script/schema/mysql-schema.sql index 09c6620c66..db36a2a2a6 100644 --- a/streampark-console/streampark-console-service/src/main/assembly/script/schema/mysql-schema.sql +++ b/streampark-console/streampark-console-service/src/main/assembly/script/schema/mysql-schema.sql @@ -22,27 +22,22 @@ set names utf8mb4; set foreign_key_checks = 0; -- ---------------------------- --- table structure for t_flink_app_backup +-- Table structure for t_app -- ---------------------------- -drop table if exists `t_flink_app_backup`; -create table `t_flink_app_backup` ( - `id` bigint not null auto_increment, - `app_id` bigint default null, - `sql_id` bigint default null, - `config_id` bigint default null, - `version` int default null, - `path` varchar(128) collate utf8mb4_general_ci default null, - `description` varchar(255) collate utf8mb4_general_ci default null, - `create_time` datetime default null comment 'create time', - primary key (`id`) using btree -) engine=innodb auto_increment=100000 default charset=utf8mb4 collate=utf8mb4_general_ci; +create table if not exists `t_app` ( +`id` bigint not null, +`job_type` tinyint default null, +`create_time` datetime default null comment 'create time', +`modify_time` datetime default null comment 'modify time', +primary key(`id`) +); -- ---------------------------- -- Table structure for t_flink_app -- ---------------------------- drop table if exists `t_flink_app`; create table `t_flink_app` ( - `id` bigint not null auto_increment, + `id` bigint not null, `team_id` bigint not null, `job_type` tinyint default null, `deploy_mode` tinyint default null, @@ -110,6 +105,23 @@ create table `t_flink_app` ( ) engine=innodb auto_increment=100000 default charset=utf8mb4 collate=utf8mb4_general_ci; +-- ---------------------------- +-- table structure for t_flink_app_backup +-- ---------------------------- +drop table if exists `t_flink_app_backup`; +create table `t_flink_app_backup` ( +`id` bigint not null, +`app_id` bigint default null, +`sql_id` bigint default null, +`config_id` bigint default null, +`version` int default null, +`path` varchar(128) collate utf8mb4_general_ci default null, +`description` varchar(255) collate utf8mb4_general_ci default null, +`create_time` datetime default null comment 'create time', +primary key (`id`) using btree +) engine=innodb auto_increment=100000 default charset=utf8mb4 collate=utf8mb4_general_ci; + + -- ---------------------------- -- table structure for t_flink_config -- ---------------------------- diff --git a/streampark-console/streampark-console-service/src/main/assembly/script/schema/pgsql-schema.sql b/streampark-console/streampark-console-service/src/main/assembly/script/schema/pgsql-schema.sql index 6b2df02a11..90548e7da0 100644 --- a/streampark-console/streampark-console-service/src/main/assembly/script/schema/pgsql-schema.sql +++ b/streampark-console/streampark-console-service/src/main/assembly/script/schema/pgsql-schema.sql @@ -192,7 +192,7 @@ create sequence "public"."streampark_t_flink_app_id_seq" increment 1 start 10000 cache 1 minvalue 10000 maxvalue 9223372036854775807; create table "public"."t_flink_app" ( - "id" int8 not null default nextval('streampark_t_flink_app_id_seq'::regclass), + "id" int8 not null, "team_id" int8, "job_type" int2, "deploy_mode" int2, diff --git a/streampark-console/streampark-console-service/src/main/assembly/script/upgrade/mysql/2.2.0.sql b/streampark-console/streampark-console-service/src/main/assembly/script/upgrade/mysql/2.2.0.sql index 6ba50d2811..a59f7c81b2 100644 --- a/streampark-console/streampark-console-service/src/main/assembly/script/upgrade/mysql/2.2.0.sql +++ b/streampark-console/streampark-console-service/src/main/assembly/script/upgrade/mysql/2.2.0.sql @@ -20,13 +20,27 @@ use streampark; set names utf8mb4; set foreign_key_checks = 0; -alter table t_app_backup rename to t_flink_app_backup; + +-- ---------------------------- +-- Table structure for t_app +-- ---------------------------- +create table if not exists `t_app` ( +`id` bigint not null, +`job_type` tinyint default null, +`create_time` datetime default null comment 'create time', +`modify_time` datetime default null comment 'modify time', +primary key(`id`) +); + alter table `t_flink_app` - add column `k8s_name` varchar(63) collate utf8mb4_general_ci default null, + modify column `id` not null; +add column `k8s_name` varchar(63) collate utf8mb4_general_ci default null, -- modify_time change with duration #3188 modify column `modify_time` datetime not null default current_timestamp comment 'modify time'; +alter table t_app_backup rename to t_flink_app_backup; + alter table `t_flink_log` add column `user_id` bigint default null comment 'operator user id'; diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/ConfigController.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/ConfigController.java index dd63d21aa5..7622e92f28 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/ConfigController.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/ConfigController.java @@ -22,7 +22,7 @@ import org.apache.streampark.console.base.domain.RestResponse; import org.apache.streampark.console.core.entity.FlinkApplication; import org.apache.streampark.console.core.entity.FlinkApplicationConfig; -import org.apache.streampark.console.core.service.FlinkApplicationConfigService; +import org.apache.streampark.console.core.service.application.FlinkApplicationConfigService; import org.apache.shiro.authz.annotation.RequiresPermissions; diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/ApplicationBuildPipelineController.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/FlinkApplicationBuildPipelineController.java similarity index 95% rename from streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/ApplicationBuildPipelineController.java rename to streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/FlinkApplicationBuildPipelineController.java index ada06d148c..211b9c3aa7 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/ApplicationBuildPipelineController.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/FlinkApplicationBuildPipelineController.java @@ -21,7 +21,7 @@ import org.apache.streampark.console.core.annotation.Permission; import org.apache.streampark.console.core.bean.AppBuildDockerResolvedDetail; import org.apache.streampark.console.core.entity.AppBuildPipeline; -import org.apache.streampark.console.core.service.AppBuildPipeService; +import org.apache.streampark.console.core.service.application.AppBuildPipeService; import org.apache.streampark.flink.packer.pipeline.DockerResolvedSnapshot; import org.apache.streampark.flink.packer.pipeline.PipelineTypeEnum; @@ -42,7 +42,7 @@ @Validated @RestController @RequestMapping("flink/pipe") -public class ApplicationBuildPipelineController { +public class FlinkApplicationBuildPipelineController { @Autowired private AppBuildPipeService appBuildPipeService; diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/ApplicationController.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/FlinkApplicationController.java similarity index 98% rename from streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/ApplicationController.java rename to streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/FlinkApplicationController.java index 3d92527313..8474cf698f 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/ApplicationController.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/FlinkApplicationController.java @@ -28,10 +28,10 @@ import org.apache.streampark.console.core.entity.FlinkApplication; import org.apache.streampark.console.core.entity.FlinkApplicationBackUp; import org.apache.streampark.console.core.enums.AppExistsStateEnum; -import org.apache.streampark.console.core.service.ApplicationLogService; -import org.apache.streampark.console.core.service.FlinkApplicationBackUpService; import org.apache.streampark.console.core.service.ResourceService; +import org.apache.streampark.console.core.service.application.ApplicationLogService; import org.apache.streampark.console.core.service.application.FlinkApplicationActionService; +import org.apache.streampark.console.core.service.application.FlinkApplicationBackUpService; import org.apache.streampark.console.core.service.application.FlinkApplicationInfoService; import org.apache.streampark.console.core.service.application.FlinkApplicationManageService; @@ -55,7 +55,7 @@ @Validated @RestController @RequestMapping("flink/app") -public class ApplicationController { +public class FlinkApplicationController { @Autowired private FlinkApplicationManageService applicationManageService; diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/ApplicationHistoryController.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/FlinkApplicationHistoryController.java similarity index 98% rename from streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/ApplicationHistoryController.java rename to streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/FlinkApplicationHistoryController.java index 04cea4edaa..b4e93c4028 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/ApplicationHistoryController.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/FlinkApplicationHistoryController.java @@ -37,7 +37,7 @@ @Validated @RestController @RequestMapping("flink/history") -public class ApplicationHistoryController { +public class FlinkApplicationHistoryController { @Autowired private FlinkApplicationInfoService applicationInfoService; diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/CatalogController.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/FlinkCatalogController.java similarity index 95% rename from streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/CatalogController.java rename to streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/FlinkCatalogController.java index 78847211f6..2817c2ecce 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/CatalogController.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/FlinkCatalogController.java @@ -21,7 +21,7 @@ import org.apache.streampark.console.base.domain.RestResponse; import org.apache.streampark.console.core.annotation.Permission; import org.apache.streampark.console.core.bean.FlinkCatalogParams; -import org.apache.streampark.console.core.service.CatalogService; +import org.apache.streampark.console.core.service.FlinkCatalogService; import org.apache.streampark.console.core.util.ServiceHelper; import org.apache.shiro.authz.annotation.RequiresPermissions; @@ -40,10 +40,10 @@ @Validated @RestController @RequestMapping("flink/catalog") -public class CatalogController { +public class FlinkCatalogController { @Autowired - CatalogService catalogService; + FlinkCatalogService catalogService; @Permission(team = "#catalog.teamId") @PostMapping("create") diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/ProxyController.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/ProxyController.java index 2b586a1a64..d5504d5aca 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/ProxyController.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/ProxyController.java @@ -21,8 +21,8 @@ import org.apache.streampark.console.core.entity.ApplicationLog; import org.apache.streampark.console.core.entity.FlinkApplication; import org.apache.streampark.console.core.enums.UserTypeEnum; -import org.apache.streampark.console.core.service.ApplicationLogService; import org.apache.streampark.console.core.service.ProxyService; +import org.apache.streampark.console.core.service.application.ApplicationLogService; import org.apache.streampark.console.core.service.application.FlinkApplicationManageService; import org.apache.streampark.console.core.util.ServiceHelper; import org.apache.streampark.console.system.entity.Member; diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/SparkApplicationBuildPipelineController.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/SparkApplicationBuildPipelineController.java index 260a55d533..dfaadf8c6b 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/SparkApplicationBuildPipelineController.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/SparkApplicationBuildPipelineController.java @@ -20,7 +20,7 @@ import org.apache.streampark.console.base.domain.RestResponse; import org.apache.streampark.console.core.annotation.Permission; import org.apache.streampark.console.core.entity.AppBuildPipeline; -import org.apache.streampark.console.core.service.SparkAppBuildPipeService; +import org.apache.streampark.console.core.service.application.SparkAppBuildPipeService; import org.apache.shiro.authz.annotation.RequiresPermissions; diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/SparkApplicationController.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/SparkApplicationController.java index bc30436991..2e181883de 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/SparkApplicationController.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/SparkApplicationController.java @@ -27,11 +27,11 @@ import org.apache.streampark.console.core.entity.SparkApplication; import org.apache.streampark.console.core.entity.SparkApplicationLog; import org.apache.streampark.console.core.enums.AppExistsStateEnum; -import org.apache.streampark.console.core.service.FlinkApplicationBackUpService; import org.apache.streampark.console.core.service.ResourceService; -import org.apache.streampark.console.core.service.SparkApplicationLogService; +import org.apache.streampark.console.core.service.application.FlinkApplicationBackUpService; import org.apache.streampark.console.core.service.application.SparkApplicationActionService; import org.apache.streampark.console.core.service.application.SparkApplicationInfoService; +import org.apache.streampark.console.core.service.application.SparkApplicationLogService; import org.apache.streampark.console.core.service.application.SparkApplicationManageService; import org.apache.shiro.authz.annotation.RequiresPermissions; diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/SparkConfigController.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/SparkConfigController.java index cc58bb2169..7d76ca3565 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/SparkConfigController.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/SparkConfigController.java @@ -22,7 +22,7 @@ import org.apache.streampark.console.base.domain.RestResponse; import org.apache.streampark.console.core.entity.SparkApplication; import org.apache.streampark.console.core.entity.SparkApplicationConfig; -import org.apache.streampark.console.core.service.SparkApplicationConfigService; +import org.apache.streampark.console.core.service.application.SparkApplicationConfigService; import org.apache.shiro.authz.annotation.RequiresPermissions; diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/SparkProxyController.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/SparkProxyController.java index c7ad830625..fa68395eab 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/SparkProxyController.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/SparkProxyController.java @@ -22,7 +22,7 @@ import org.apache.streampark.console.core.entity.SparkApplicationLog; import org.apache.streampark.console.core.enums.UserTypeEnum; import org.apache.streampark.console.core.service.ProxyService; -import org.apache.streampark.console.core.service.SparkApplicationLogService; +import org.apache.streampark.console.core.service.application.SparkApplicationLogService; import org.apache.streampark.console.core.service.application.SparkApplicationManageService; import org.apache.streampark.console.core.util.ServiceHelper; import org.apache.streampark.console.system.entity.Member; diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java new file mode 100644 index 0000000000..6239fa2a80 --- /dev/null +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.streampark.console.core.entity; + +import com.baomidou.mybatisplus.annotation.IdType; +import com.baomidou.mybatisplus.annotation.TableId; +import com.baomidou.mybatisplus.annotation.TableName; +import lombok.Data; +import lombok.extern.slf4j.Slf4j; + +import java.io.Serializable; +import java.util.Date; + +@Data +@TableName("t_app") +@Slf4j +public class Application implements Serializable { + + @TableId(type = IdType.AUTO) + private Long id; + /** + * 1: flink job + * 2: spark job + */ + private Integer jobType; + + private Date createTime; + + private Date modifyTime; + +} diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/ApplicationLog.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/ApplicationLog.java index a6e1ed92a2..78afd59a19 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/ApplicationLog.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/ApplicationLog.java @@ -32,16 +32,28 @@ public class ApplicationLog { @TableId(type = IdType.AUTO) private Long id; + /** appId */ private Long appId; - /** applicationId */ - private String yarnAppId; + + /** + * 1: flink + * 2: spark + */ + private Integer jobType; + + /** clusterId */ + private String clusterId; + /** The address of the jobmanager, that is, the direct access address of the Flink web UI */ - private String jobManagerUrl; + private String trackingUrl; + /** start status */ private Boolean success; + /** option name */ private Integer optionName; + /** option time */ private Date optionTime; /** exception at the start */ diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkApplication.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkApplication.java index 12f40f4085..382f71fef1 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkApplication.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkApplication.java @@ -66,7 +66,7 @@ @Slf4j public class FlinkApplication implements Serializable { - @TableId(type = IdType.AUTO) + @TableId(type = IdType.INPUT) private Long id; private Long teamId; diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/SparkApplication.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/SparkApplication.java index 8cc8abccf1..da64214016 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/SparkApplication.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/SparkApplication.java @@ -59,7 +59,7 @@ @Slf4j public class SparkApplication extends BaseEntity { - @TableId(type = IdType.AUTO) + @TableId(type = IdType.INPUT) private Long id; private Long teamId; @@ -336,14 +336,14 @@ public String getDistHome() { @JsonIgnore public String getLocalAppHome() { - String path = String.format("%s/%s", Workspace.local().SPARK_APP_WORKSPACE(), id.toString()); + String path = String.format("%s/%s", Workspace.local().APP_WORKSPACE(), id.toString()); log.info("local appHome:{}", path); return path; } @JsonIgnore public String getRemoteAppHome() { - String path = String.format("%s/%s", Workspace.remote().SPARK_APP_WORKSPACE(), id.toString()); + String path = String.format("%s/%s", Workspace.remote().APP_WORKSPACE(), id.toString()); log.info("remote appHome:{}", path); return path; } diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/enums/EngineTypeEnum.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/enums/EngineTypeEnum.java index 3b15ce6cb5..2033e24719 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/enums/EngineTypeEnum.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/enums/EngineTypeEnum.java @@ -27,10 +27,10 @@ public enum EngineTypeEnum { /** Apache Flink: activated by default */ - FLINK(0), + FLINK(1), /** Apache Spark */ - SPARK(1); + SPARK(2); @EnumValue private final int code; diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/mapper/ApplicationMapper.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/mapper/ApplicationMapper.java new file mode 100644 index 0000000000..48820fe3b5 --- /dev/null +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/mapper/ApplicationMapper.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.streampark.console.core.mapper; + +import org.apache.streampark.console.core.entity.Application; + +import com.baomidou.mybatisplus.core.mapper.BaseMapper; + +public interface ApplicationMapper extends BaseMapper { + +} diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/runner/EnvInitializer.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/runner/EnvInitializer.java index c2d026badd..4e5ba38ac3 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/runner/EnvInitializer.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/runner/EnvInitializer.java @@ -147,7 +147,6 @@ private void prepareWorkspace( Arrays.asList( workspace.APP_UPLOADS(), workspace.APP_WORKSPACE(), - workspace.SPARK_APP_WORKSPACE(), workspace.APP_BACKUPS(), workspace.APP_SAVEPOINTS(), workspace.APP_PYTHON(), diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/runner/QuickStartRunner.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/runner/QuickStartRunner.java index afd50f1adb..0e04a1ab93 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/runner/QuickStartRunner.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/runner/QuickStartRunner.java @@ -24,10 +24,10 @@ import org.apache.streampark.console.core.entity.FlinkCluster; import org.apache.streampark.console.core.entity.FlinkEnv; import org.apache.streampark.console.core.entity.FlinkSql; -import org.apache.streampark.console.core.service.AppBuildPipeService; import org.apache.streampark.console.core.service.FlinkClusterService; import org.apache.streampark.console.core.service.FlinkEnvService; import org.apache.streampark.console.core.service.FlinkSqlService; +import org.apache.streampark.console.core.service.application.AppBuildPipeService; import org.apache.streampark.console.core.service.application.FlinkApplicationManageService; import lombok.extern.slf4j.Slf4j; diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/CatalogService.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/FlinkCatalogService.java similarity index 94% rename from streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/CatalogService.java rename to streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/FlinkCatalogService.java index c19d467b41..160ae952fb 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/CatalogService.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/FlinkCatalogService.java @@ -24,8 +24,8 @@ import com.baomidou.mybatisplus.core.metadata.IPage; import com.baomidou.mybatisplus.extension.service.IService; -/** This interface is use to managed catalog */ -public interface CatalogService extends IService { +/** This interface is used to managed catalog */ +public interface FlinkCatalogService extends IService { /** * Create Catalog diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/AppBuildPipeService.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/AppBuildPipeService.java similarity index 97% rename from streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/AppBuildPipeService.java rename to streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/AppBuildPipeService.java index c73c63ca01..a4cdacc3d7 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/AppBuildPipeService.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/AppBuildPipeService.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.streampark.console.core.service; +package org.apache.streampark.console.core.service.application; import org.apache.streampark.console.core.entity.AppBuildPipeline; import org.apache.streampark.flink.packer.pipeline.DockerResolvedSnapshot; diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/ApplicationLogService.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/ApplicationLogService.java similarity index 96% rename from streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/ApplicationLogService.java rename to streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/ApplicationLogService.java index cb8fb1eac1..17a068b3a5 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/ApplicationLogService.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/ApplicationLogService.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.streampark.console.core.service; +package org.apache.streampark.console.core.service.application; import org.apache.streampark.console.base.domain.RestRequest; import org.apache.streampark.console.core.entity.ApplicationLog; diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/ApplicationService.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/ApplicationService.java new file mode 100644 index 0000000000..e6404ad200 --- /dev/null +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/ApplicationService.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.streampark.console.core.service.application; + +import org.apache.streampark.console.core.entity.Application; +import org.apache.streampark.console.core.enums.EngineTypeEnum; + +import com.baomidou.mybatisplus.extension.service.IService; + +public interface ApplicationService extends IService { + + Application create(EngineTypeEnum engineTypeEnum); +} diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/FlinkApplicationBackUpService.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/FlinkApplicationBackUpService.java similarity index 98% rename from streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/FlinkApplicationBackUpService.java rename to streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/FlinkApplicationBackUpService.java index cafe8376f6..29cc0de3cb 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/FlinkApplicationBackUpService.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/FlinkApplicationBackUpService.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.streampark.console.core.service; +package org.apache.streampark.console.core.service.application; import org.apache.streampark.console.base.domain.RestRequest; import org.apache.streampark.console.base.exception.InternalException; diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/FlinkApplicationConfigService.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/FlinkApplicationConfigService.java similarity index 98% rename from streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/FlinkApplicationConfigService.java rename to streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/FlinkApplicationConfigService.java index 7850b2aa58..a13843a2e3 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/FlinkApplicationConfigService.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/FlinkApplicationConfigService.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.streampark.console.core.service; +package org.apache.streampark.console.core.service.application; import org.apache.streampark.console.base.domain.RestRequest; import org.apache.streampark.console.core.entity.FlinkApplication; diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/SparkAppBuildPipeService.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/SparkAppBuildPipeService.java similarity index 97% rename from streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/SparkAppBuildPipeService.java rename to streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/SparkAppBuildPipeService.java index ee2d2ae64c..25ea3cb11d 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/SparkAppBuildPipeService.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/SparkAppBuildPipeService.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.streampark.console.core.service; +package org.apache.streampark.console.core.service.application; import org.apache.streampark.console.core.entity.AppBuildPipeline; import org.apache.streampark.flink.packer.pipeline.PipelineStatusEnum; diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/SparkApplicationBackUpService.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/SparkApplicationBackUpService.java similarity index 98% rename from streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/SparkApplicationBackUpService.java rename to streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/SparkApplicationBackUpService.java index 1c3f2f92ee..5b46fc53d8 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/SparkApplicationBackUpService.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/SparkApplicationBackUpService.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.streampark.console.core.service; +package org.apache.streampark.console.core.service.application; import org.apache.streampark.console.base.domain.RestRequest; import org.apache.streampark.console.base.exception.InternalException; diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/SparkApplicationConfigService.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/SparkApplicationConfigService.java similarity index 98% rename from streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/SparkApplicationConfigService.java rename to streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/SparkApplicationConfigService.java index b289910d47..7745b97a3c 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/SparkApplicationConfigService.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/SparkApplicationConfigService.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.streampark.console.core.service; +package org.apache.streampark.console.core.service.application; import org.apache.streampark.console.base.domain.RestRequest; import org.apache.streampark.console.core.entity.SparkApplication; diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/SparkApplicationLogService.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/SparkApplicationLogService.java similarity index 96% rename from streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/SparkApplicationLogService.java rename to streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/SparkApplicationLogService.java index 6edee09ac8..7c3687cbc8 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/SparkApplicationLogService.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/SparkApplicationLogService.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.streampark.console.core.service; +package org.apache.streampark.console.core.service.application; import org.apache.streampark.console.base.domain.RestRequest; import org.apache.streampark.console.core.entity.SparkApplicationLog; diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/AppBuildPipeServiceImpl.java similarity index 98% rename from streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java rename to streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/AppBuildPipeServiceImpl.java index bc17e47b03..3434e67db0 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/AppBuildPipeServiceImpl.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.streampark.console.core.service.impl; +package org.apache.streampark.console.core.service.application.impl; import org.apache.streampark.common.conf.Workspace; import org.apache.streampark.common.constants.Constants; @@ -45,16 +45,16 @@ import org.apache.streampark.console.core.enums.ReleaseStateEnum; import org.apache.streampark.console.core.enums.ResourceTypeEnum; import org.apache.streampark.console.core.mapper.ApplicationBuildPipelineMapper; -import org.apache.streampark.console.core.service.AppBuildPipeService; -import org.apache.streampark.console.core.service.ApplicationLogService; -import org.apache.streampark.console.core.service.FlinkApplicationBackUpService; -import org.apache.streampark.console.core.service.FlinkApplicationConfigService; import org.apache.streampark.console.core.service.FlinkEnvService; import org.apache.streampark.console.core.service.FlinkSqlService; import org.apache.streampark.console.core.service.MessageService; import org.apache.streampark.console.core.service.ResourceService; import org.apache.streampark.console.core.service.SettingService; +import org.apache.streampark.console.core.service.application.AppBuildPipeService; +import org.apache.streampark.console.core.service.application.ApplicationLogService; import org.apache.streampark.console.core.service.application.FlinkApplicationActionService; +import org.apache.streampark.console.core.service.application.FlinkApplicationBackUpService; +import org.apache.streampark.console.core.service.application.FlinkApplicationConfigService; import org.apache.streampark.console.core.service.application.FlinkApplicationInfoService; import org.apache.streampark.console.core.service.application.FlinkApplicationManageService; import org.apache.streampark.console.core.util.ServiceHelper; diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationLogServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationLogServiceImpl.java similarity index 94% rename from streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationLogServiceImpl.java rename to streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationLogServiceImpl.java index 3ac1563b4e..feef8cc0ce 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationLogServiceImpl.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationLogServiceImpl.java @@ -15,13 +15,13 @@ * limitations under the License. */ -package org.apache.streampark.console.core.service.impl; +package org.apache.streampark.console.core.service.application.impl; import org.apache.streampark.console.base.domain.RestRequest; import org.apache.streampark.console.base.mybatis.pager.MybatisPager; import org.apache.streampark.console.core.entity.ApplicationLog; import org.apache.streampark.console.core.mapper.ApplicationLogMapper; -import org.apache.streampark.console.core.service.ApplicationLogService; +import org.apache.streampark.console.core.service.application.ApplicationLogService; import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import com.baomidou.mybatisplus.core.metadata.IPage; diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationServiceImpl.java new file mode 100644 index 0000000000..866ceab92e --- /dev/null +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationServiceImpl.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.streampark.console.core.service.application.impl; + +import org.apache.streampark.console.core.entity.Application; +import org.apache.streampark.console.core.enums.EngineTypeEnum; +import org.apache.streampark.console.core.mapper.ApplicationMapper; +import org.apache.streampark.console.core.service.application.ApplicationService; + +import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Service; +import org.springframework.transaction.annotation.Propagation; +import org.springframework.transaction.annotation.Transactional; + +import java.util.Date; + +@Slf4j +@Service +@Transactional(propagation = Propagation.SUPPORTS, readOnly = true, rollbackFor = Exception.class) +public class ApplicationServiceImpl extends ServiceImpl + implements + ApplicationService { + + @Override + public Application create(EngineTypeEnum engineTypeEnum) { + Application application = new Application(); + application.setJobType(engineTypeEnum.getCode()); + application.setCreateTime(new Date()); + application.setModifyTime(new Date()); + save(application); + return application; + } +} diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/FlinkApplicationActionServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/FlinkApplicationActionServiceImpl.java index 451fee22ce..d7a1efa851 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/FlinkApplicationActionServiceImpl.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/FlinkApplicationActionServiceImpl.java @@ -54,11 +54,7 @@ import org.apache.streampark.console.core.enums.OptionStateEnum; import org.apache.streampark.console.core.enums.ReleaseStateEnum; import org.apache.streampark.console.core.mapper.FlinkApplicationMapper; -import org.apache.streampark.console.core.service.AppBuildPipeService; -import org.apache.streampark.console.core.service.ApplicationLogService; import org.apache.streampark.console.core.service.DistributedTaskService; -import org.apache.streampark.console.core.service.FlinkApplicationBackUpService; -import org.apache.streampark.console.core.service.FlinkApplicationConfigService; import org.apache.streampark.console.core.service.FlinkClusterService; import org.apache.streampark.console.core.service.FlinkEnvService; import org.apache.streampark.console.core.service.FlinkSqlService; @@ -66,7 +62,11 @@ import org.apache.streampark.console.core.service.SavepointService; import org.apache.streampark.console.core.service.SettingService; import org.apache.streampark.console.core.service.VariableService; +import org.apache.streampark.console.core.service.application.AppBuildPipeService; +import org.apache.streampark.console.core.service.application.ApplicationLogService; import org.apache.streampark.console.core.service.application.FlinkApplicationActionService; +import org.apache.streampark.console.core.service.application.FlinkApplicationBackUpService; +import org.apache.streampark.console.core.service.application.FlinkApplicationConfigService; import org.apache.streampark.console.core.service.application.FlinkApplicationInfoService; import org.apache.streampark.console.core.service.application.FlinkApplicationManageService; import org.apache.streampark.console.core.util.ServiceHelper; @@ -272,9 +272,9 @@ public void cancel(FlinkApplication appParam) throws Exception { ApplicationLog applicationLog = new ApplicationLog(); applicationLog.setOptionName(OperationEnum.CANCEL.getValue()); applicationLog.setAppId(application.getId()); - applicationLog.setJobManagerUrl(application.getJobManagerUrl()); + applicationLog.setTrackingUrl(application.getJobManagerUrl()); applicationLog.setOptionTime(new Date()); - applicationLog.setYarnAppId(application.getClusterId()); + applicationLog.setClusterId(application.getClusterId()); applicationLog.setUserId(ServiceHelper.getUserId()); if (appParam.getRestoreOrTriggerSavepoint()) { @@ -546,14 +546,14 @@ private void processForSuccess( if (FlinkDeployMode.isYarnMode(application.getDeployMode())) { application.setClusterId(response.clusterId()); - applicationLog.setYarnAppId(response.clusterId()); + applicationLog.setClusterId(response.clusterId()); } if (StringUtils.isNoneEmpty(response.jobManagerUrl())) { application.setJobManagerUrl(response.jobManagerUrl()); - applicationLog.setJobManagerUrl(response.jobManagerUrl()); + applicationLog.setTrackingUrl(response.jobManagerUrl()); } - applicationLog.setYarnAppId(response.clusterId()); + applicationLog.setClusterId(response.clusterId()); application.setStartTime(new Date()); application.setEndTime(null); diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkApplicationBackUpServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/FlinkApplicationBackUpServiceImpl.java similarity index 97% rename from streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkApplicationBackUpServiceImpl.java rename to streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/FlinkApplicationBackUpServiceImpl.java index fdbc9a7940..56ab42b54c 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkApplicationBackUpServiceImpl.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/FlinkApplicationBackUpServiceImpl.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.streampark.console.core.service.impl; +package org.apache.streampark.console.core.service.application.impl; import org.apache.streampark.common.fs.FsOperator; import org.apache.streampark.console.base.domain.RestRequest; @@ -30,9 +30,9 @@ import org.apache.streampark.console.core.enums.ReleaseStateEnum; import org.apache.streampark.console.core.mapper.FlinkApplicationBackUpMapper; import org.apache.streampark.console.core.service.EffectiveService; -import org.apache.streampark.console.core.service.FlinkApplicationBackUpService; -import org.apache.streampark.console.core.service.FlinkApplicationConfigService; import org.apache.streampark.console.core.service.FlinkSqlService; +import org.apache.streampark.console.core.service.application.FlinkApplicationBackUpService; +import org.apache.streampark.console.core.service.application.FlinkApplicationConfigService; import org.apache.streampark.console.core.service.application.FlinkApplicationManageService; import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkApplicationConfigServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/FlinkApplicationConfigServiceImpl.java similarity index 98% rename from streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkApplicationConfigServiceImpl.java rename to streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/FlinkApplicationConfigServiceImpl.java index e9abadf0af..f5c7d52384 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkApplicationConfigServiceImpl.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/FlinkApplicationConfigServiceImpl.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.streampark.console.core.service.impl; +package org.apache.streampark.console.core.service.application.impl; import org.apache.streampark.common.util.DeflaterUtils; import org.apache.streampark.common.util.Utils; @@ -28,7 +28,7 @@ import org.apache.streampark.console.core.enums.EffectiveTypeEnum; import org.apache.streampark.console.core.mapper.FlinkApplicationConfigMapper; import org.apache.streampark.console.core.service.EffectiveService; -import org.apache.streampark.console.core.service.FlinkApplicationConfigService; +import org.apache.streampark.console.core.service.application.FlinkApplicationConfigService; import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper; diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/FlinkApplicationManageServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/FlinkApplicationManageServiceImpl.java index 974cfeb2c2..4dd265b6ec 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/FlinkApplicationManageServiceImpl.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/FlinkApplicationManageServiceImpl.java @@ -29,6 +29,7 @@ import org.apache.streampark.console.base.util.ObjectUtils; import org.apache.streampark.console.base.util.WebUtils; import org.apache.streampark.console.core.bean.AppControl; +import org.apache.streampark.console.core.entity.Application; import org.apache.streampark.console.core.entity.FlinkApplication; import org.apache.streampark.console.core.entity.FlinkApplicationConfig; import org.apache.streampark.console.core.entity.FlinkCluster; @@ -36,15 +37,12 @@ import org.apache.streampark.console.core.entity.Resource; import org.apache.streampark.console.core.enums.CandidateTypeEnum; import org.apache.streampark.console.core.enums.ChangeTypeEnum; +import org.apache.streampark.console.core.enums.EngineTypeEnum; import org.apache.streampark.console.core.enums.FlinkAppStateEnum; import org.apache.streampark.console.core.enums.OptionStateEnum; import org.apache.streampark.console.core.enums.ReleaseStateEnum; import org.apache.streampark.console.core.mapper.FlinkApplicationMapper; -import org.apache.streampark.console.core.service.AppBuildPipeService; -import org.apache.streampark.console.core.service.ApplicationLogService; import org.apache.streampark.console.core.service.EffectiveService; -import org.apache.streampark.console.core.service.FlinkApplicationBackUpService; -import org.apache.streampark.console.core.service.FlinkApplicationConfigService; import org.apache.streampark.console.core.service.FlinkClusterService; import org.apache.streampark.console.core.service.FlinkSqlService; import org.apache.streampark.console.core.service.ProjectService; @@ -52,6 +50,11 @@ import org.apache.streampark.console.core.service.SavepointService; import org.apache.streampark.console.core.service.SettingService; import org.apache.streampark.console.core.service.YarnQueueService; +import org.apache.streampark.console.core.service.application.AppBuildPipeService; +import org.apache.streampark.console.core.service.application.ApplicationLogService; +import org.apache.streampark.console.core.service.application.ApplicationService; +import org.apache.streampark.console.core.service.application.FlinkApplicationBackUpService; +import org.apache.streampark.console.core.service.application.FlinkApplicationConfigService; import org.apache.streampark.console.core.service.application.FlinkApplicationManageService; import org.apache.streampark.console.core.util.ServiceHelper; import org.apache.streampark.console.core.watcher.FlinkAppHttpWatcher; @@ -104,6 +107,9 @@ public class FlinkApplicationManageServiceImpl extends ServiceImpl implements - CatalogService { + FlinkCatalogService { private static final String CATALOG_REGEX = "^[a-z0-9]([-a-z0-9]*[a-z0-9])?$"; diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkSavepointServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkSavepointServiceImpl.java index e075e5da42..4191d316de 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkSavepointServiceImpl.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkSavepointServiceImpl.java @@ -35,11 +35,11 @@ import org.apache.streampark.console.core.enums.OperationEnum; import org.apache.streampark.console.core.enums.OptionStateEnum; import org.apache.streampark.console.core.mapper.FlinkSavepointMapper; -import org.apache.streampark.console.core.service.ApplicationLogService; -import org.apache.streampark.console.core.service.FlinkApplicationConfigService; import org.apache.streampark.console.core.service.FlinkClusterService; import org.apache.streampark.console.core.service.FlinkEnvService; import org.apache.streampark.console.core.service.SavepointService; +import org.apache.streampark.console.core.service.application.ApplicationLogService; +import org.apache.streampark.console.core.service.application.FlinkApplicationConfigService; import org.apache.streampark.console.core.service.application.FlinkApplicationManageService; import org.apache.streampark.console.core.util.ServiceHelper; import org.apache.streampark.console.core.watcher.FlinkAppHttpWatcher; @@ -191,9 +191,9 @@ private ApplicationLog getApplicationLog(FlinkApplication application) { ApplicationLog applicationLog = new ApplicationLog(); applicationLog.setOptionName(OperationEnum.SAVEPOINT.getValue()); applicationLog.setAppId(application.getId()); - applicationLog.setJobManagerUrl(application.getJobManagerUrl()); + applicationLog.setTrackingUrl(application.getJobManagerUrl()); applicationLog.setOptionTime(new Date()); - applicationLog.setYarnAppId(application.getClusterId()); + applicationLog.setClusterId(application.getClusterId()); applicationLog.setUserId(ServiceHelper.getUserId()); return applicationLog; } diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkSqlServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkSqlServiceImpl.java index c95cb3b7c9..faf8c84627 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkSqlServiceImpl.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkSqlServiceImpl.java @@ -29,9 +29,9 @@ import org.apache.streampark.console.core.enums.EffectiveTypeEnum; import org.apache.streampark.console.core.mapper.FlinkSqlMapper; import org.apache.streampark.console.core.service.EffectiveService; -import org.apache.streampark.console.core.service.FlinkApplicationBackUpService; import org.apache.streampark.console.core.service.FlinkEnvService; import org.apache.streampark.console.core.service.FlinkSqlService; +import org.apache.streampark.console.core.service.application.FlinkApplicationBackUpService; import org.apache.streampark.flink.core.FlinkSqlValidationResult; import org.apache.streampark.flink.proxy.FlinkShimsProxy; diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ProxyServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ProxyServiceImpl.java index 590b373c29..4b492fc087 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ProxyServiceImpl.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ProxyServiceImpl.java @@ -131,7 +131,7 @@ public ResponseEntity proxyFlink(HttpServletRequest request, FlinkApplication @Override public ResponseEntity proxyYarn(HttpServletRequest request, ApplicationLog log) throws Exception { ResponseEntity.BodyBuilder builder = ResponseEntity.status(HttpStatus.SERVICE_UNAVAILABLE); - String yarnId = log.getYarnAppId(); + String yarnId = log.getClusterId(); if (StringUtils.isBlank(yarnId)) { return builder.body("The yarn application id is null."); } @@ -158,7 +158,7 @@ public ResponseEntity proxyYarn(HttpServletRequest request, SparkApplicationL public ResponseEntity proxyHistory(HttpServletRequest request, ApplicationLog log) throws Exception { ResponseEntity.BodyBuilder builder = ResponseEntity.status(HttpStatus.SERVICE_UNAVAILABLE); - String url = log.getJobManagerUrl(); + String url = log.getTrackingUrl(); if (StringUtils.isBlank(url)) { return builder.body("The jobManager url is null."); } diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SparkSqlServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SparkSqlServiceImpl.java index 0468268800..cf157007f8 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SparkSqlServiceImpl.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SparkSqlServiceImpl.java @@ -28,10 +28,10 @@ import org.apache.streampark.console.core.enums.CandidateTypeEnum; import org.apache.streampark.console.core.enums.EffectiveTypeEnum; import org.apache.streampark.console.core.mapper.SparkSqlMapper; -import org.apache.streampark.console.core.service.SparkApplicationBackUpService; import org.apache.streampark.console.core.service.SparkEffectiveService; import org.apache.streampark.console.core.service.SparkEnvService; import org.apache.streampark.console.core.service.SparkSqlService; +import org.apache.streampark.console.core.service.application.SparkApplicationBackUpService; import org.apache.streampark.spark.client.proxy.SparkShimsProxy; import org.apache.streampark.spark.core.util.SparkSqlValidationResult; diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/ApplicationBackUpCleanTask.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/ApplicationBackUpCleanTask.java index 22fa38e0f3..b75848b661 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/ApplicationBackUpCleanTask.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/ApplicationBackUpCleanTask.java @@ -18,7 +18,7 @@ package org.apache.streampark.console.core.task; import org.apache.streampark.console.core.entity.FlinkApplicationBackUp; -import org.apache.streampark.console.core.service.FlinkApplicationBackUpService; +import org.apache.streampark.console.core.service.application.FlinkApplicationBackUpService; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/watcher/SparkAppHttpWatcher.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/watcher/SparkAppHttpWatcher.java index 8e7c154765..b2c1ed083c 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/watcher/SparkAppHttpWatcher.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/watcher/SparkAppHttpWatcher.java @@ -28,11 +28,11 @@ import org.apache.streampark.console.core.metrics.spark.Job; import org.apache.streampark.console.core.metrics.spark.SparkApplicationSummary; import org.apache.streampark.console.core.metrics.yarn.YarnAppInfo; -import org.apache.streampark.console.core.service.SparkApplicationLogService; import org.apache.streampark.console.core.service.SparkEnvService; import org.apache.streampark.console.core.service.alert.AlertService; import org.apache.streampark.console.core.service.application.SparkApplicationActionService; import org.apache.streampark.console.core.service.application.SparkApplicationInfoService; +import org.apache.streampark.console.core.service.application.SparkApplicationLogService; import org.apache.streampark.console.core.service.application.SparkApplicationManageService; import org.apache.streampark.console.core.utils.AlertTemplateUtils; diff --git a/streampark-console/streampark-console-service/src/main/resources/db/data-h2.sql b/streampark-console/streampark-console-service/src/main/resources/db/data-h2.sql index f8459eee89..3c082209ad 100644 --- a/streampark-console/streampark-console-service/src/main/resources/db/data-h2.sql +++ b/streampark-console/streampark-console-service/src/main/resources/db/data-h2.sql @@ -21,6 +21,12 @@ insert into `t_team` values (100000, 'default', 'The default team', now(), now()); insert into `t_team` values (100001, 'test', 'The test team', now(), now()); +-- ---------------------------- +-- Records of flink-app +-- ---------------------------- +INSERT INTO `t_app` (`id`, `job_type`, `create_time`, `modify_time`) +VALUES (100000, 1, now(), now()); + -- ---------------------------- -- Records of t_flink_app -- ---------------------------- @@ -45,6 +51,13 @@ insert into `t_flink_project` values (100000, 100000, 'streampark-quickstart', ' -- ---------------------------- insert into `t_flink_sql` values (100000, 100000, 'eNqlUUtPhDAQvu+vmFs1AYIHT5s94AaVqGxSSPZIKgxrY2mxrdGfb4GS3c0+LnJo6Mz36syapkmZQpk8vKbQMMt2KOFmAe5rK4Nf3yhrhCwvA1/TTDaqO61UxmooSprlT1PDGkgKEKpmwvIOjWVdP3W2zpG+JfQFHjfU46xxrVvYZuWztye1khJrqzSBFRCfjUwSYQiqt1xJJvyPcbWJp9WPCXvUoUEn0ZAVufcs0nIUjYn2L4s++YiY75eBLr+2Dnl3GYKTWRyfQKYRRR2XZxXmNvu9yh9GHAmUO/sxyMRkGNly4c714RZ7zaWtLHsX+N9NjvVrWxm99jmyvEhpOUhujmIYFI5zkCOYzYIj11a7QH7Tyz+nE8bw', null, null, 1, 1, now()); + +-- ---------------------------- +-- Records of spark-app +-- ---------------------------- +INSERT INTO `t_app` (`id`, `job_type`, `create_time`, `modify_time`) +VALUES (100001, 2, now(), now()); + -- ---------------------------- -- Records of t_spark_app -- ---------------------------- @@ -52,17 +65,17 @@ insert into `t_spark_app` ( `id`, `team_id`, `job_type`, `app_type`, `app_name`, `deploy_mode`, `resource_from`, `main_class`, `yarn_queue`, `k8s_image_pull_policy`, `k8s_namespace`, `state`, `option_state`, `user_id`, `description`, `tracking`, `release`, `build`, `create_time`, `modify_time`, `tags`) -values (100000, 100000, 2, 4, 'Spark SQL Demo', 2, 2, 'org.apache.streampark.spark.cli.SqlClient', 'default', 0, 'default', 0, 0, 100000, 'Spark SQL Demo', 0, 1, 1, now(), now(), 'streampark,test'); +values (100001, 100000, 2, 4, 'Spark SQL Demo', 2, 2, 'org.apache.streampark.spark.cli.SqlClient', 'default', 0, 'default', 0, 0, 100000, 'Spark SQL Demo', 0, 1, 1, now(), now(), 'streampark,test'); -- ---------------------------- -- Records of t_spark_effective -- ---------------------------- -insert into `t_spark_effective` values (100000, 100000, 4, 100000, now()); +insert into `t_spark_effective` values (100000, 100001, 4, 100000, now()); -- ---------------------------- -- Records of t_spark_sql -- ---------------------------- -insert into `t_spark_sql` values (100000, 100000, 'eNq1jr0OgjAURnee4m4FY/oCTJVUg/KT9F7cK2kQiy2W+P6KMQ6yuDh9+YZzcjIlBUkgsSkkXCbv0N9Da0ifBgOx01cDSCqvdmsIpuu9e98kavA54EPH9ajbs+HTqIPl023gsyeN8gqlIsgrqhfmoygaiTEre2vYGliDgiW/IXvd2hdymIls0d87+5f6jxdlITOCFWxVXX5npg92MWtB', null, null, 1, 1, now()); +insert into `t_spark_sql` values (100000, 100001, 'eNq1jr0OgjAURnee4m4FY/oCTJVUg/KT9F7cK2kQiy2W+P6KMQ6yuDh9+YZzcjIlBUkgsSkkXCbv0N9Da0ifBgOx01cDSCqvdmsIpuu9e98kavA54EPH9ajbs+HTqIPl023gsyeN8gqlIsgrqhfmoygaiTEre2vYGliDgiW/IXvd2hdymIls0d87+5f6jxdlITOCFWxVXX5npg92MWtB', null, null, 1, 1, now()); -- ---------------------------- -- Records of t_menu diff --git a/streampark-console/streampark-console-service/src/main/resources/db/schema-h2.sql b/streampark-console/streampark-console-service/src/main/resources/db/schema-h2.sql index b427e28d1c..3a410d8a85 100644 --- a/streampark-console/streampark-console-service/src/main/resources/db/schema-h2.sql +++ b/streampark-console/streampark-console-service/src/main/resources/db/schema-h2.sql @@ -16,19 +16,14 @@ */ -- ---------------------------- --- Table structure for t_flink_app_backup +-- Table structure for t_app -- ---------------------------- -create table if not exists `t_flink_app_backup` ( - `id` bigint generated by default as identity not null, - `app_id` bigint default null, - `sql_id` bigint default null, - `config_id` bigint default null, - `version` int default null, - `path` varchar(128) default null, - `description` varchar(255) default null, - `create_time` datetime default null comment 'create time', - `modify_time` datetime default null comment 'modify time', - primary key(`id`) +create table if not exists `t_app` ( +`id` bigint generated by default as identity not null, +`job_type` tinyint default null, +`create_time` datetime default null comment 'create time', +`modify_time` datetime default null comment 'modify time', +primary key(`id`) ); -- ---------------------------- @@ -99,6 +94,24 @@ create table if not exists `t_flink_app` ( primary key(`id`) ); + +-- ---------------------------- +-- Table structure for t_flink_app_backup +-- ---------------------------- +create table if not exists `t_flink_app_backup` ( +`id` bigint generated by default as identity not null, +`app_id` bigint default null, +`sql_id` bigint default null, +`config_id` bigint default null, +`version` int default null, +`path` varchar(128) default null, +`description` varchar(255) default null, +`create_time` datetime default null comment 'create time', +`modify_time` datetime default null comment 'modify time', +primary key(`id`) +); + + -- ---------------------------- -- Table structure for t_flink_config -- ---------------------------- diff --git a/streampark-console/streampark-console-service/src/main/resources/mapper/core/ApplicationMapper.xml b/streampark-console/streampark-console-service/src/main/resources/mapper/core/ApplicationMapper.xml new file mode 100644 index 0000000000..85569bf7f2 --- /dev/null +++ b/streampark-console/streampark-console-service/src/main/resources/mapper/core/ApplicationMapper.xml @@ -0,0 +1,21 @@ + + + + + + diff --git a/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/CatalogServiceTest.java b/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/CatalogServiceTest.java index 23decd3bd7..7a6129d52b 100644 --- a/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/CatalogServiceTest.java +++ b/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/CatalogServiceTest.java @@ -35,7 +35,7 @@ public class CatalogServiceTest extends SpringUnitTestBase { @Autowired - private CatalogService catalogService; + private FlinkCatalogService catalogService; @AfterEach void cleanTestRecordsInDatabase() { diff --git a/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/SavepointServiceTest.java b/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/SavepointServiceTest.java index 833faa4ab1..1f29b10a45 100644 --- a/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/SavepointServiceTest.java +++ b/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/SavepointServiceTest.java @@ -28,6 +28,7 @@ import org.apache.streampark.console.core.entity.FlinkEnv; import org.apache.streampark.console.core.enums.ConfigFileTypeEnum; import org.apache.streampark.console.core.enums.EffectiveTypeEnum; +import org.apache.streampark.console.core.service.application.FlinkApplicationConfigService; import org.apache.streampark.console.core.service.application.FlinkApplicationManageService; import org.apache.streampark.console.core.service.impl.FlinkSavepointServiceImpl; From 92601163026b799c6931e31d8d735e3bd8ea3e41 Mon Sep 17 00:00:00 2001 From: benjobs Date: Fri, 27 Sep 2024 22:50:39 +0800 Subject: [PATCH 3/4] [Improve] t_flink_log table rename to t_app_log --- .../assembly/script/schema/mysql-schema.sql | 10 +++---- .../assembly/script/schema/pgsql-schema.sql | 18 ++++++------- .../assembly/script/upgrade/mysql/2.2.0.sql | 7 ++++- .../console/core/entity/ApplicationLog.java | 2 +- .../FlinkApplicationActionServiceImpl.java | 26 +++++++++---------- .../src/main/resources/db/schema-h2.sql | 9 ++++--- 6 files changed, 39 insertions(+), 33 deletions(-) diff --git a/streampark-console/streampark-console-service/src/main/assembly/script/schema/mysql-schema.sql b/streampark-console/streampark-console-service/src/main/assembly/script/schema/mysql-schema.sql index db36a2a2a6..e054d46c7e 100644 --- a/streampark-console/streampark-console-service/src/main/assembly/script/schema/mysql-schema.sql +++ b/streampark-console/streampark-console-service/src/main/assembly/script/schema/mysql-schema.sql @@ -173,14 +173,14 @@ create table `t_flink_env` ( -- ---------------------------- --- table structure for t_flink_log +-- table structure for t_app_log -- ---------------------------- -drop table if exists `t_flink_log`; -create table `t_flink_log` ( +drop table if exists `t_app_log`; +create table `t_app_log` ( `id` bigint not null auto_increment, `app_id` bigint default null, - `yarn_app_id` varchar(64) collate utf8mb4_general_ci default null, - `job_manager_url` varchar(255) collate utf8mb4_general_ci default null, + `cluster_id` varchar(64) collate utf8mb4_general_ci default null, + `tracking_url` varchar(255) collate utf8mb4_general_ci default null, `success` tinyint default null, `exception` text collate utf8mb4_general_ci, `option_time` datetime default null, diff --git a/streampark-console/streampark-console-service/src/main/assembly/script/schema/pgsql-schema.sql b/streampark-console/streampark-console-service/src/main/assembly/script/schema/pgsql-schema.sql index 90548e7da0..dc1bf538a1 100644 --- a/streampark-console/streampark-console-service/src/main/assembly/script/schema/pgsql-schema.sql +++ b/streampark-console/streampark-console-service/src/main/assembly/script/schema/pgsql-schema.sql @@ -39,7 +39,7 @@ drop table if exists "public"."t_app_build_pipe"; drop table if exists "public"."t_flink_app_backup"; drop table if exists "public"."t_alert_config"; drop table if exists "public"."t_access_token"; -drop table if exists "public"."t_flink_log"; +drop table if exists "public"."t_app_log"; drop table if exists "public"."t_team"; drop table if exists "public"."t_variable"; drop table if exists "public"."t_external_link"; @@ -66,7 +66,7 @@ drop sequence if exists "public"."streampark_t_distributed_task_id_seq"; drop sequence if exists "public"."streampark_t_flink_app_backup_id_seq"; drop sequence if exists "public"."streampark_t_alert_config_id_seq"; drop sequence if exists "public"."streampark_t_access_token_id_seq"; -drop sequence if exists "public"."streampark_t_flink_log_id_seq"; +drop sequence if exists "public"."streampark_t_app_log_id_seq"; drop sequence if exists "public"."streampark_t_team_id_seq"; drop sequence if exists "public"."streampark_t_variable_id_seq"; drop sequence if exists "public"."streampark_t_external_link_id_seq"; @@ -403,16 +403,16 @@ create index "un_env_name" on "public"."t_flink_env" using btree ( -- ---------------------------- --- table structure for t_flink_log +-- table structure for t_app_log -- ---------------------------- -create sequence "public"."streampark_t_flink_log_id_seq" +create sequence "public"."streampark_t_app_log_id_seq" increment 1 start 10000 cache 1 minvalue 10000 maxvalue 9223372036854775807; -create table "public"."t_flink_log" ( - "id" int8 not null default nextval('streampark_t_flink_log_id_seq'::regclass), +create table "public"."t_app_log" ( + "id" int8 not null default nextval('streampark_t_app_log_id_seq'::regclass), "app_id" int8, - "yarn_app_id" varchar(64) collate "pg_catalog"."default", - "job_manager_url" varchar(255) collate "pg_catalog"."default", + "cluster_id" varchar(64) collate "pg_catalog"."default", + "tracking_url" varchar(255) collate "pg_catalog"."default", "success" boolean, "exception" text collate "pg_catalog"."default", "option_time" timestamp(6), @@ -420,7 +420,7 @@ create table "public"."t_flink_log" ( "user_id" int8 ) ; -alter table "public"."t_flink_log" add constraint "t_flink_log_pkey" primary key ("id"); +alter table "public"."t_app_log" add constraint "t_app_log_pkey" primary key ("id"); -- ---------------------------- diff --git a/streampark-console/streampark-console-service/src/main/assembly/script/upgrade/mysql/2.2.0.sql b/streampark-console/streampark-console-service/src/main/assembly/script/upgrade/mysql/2.2.0.sql index a59f7c81b2..997146f813 100644 --- a/streampark-console/streampark-console-service/src/main/assembly/script/upgrade/mysql/2.2.0.sql +++ b/streampark-console/streampark-console-service/src/main/assembly/script/upgrade/mysql/2.2.0.sql @@ -41,7 +41,12 @@ add column `k8s_name` varchar(63) collate utf8mb4_general_ci default null, alter table t_app_backup rename to t_flink_app_backup; -alter table `t_flink_log` +alter table t_flink_log rename to t_app_log; + +alter table `t_app_log` + change column `yarn_app_id` `cluster_id` varchar(64) default null, + change column `job_manager_url` `tracking_url` varchar(255) default null, + add column `job_type` tinyint default null, add column `user_id` bigint default null comment 'operator user id'; alter table `t_flink_project` diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/ApplicationLog.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/ApplicationLog.java index 78afd59a19..574a320f17 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/ApplicationLog.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/ApplicationLog.java @@ -26,7 +26,7 @@ import java.util.Date; @Data -@TableName("t_flink_log") +@TableName("t_app_log") @Slf4j public class ApplicationLog { diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/FlinkApplicationActionServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/FlinkApplicationActionServiceImpl.java index d7a1efa851..582d803166 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/FlinkApplicationActionServiceImpl.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/FlinkApplicationActionServiceImpl.java @@ -528,44 +528,44 @@ private void processForSuccess( FlinkApplication appParam, SubmitResponse response, ApplicationLog applicationLog, - FlinkApplication application) { + FlinkApplication flinkApplication) { applicationLog.setSuccess(true); if (response.flinkConfig() != null) { String jmMemory = response.flinkConfig().get(ConfigKeys.KEY_FLINK_JM_PROCESS_MEMORY()); if (jmMemory != null) { - application.setJmMemory(MemorySize.parse(jmMemory).getMebiBytes()); + flinkApplication.setJmMemory(MemorySize.parse(jmMemory).getMebiBytes()); } String tmMemory = response.flinkConfig().get(ConfigKeys.KEY_FLINK_TM_PROCESS_MEMORY()); if (tmMemory != null) { - application.setTmMemory(MemorySize.parse(tmMemory).getMebiBytes()); + flinkApplication.setTmMemory(MemorySize.parse(tmMemory).getMebiBytes()); } } if (StringUtils.isNoneEmpty(response.jobId())) { - application.setJobId(response.jobId()); + flinkApplication.setJobId(response.jobId()); } - if (FlinkDeployMode.isYarnMode(application.getDeployMode())) { - application.setClusterId(response.clusterId()); + if (FlinkDeployMode.isYarnMode(flinkApplication.getDeployMode())) { + flinkApplication.setClusterId(response.clusterId()); applicationLog.setClusterId(response.clusterId()); } if (StringUtils.isNoneEmpty(response.jobManagerUrl())) { - application.setJobManagerUrl(response.jobManagerUrl()); + flinkApplication.setJobManagerUrl(response.jobManagerUrl()); applicationLog.setTrackingUrl(response.jobManagerUrl()); } applicationLog.setClusterId(response.clusterId()); - application.setStartTime(new Date()); - application.setEndTime(null); + flinkApplication.setStartTime(new Date()); + flinkApplication.setEndTime(null); // if start completed, will be added task to tracking queue - if (application.isKubernetesModeJob()) { - processForK8sApp(application, applicationLog); + if (flinkApplication.isKubernetesModeJob()) { + processForK8sApp(flinkApplication, applicationLog); } else { FlinkAppHttpWatcher.setOptionState(appParam.getId(), OptionStateEnum.STARTING); - FlinkAppHttpWatcher.doWatching(application); + FlinkAppHttpWatcher.doWatching(flinkApplication); } // update app - updateById(application); + updateById(flinkApplication); // save log applicationLogService.save(applicationLog); } diff --git a/streampark-console/streampark-console-service/src/main/resources/db/schema-h2.sql b/streampark-console/streampark-console-service/src/main/resources/db/schema-h2.sql index 3a410d8a85..dd2de39867 100644 --- a/streampark-console/streampark-console-service/src/main/resources/db/schema-h2.sql +++ b/streampark-console/streampark-console-service/src/main/resources/db/schema-h2.sql @@ -158,13 +158,14 @@ create table if not exists `t_flink_env` ( -- ---------------------------- --- Table structure for t_flink_log +-- Table structure for t_app_log -- ---------------------------- -create table if not exists `t_flink_log` ( +create table if not exists `t_app_log` ( `id` bigint generated by default as identity not null, `app_id` bigint default null, - `yarn_app_id` varchar(64) default null, - `job_manager_url` varchar(255) default null, + `job_type` tinyint default null, + `cluster_id` varchar(64) default null, + `tracking_url` varchar(255) default null, `success` tinyint default null, `exception` text , `option_time` datetime default null, From ede5bac94858dadd8ef155d4411832a95947efaf Mon Sep 17 00:00:00 2001 From: benjobs Date: Fri, 27 Sep 2024 23:32:27 +0800 Subject: [PATCH 4/4] [Improve] Effective entity rename to FlinkEffective --- .../{Effective.java => FlinkEffective.java} | 2 +- ...eMapper.java => FlinkEffectiveMapper.java} | 4 +- ...ervice.java => FlinkEffectiveService.java} | 6 +-- .../FlinkApplicationBackUpServiceImpl.java | 4 +- .../FlinkApplicationConfigServiceImpl.java | 4 +- .../FlinkApplicationManageServiceImpl.java | 4 +- ...pl.java => FlinkEffectiveServiceImpl.java} | 43 ++++++++++--------- .../service/impl/FlinkSqlServiceImpl.java | 4 +- .../resources/mapper/core/EffectiveMapper.xml | 2 +- ...=> FlinkApplicationManageServiceTest.java} | 2 +- ...Test.java => FlinkCatalogServiceTest.java} | 2 +- ...st.java => FlinkSavepointServiceTest.java} | 8 ++-- 12 files changed, 43 insertions(+), 42 deletions(-) rename streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/{Effective.java => FlinkEffective.java} (98%) rename streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/mapper/{EffectiveMapper.java => FlinkEffectiveMapper.java} (86%) rename streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/{EffectiveService.java => FlinkEffectiveService.java} (85%) rename streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/{EffectiveServiceImpl.java => FlinkEffectiveServiceImpl.java} (59%) rename streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/{ApplicationManageServiceTest.java => FlinkApplicationManageServiceTest.java} (98%) rename streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/{CatalogServiceTest.java => FlinkCatalogServiceTest.java} (98%) rename streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/{SavepointServiceTest.java => FlinkSavepointServiceTest.java} (97%) diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Effective.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkEffective.java similarity index 98% rename from streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Effective.java rename to streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkEffective.java index b379bd3556..bfb806441f 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Effective.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkEffective.java @@ -30,7 +30,7 @@ @Data @TableName("t_flink_effective") @Slf4j -public class Effective { +public class FlinkEffective { @TableId(type = IdType.AUTO) private Long id; diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/mapper/EffectiveMapper.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/mapper/FlinkEffectiveMapper.java similarity index 86% rename from streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/mapper/EffectiveMapper.java rename to streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/mapper/FlinkEffectiveMapper.java index b3815425f4..b16e6fa839 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/mapper/EffectiveMapper.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/mapper/FlinkEffectiveMapper.java @@ -17,9 +17,9 @@ package org.apache.streampark.console.core.mapper; -import org.apache.streampark.console.core.entity.Effective; +import org.apache.streampark.console.core.entity.FlinkEffective; import com.baomidou.mybatisplus.core.mapper.BaseMapper; -public interface EffectiveMapper extends BaseMapper { +public interface FlinkEffectiveMapper extends BaseMapper { } diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/EffectiveService.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/FlinkEffectiveService.java similarity index 85% rename from streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/EffectiveService.java rename to streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/FlinkEffectiveService.java index 46d87db3a5..1cd38cb1c4 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/EffectiveService.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/FlinkEffectiveService.java @@ -17,16 +17,16 @@ package org.apache.streampark.console.core.service; -import org.apache.streampark.console.core.entity.Effective; +import org.apache.streampark.console.core.entity.FlinkEffective; import org.apache.streampark.console.core.enums.EffectiveTypeEnum; import com.baomidou.mybatisplus.extension.service.IService; -public interface EffectiveService extends IService { +public interface FlinkEffectiveService extends IService { void remove(Long appId, EffectiveTypeEnum config); - Effective get(Long appId, EffectiveTypeEnum config); + FlinkEffective get(Long appId, EffectiveTypeEnum config); void saveOrUpdate(Long appId, EffectiveTypeEnum type, Long id); diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/FlinkApplicationBackUpServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/FlinkApplicationBackUpServiceImpl.java index 56ab42b54c..b3c0982d38 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/FlinkApplicationBackUpServiceImpl.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/FlinkApplicationBackUpServiceImpl.java @@ -29,7 +29,7 @@ import org.apache.streampark.console.core.enums.EffectiveTypeEnum; import org.apache.streampark.console.core.enums.ReleaseStateEnum; import org.apache.streampark.console.core.mapper.FlinkApplicationBackUpMapper; -import org.apache.streampark.console.core.service.EffectiveService; +import org.apache.streampark.console.core.service.FlinkEffectiveService; import org.apache.streampark.console.core.service.FlinkSqlService; import org.apache.streampark.console.core.service.application.FlinkApplicationBackUpService; import org.apache.streampark.console.core.service.application.FlinkApplicationConfigService; @@ -62,7 +62,7 @@ public class FlinkApplicationBackUpServiceImpl private FlinkApplicationConfigService configService; @Autowired - private EffectiveService effectiveService; + private FlinkEffectiveService effectiveService; @Autowired private FlinkSqlService flinkSqlService; diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/FlinkApplicationConfigServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/FlinkApplicationConfigServiceImpl.java index f5c7d52384..2f8a02dbce 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/FlinkApplicationConfigServiceImpl.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/FlinkApplicationConfigServiceImpl.java @@ -27,7 +27,7 @@ import org.apache.streampark.console.core.enums.ConfigFileTypeEnum; import org.apache.streampark.console.core.enums.EffectiveTypeEnum; import org.apache.streampark.console.core.mapper.FlinkApplicationConfigMapper; -import org.apache.streampark.console.core.service.EffectiveService; +import org.apache.streampark.console.core.service.FlinkEffectiveService; import org.apache.streampark.console.core.service.application.FlinkApplicationConfigService; import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; @@ -64,7 +64,7 @@ public class FlinkApplicationConfigServiceImpl private ResourceLoader resourceLoader; @Autowired - private EffectiveService effectiveService; + private FlinkEffectiveService effectiveService; @Override public synchronized void create(FlinkApplication appParam, Boolean latest) { diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/FlinkApplicationManageServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/FlinkApplicationManageServiceImpl.java index 4dd265b6ec..684b04dd56 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/FlinkApplicationManageServiceImpl.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/FlinkApplicationManageServiceImpl.java @@ -42,8 +42,8 @@ import org.apache.streampark.console.core.enums.OptionStateEnum; import org.apache.streampark.console.core.enums.ReleaseStateEnum; import org.apache.streampark.console.core.mapper.FlinkApplicationMapper; -import org.apache.streampark.console.core.service.EffectiveService; import org.apache.streampark.console.core.service.FlinkClusterService; +import org.apache.streampark.console.core.service.FlinkEffectiveService; import org.apache.streampark.console.core.service.FlinkSqlService; import org.apache.streampark.console.core.service.ProjectService; import org.apache.streampark.console.core.service.ResourceService; @@ -126,7 +126,7 @@ public class FlinkApplicationManageServiceImpl extends ServiceImpl +public class FlinkEffectiveServiceImpl extends ServiceImpl implements - EffectiveService { + FlinkEffectiveService { @Override public void remove(Long appId, EffectiveTypeEnum effectiveTypeEnum) { - LambdaQueryWrapper queryWrapper = new LambdaQueryWrapper() - .eq(Effective::getAppId, appId) - .eq(Effective::getTargetType, effectiveTypeEnum.getType()); + LambdaQueryWrapper queryWrapper = new LambdaQueryWrapper() + .eq(FlinkEffective::getAppId, appId) + .eq(FlinkEffective::getTargetType, effectiveTypeEnum.getType()); baseMapper.delete(queryWrapper); } @Override - public Effective get(Long appId, EffectiveTypeEnum effectiveTypeEnum) { - LambdaQueryWrapper queryWrapper = new LambdaQueryWrapper() - .eq(Effective::getAppId, appId) - .eq(Effective::getTargetType, effectiveTypeEnum.getType()); + public FlinkEffective get(Long appId, EffectiveTypeEnum effectiveTypeEnum) { + LambdaQueryWrapper queryWrapper = new LambdaQueryWrapper() + .eq(FlinkEffective::getAppId, appId) + .eq(FlinkEffective::getTargetType, effectiveTypeEnum.getType()); return this.getOne(queryWrapper); } @Override public void saveOrUpdate(Long appId, EffectiveTypeEnum type, Long id) { - LambdaQueryWrapper queryWrapper = new LambdaQueryWrapper() - .eq(Effective::getAppId, appId) - .eq(Effective::getTargetType, type.getType()); + LambdaQueryWrapper queryWrapper = new LambdaQueryWrapper() + .eq(FlinkEffective::getAppId, appId) + .eq(FlinkEffective::getTargetType, type.getType()); long count = count(queryWrapper); if (count == 0) { - Effective effective = new Effective(); + FlinkEffective effective = new FlinkEffective(); effective.setAppId(appId); effective.setTargetType(type.getType()); effective.setTargetId(id); @@ -70,16 +70,17 @@ public void saveOrUpdate(Long appId, EffectiveTypeEnum type, Long id) { save(effective); } else { update( - new LambdaUpdateWrapper() - .eq(Effective::getAppId, appId) - .eq(Effective::getTargetType, type.getType()) - .set(Effective::getTargetId, id)); + new LambdaUpdateWrapper() + .eq(FlinkEffective::getAppId, appId) + .eq(FlinkEffective::getTargetType, type.getType()) + .set(FlinkEffective::getTargetId, id)); } } @Override public void removeByAppId(Long appId) { - LambdaQueryWrapper queryWrapper = new LambdaQueryWrapper().eq(Effective::getAppId, appId); + LambdaQueryWrapper queryWrapper = + new LambdaQueryWrapper().eq(FlinkEffective::getAppId, appId); this.remove(queryWrapper); } } diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkSqlServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkSqlServiceImpl.java index faf8c84627..45048e5e26 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkSqlServiceImpl.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkSqlServiceImpl.java @@ -28,7 +28,7 @@ import org.apache.streampark.console.core.enums.CandidateTypeEnum; import org.apache.streampark.console.core.enums.EffectiveTypeEnum; import org.apache.streampark.console.core.mapper.FlinkSqlMapper; -import org.apache.streampark.console.core.service.EffectiveService; +import org.apache.streampark.console.core.service.FlinkEffectiveService; import org.apache.streampark.console.core.service.FlinkEnvService; import org.apache.streampark.console.core.service.FlinkSqlService; import org.apache.streampark.console.core.service.application.FlinkApplicationBackUpService; @@ -58,7 +58,7 @@ public class FlinkSqlServiceImpl extends ServiceImpl FlinkSqlService { @Autowired - private EffectiveService effectiveService; + private FlinkEffectiveService effectiveService; @Autowired private FlinkApplicationBackUpService backUpService; diff --git a/streampark-console/streampark-console-service/src/main/resources/mapper/core/EffectiveMapper.xml b/streampark-console/streampark-console-service/src/main/resources/mapper/core/EffectiveMapper.xml index 59a6ac14ab..2dae562b2d 100644 --- a/streampark-console/streampark-console-service/src/main/resources/mapper/core/EffectiveMapper.xml +++ b/streampark-console/streampark-console-service/src/main/resources/mapper/core/EffectiveMapper.xml @@ -16,6 +16,6 @@ ~ limitations under the License. --> - + diff --git a/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/ApplicationManageServiceTest.java b/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/FlinkApplicationManageServiceTest.java similarity index 98% rename from streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/ApplicationManageServiceTest.java rename to streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/FlinkApplicationManageServiceTest.java index d199f4de90..5724ecee9f 100644 --- a/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/ApplicationManageServiceTest.java +++ b/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/FlinkApplicationManageServiceTest.java @@ -37,7 +37,7 @@ import static org.assertj.core.api.Assertions.assertThat; /** org.apache.streampark.console.core.service.ApplicationServiceUnitTest. */ -class ApplicationManageServiceTest extends SpringUnitTestBase { +class FlinkApplicationManageServiceTest extends SpringUnitTestBase { @Autowired private FlinkApplicationManageService applicationManageService; diff --git a/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/CatalogServiceTest.java b/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/FlinkCatalogServiceTest.java similarity index 98% rename from streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/CatalogServiceTest.java rename to streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/FlinkCatalogServiceTest.java index 7a6129d52b..b6089adf93 100644 --- a/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/CatalogServiceTest.java +++ b/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/FlinkCatalogServiceTest.java @@ -32,7 +32,7 @@ import static org.assertj.core.api.Assertions.assertThat; /** CatalogService Tests */ -public class CatalogServiceTest extends SpringUnitTestBase { +public class FlinkCatalogServiceTest extends SpringUnitTestBase { @Autowired private FlinkCatalogService catalogService; diff --git a/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/SavepointServiceTest.java b/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/FlinkSavepointServiceTest.java similarity index 97% rename from streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/SavepointServiceTest.java rename to streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/FlinkSavepointServiceTest.java index 1f29b10a45..66f43f9401 100644 --- a/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/SavepointServiceTest.java +++ b/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/FlinkSavepointServiceTest.java @@ -22,9 +22,9 @@ import org.apache.streampark.common.enums.FlinkJobType; import org.apache.streampark.common.util.DeflaterUtils; import org.apache.streampark.console.SpringUnitTestBase; -import org.apache.streampark.console.core.entity.Effective; import org.apache.streampark.console.core.entity.FlinkApplication; import org.apache.streampark.console.core.entity.FlinkApplicationConfig; +import org.apache.streampark.console.core.entity.FlinkEffective; import org.apache.streampark.console.core.entity.FlinkEnv; import org.apache.streampark.console.core.enums.ConfigFileTypeEnum; import org.apache.streampark.console.core.enums.EffectiveTypeEnum; @@ -48,7 +48,7 @@ * FlinkSavepointServiceImpl} of {@link * SavepointService}. */ -class SavepointServiceTest extends SpringUnitTestBase { +class FlinkSavepointServiceTest extends SpringUnitTestBase { @Autowired private SavepointService savepointService; @@ -57,7 +57,7 @@ class SavepointServiceTest extends SpringUnitTestBase { private FlinkApplicationConfigService configService; @Autowired - private EffectiveService effectiveService; + private FlinkEffectiveService effectiveService; @Autowired private FlinkEnvService flinkEnvService; @@ -138,7 +138,7 @@ void testGetSavepointFromAppCfgIfStreamParkOrSQLJob() { + String.format("%s=%s", CHECKPOINTING_INTERVAL.key(), "3min"))); configService.updateById(appCfg); - Effective effective = new Effective(); + FlinkEffective effective = new FlinkEffective(); effective.setTargetId(appCfg.getId()); effective.setAppId(appId); effective.setTargetType(EffectiveTypeEnum.CONFIG.getType());