Skip to content

Commit

Permalink
refactor
Browse files Browse the repository at this point in the history
  • Loading branch information
HxpSerein committed Oct 30, 2024
1 parent 5ac640a commit 5d7e0d9
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,18 +17,13 @@

package org.apache.streampark.console;

import org.apache.streampark.common.util.SystemPropertyUtils;
import org.apache.streampark.console.base.config.SpringProperties;
import org.apache.streampark.console.core.service.RegistryService;

import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.builder.SpringApplicationBuilder;
import org.springframework.scheduling.annotation.EnableScheduling;

import javax.annotation.PostConstruct;

/**
*
*
Expand All @@ -53,29 +48,10 @@
@EnableScheduling
public class StreamParkConsoleBootstrap {

@Autowired
private RegistryService registryService;

public static void main(String[] args) throws Exception {
new SpringApplicationBuilder()
.properties(SpringProperties.get())
.sources(StreamParkConsoleBootstrap.class)
.run(args);
}

@PostConstruct
public void init() {
if (enableHA()) {
registryService.registry();
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
registryService.unRegister();
log.info("RegistryService close success.");
}));
}
}

public boolean enableHA() {
return SystemPropertyUtils.get("high-availability.enable", "false").equals("true");
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,11 @@
import org.apache.streampark.common.fs.FsOperator;
import org.apache.streampark.common.util.AssertUtils;
import org.apache.streampark.common.util.SystemPropertyUtils;
import org.apache.streampark.console.base.util.SpringContextUtils;
import org.apache.streampark.console.base.util.WebUtils;
import org.apache.streampark.console.core.entity.FlinkEnv;
import org.apache.streampark.console.core.entity.SparkEnv;
import org.apache.streampark.console.core.service.RegistryService;
import org.apache.streampark.console.core.service.SettingService;

import org.apache.commons.lang3.StringUtils;
Expand Down Expand Up @@ -81,6 +83,9 @@ public void run(ApplicationArguments args) throws Exception {
// init InternalConfig
initConfig();

// init RegistryService
initRegistryService();

boolean isTest = Arrays.asList(context.getEnvironment().getActiveProfiles()).contains("test");
if (!isTest) {
// initialize local file system resources
Expand Down Expand Up @@ -110,6 +115,18 @@ private void initConfig() {
overrideSystemProp(ConfigKeys.KEY_HADOOP_USER_NAME(), hadoopUserName);
}

private void initRegistryService() {
boolean enable = SystemPropertyUtils.get("high-availability.enable", "false").equals("true");
if (enable) {
RegistryService registryService = SpringContextUtils.getBean(RegistryService.class);
registryService.registry();
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
registryService.unRegister();
log.info("RegistryService unRegister success");
}));
}
}

private void overrideSystemProp(String key, String defaultValue) {
String value = context.getEnvironment().getProperty(key, defaultValue);
log.info("initialize system properties: key:{}, value:{}", key, value);
Expand Down

0 comments on commit 5d7e0d9

Please sign in to comment.