diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/StreamParkConsoleBootstrap.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/StreamParkConsoleBootstrap.java index 3d04bfb008..ad1e279e87 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/StreamParkConsoleBootstrap.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/StreamParkConsoleBootstrap.java @@ -17,18 +17,13 @@ package org.apache.streampark.console; -import org.apache.streampark.common.util.SystemPropertyUtils; import org.apache.streampark.console.base.config.SpringProperties; -import org.apache.streampark.console.core.service.RegistryService; import lombok.extern.slf4j.Slf4j; -import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.boot.builder.SpringApplicationBuilder; import org.springframework.scheduling.annotation.EnableScheduling; -import javax.annotation.PostConstruct; - /** * * @@ -53,29 +48,10 @@ @EnableScheduling public class StreamParkConsoleBootstrap { - @Autowired - private RegistryService registryService; - public static void main(String[] args) throws Exception { new SpringApplicationBuilder() .properties(SpringProperties.get()) .sources(StreamParkConsoleBootstrap.class) .run(args); } - - @PostConstruct - public void init() { - if (enableHA()) { - registryService.registry(); - Runtime.getRuntime().addShutdownHook(new Thread(() -> { - registryService.unRegister(); - log.info("RegistryService close success."); - })); - } - } - - public boolean enableHA() { - return SystemPropertyUtils.get("high-availability.enable", "false").equals("true"); - } - } diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/runner/EnvInitializer.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/runner/EnvInitializer.java index 4e5ba38ac3..ebcaef12fb 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/runner/EnvInitializer.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/runner/EnvInitializer.java @@ -26,9 +26,11 @@ import org.apache.streampark.common.fs.FsOperator; import org.apache.streampark.common.util.AssertUtils; import org.apache.streampark.common.util.SystemPropertyUtils; +import org.apache.streampark.console.base.util.SpringContextUtils; import org.apache.streampark.console.base.util.WebUtils; import org.apache.streampark.console.core.entity.FlinkEnv; import org.apache.streampark.console.core.entity.SparkEnv; +import org.apache.streampark.console.core.service.RegistryService; import org.apache.streampark.console.core.service.SettingService; import org.apache.commons.lang3.StringUtils; @@ -81,6 +83,9 @@ public void run(ApplicationArguments args) throws Exception { // init InternalConfig initConfig(); + // init RegistryService + initRegistryService(); + boolean isTest = Arrays.asList(context.getEnvironment().getActiveProfiles()).contains("test"); if (!isTest) { // initialize local file system resources @@ -110,6 +115,18 @@ private void initConfig() { overrideSystemProp(ConfigKeys.KEY_HADOOP_USER_NAME(), hadoopUserName); } + private void initRegistryService() { + boolean enable = SystemPropertyUtils.get("high-availability.enable", "false").equals("true"); + if (enable) { + RegistryService registryService = SpringContextUtils.getBean(RegistryService.class); + registryService.registry(); + Runtime.getRuntime().addShutdownHook(new Thread(() -> { + registryService.unRegister(); + log.info("RegistryService unRegister success"); + })); + } + } + private void overrideSystemProp(String key, String defaultValue) { String value = context.getEnvironment().getProperty(key, defaultValue); log.info("initialize system properties: key:{}, value:{}", key, value);