getShipFiles() {
+ return shipFiles;
+ }
+ public YarnClient getYarnClient() {
+ return yarnClient;
+ }
+ /**
+ * The class to start the application master with. This class runs the main method in case of
+ * session cluster.
+ */
+ protected String getYarnSessionClusterEntrypoint() {
+ return YarnSessionClusterEntrypoint.class.getName();
+ }
+ /**
+ * The class to start the application master with. This class runs the main method in case of
+ * the job cluster.
+ */
+ protected String getYarnJobClusterEntrypoint() {
+ return YarnJobClusterEntrypoint.class.getName();
+ }
+ public Configuration getFlinkConfiguration() {
+ return flinkConfiguration;
+ }
+ public void setLocalJarPath(Path localJarPath) {
+ if (!localJarPath.toString().endsWith("jar")) {
+ throw new IllegalArgumentException(
+ "The passed jar path ('"
+ + localJarPath
+ + "') does not end with the 'jar' extension");
+ }
+ this.flinkJarPath = localJarPath;
+ }
+ /**
+ * Adds the given files to the list of files to ship.
+ *
+ * Note that any file matching "flink-dist*.jar" will be excluded from the upload by
+ * {@link YarnApplicationFileUploader#registerMultipleLocalResources(Collection, String,
+ * LocalResourceType)} since we upload the Flink uber jar ourselves and do not need to deploy it
+ * multiple times.
+ *
+ * @param shipFiles files to ship
+ */
+ public void addShipFiles(List shipFiles) {
+ checkArgument(
+ !isUsrLibDirIncludedInShipFiles(shipFiles),
+ "User-shipped directories configured via : %s should not include %s.",
+ YarnConfigOptions.SHIP_FILES.key(),
+ this.shipFiles.addAll(shipFiles);
+ }
+ private void addShipArchives(List shipArchives) {
+ checkArgument(
+ isArchiveOnlyIncludedInShipArchiveFiles(shipArchives),
+ "Non-archive files are included.");
+ this.shipArchives.addAll(shipArchives);
+ }
+ private static boolean isArchiveOnlyIncludedInShipArchiveFiles(List shipFiles) {
+ return
+ .filter(File::isFile)
+ .map(File::getName)
+ .map(String::toLowerCase)
+ .allMatch(
+ name ->
+ name.endsWith(".tar.gz")
+ || name.endsWith(".tar")
+ || name.endsWith(".tgz")
+ || name.endsWith(".dst")
+ || name.endsWith(".jar")
+ || name.endsWith(".zip"));
+ }
+ private void isReadyForDeployment(ClusterSpecification clusterSpecification) throws Exception {
+ if (this.flinkJarPath == null) {
+ throw new YarnDeploymentException("The Flink jar path is null");
+ }
+ if (this.flinkConfiguration == null) {
+ throw new YarnDeploymentException("Flink configuration object has not been set");
+ }
+ // Check if we don't exceed YARN's maximum virtual cores.
+ final int numYarnMaxVcores = yarnClusterInformationRetriever.getMaxVcores();
+ int configuredAmVcores = flinkConfiguration.getInteger(YarnConfigOptions.APP_MASTER_VCORES);
+ if (configuredAmVcores > numYarnMaxVcores) {
+ throw new IllegalConfigurationException(
+ String.format(
+ "The number of requested virtual cores for application master %d"
+ + " exceeds the maximum number of virtual cores %d available in the Yarn Cluster.",
+ configuredAmVcores, numYarnMaxVcores));
+ }
+ int configuredVcores =
+ flinkConfiguration.getInteger(
+ YarnConfigOptions.VCORES, clusterSpecification.getSlotsPerTaskManager());
+ // don't configure more than the maximum configured number of vcores
+ if (configuredVcores > numYarnMaxVcores) {
+ throw new IllegalConfigurationException(
+ String.format(
+ "The number of requested virtual cores per node %d"
+ + " exceeds the maximum number of virtual cores %d available in the Yarn Cluster."
+ + " Please note that the number of virtual cores is set to the number of task slots by default"
+ + " unless configured in the Flink config with '%s.'",
+ configuredVcores, numYarnMaxVcores, YarnConfigOptions.VCORES.key()));
+ }
+ // check if required Hadoop environment variables are set. If not, warn user
+ if (System.getenv("HADOOP_CONF_DIR") == null && System.getenv("YARN_CONF_DIR") == null) {
+ LOG.warn(
+ "Neither the HADOOP_CONF_DIR nor the YARN_CONF_DIR environment variable is set. "
+ + "The Flink YARN Client needs one of these to be set to properly load the Hadoop "
+ + "configuration for accessing YARN.");
+ }
+ }
+ public String getNodeLabel() {
+ return nodeLabel;
+ }
+ // -------------------------------------------------------------
+ // Lifecycle management
+ // -------------------------------------------------------------
+ @Override
+ public void close() {
+ if (!sharedYarnClient) {
+ yarnClient.stop();
+ }
+ }
+ // -------------------------------------------------------------
+ // ClusterClient overrides
+ // -------------------------------------------------------------
+ @Override
+ public ClusterClientProvider retrieve(ApplicationId applicationId)
+ throws ClusterRetrieveException {
+ try {
+ // check if required Hadoop environment variables are set. If not, warn user
+ if (System.getenv("HADOOP_CONF_DIR") == null
+ && System.getenv("YARN_CONF_DIR") == null) {
+ LOG.warn(
+ "Neither the HADOOP_CONF_DIR nor the YARN_CONF_DIR environment variable is set."
+ + "The Flink YARN Client needs one of these to be set to properly load the Hadoop "
+ + "configuration for accessing YARN.");
+ }
+ final ApplicationReport report = yarnClient.getApplicationReport(applicationId);
+ if (report.getFinalApplicationStatus() != FinalApplicationStatus.UNDEFINED) {
+ // Flink cluster is not running anymore
+ LOG.error(
+ "The application {} doesn't run anymore. It has previously completed with final status: {}",
+ applicationId,
+ report.getFinalApplicationStatus());
+ throw new RuntimeException(
+ "The Yarn application " + applicationId + " doesn't run anymore.");
+ }
+ setClusterEntrypointInfoToConfig(report);
+ return () -> {
+ try {
+ return new RestClusterClient<>(flinkConfiguration, report.getApplicationId());
+ } catch (Exception e) {
+ throw new RuntimeException("Couldn't retrieve Yarn cluster", e);
+ }
+ };
+ } catch (Exception e) {
+ throw new ClusterRetrieveException("Couldn't retrieve Yarn cluster", e);
+ }
+ }
+ @Override
+ public ClusterClientProvider deploySessionCluster(
+ ClusterSpecification clusterSpecification) throws ClusterDeploymentException {
+ try {
+ return deployInternal(
+ clusterSpecification,
+ "Flink session cluster",
+ getYarnSessionClusterEntrypoint(),
+ null,
+ false);
+ } catch (Exception e) {
+ throw new ClusterDeploymentException("Couldn't deploy Yarn session cluster", e);
+ }
+ }
+ @Override
+ public ClusterClientProvider deployApplicationCluster(
+ final ClusterSpecification clusterSpecification,
+ final ApplicationConfiguration applicationConfiguration)
+ throws ClusterDeploymentException {
+ checkNotNull(clusterSpecification);
+ checkNotNull(applicationConfiguration);
+ final YarnDeploymentTarget deploymentTarget =
+ YarnDeploymentTarget.fromConfig(flinkConfiguration);
+ if (YarnDeploymentTarget.APPLICATION != deploymentTarget) {
+ throw new ClusterDeploymentException(
+ "Couldn't deploy Yarn Application Cluster."
+ + " Expected"
+ + YarnDeploymentTarget.APPLICATION.getName()
+ + " but actual one was \""
+ + deploymentTarget.getName()
+ + "\"");
+ }
+ applicationConfiguration.applyToConfiguration(flinkConfiguration);
+ // No need to do pipelineJars validation if it is a PyFlink job.
+ if (!(PackagedProgramUtils.isPython(applicationConfiguration.getApplicationClassName())
+ || PackagedProgramUtils.isPython(applicationConfiguration.getProgramArguments()))) {
+ final List pipelineJars =
+ flinkConfiguration
+ .getOptional(PipelineOptions.JARS)
+ .orElse(Collections.emptyList());
+ Preconditions.checkArgument(pipelineJars.size() == 1, "Should only have one jar");
+ }
+ try {
+ return deployInternal(
+ clusterSpecification,
+ "Flink Application Cluster",
+ YarnApplicationClusterEntryPoint.class.getName(),
+ null,
+ false);
+ } catch (Exception e) {
+ throw new ClusterDeploymentException("Couldn't deploy Yarn Application Cluster", e);
+ }
+ }
+ @Override
+ public ClusterClientProvider deployJobCluster(
+ ClusterSpecification clusterSpecification, JobGraph jobGraph, boolean detached)
+ throws ClusterDeploymentException {
+ LOG.warn(
+ "Job Clusters are deprecated since Flink 1.15. Please use an Application Cluster/Application Mode instead.");
+ try {
+ return deployInternal(
+ clusterSpecification,
+ "Flink per-job cluster",
+ getYarnJobClusterEntrypoint(),
+ jobGraph,
+ detached);
+ } catch (Exception e) {
+ throw new ClusterDeploymentException("Could not deploy Yarn job cluster.", e);
+ }
+ }
+ @Override
+ public void killCluster(ApplicationId applicationId) throws FlinkException {
+ try {
+ yarnClient.killApplication(applicationId);
+ try (final FileSystem fs = FileSystem.get(yarnConfiguration)) {
+ final Path applicationDir =
+ YarnApplicationFileUploader.getApplicationDirPath(
+ getStagingDir(fs), applicationId);
+ Utils.deleteApplicationFiles(applicationDir.toUri().toString());
+ }
+ } catch (YarnException | IOException e) {
+ throw new FlinkException(
+ "Could not kill the Yarn Flink cluster with id " + applicationId + '.', e);
+ }
+ }
+ /**
+ * This method will block until the ApplicationMaster/JobManager have been deployed on YARN.
+ *
+ * @param clusterSpecification Initial cluster specification for the Flink cluster to be
+ * deployed
+ * @param applicationName name of the Yarn application to start
+ * @param yarnClusterEntrypoint Class name of the Yarn cluster entry point.
+ * @param jobGraph A job graph which is deployed with the Flink cluster, {@code null} if none
+ * @param detached True if the cluster should be started in detached mode
+ */
+ private ClusterClientProvider deployInternal(
+ ClusterSpecification clusterSpecification,
+ String applicationName,
+ String yarnClusterEntrypoint,
+ @Nullable JobGraph jobGraph,
+ boolean detached)
+ throws Exception {
+ final UserGroupInformation currentUser = UserGroupInformation.getCurrentUser();
+ if (HadoopUtils.isKerberosSecurityEnabled(currentUser)) {
+ boolean useTicketCache =
+ flinkConfiguration.getBoolean(SecurityOptions.KERBEROS_LOGIN_USETICKETCACHE);
+ if (!HadoopUtils.areKerberosCredentialsValid(currentUser, useTicketCache)) {
+ throw new RuntimeException(
+ "Hadoop security with Kerberos is enabled but the login user "
+ + "does not have Kerberos credentials or delegation tokens!");
+ }
+ final boolean fetchToken =
+ flinkConfiguration.getBoolean(SecurityOptions.KERBEROS_FETCH_DELEGATION_TOKEN);
+ final boolean yarnAccessFSEnabled =
+ !CollectionUtil.isNullOrEmpty(
+ flinkConfiguration.get(
+ if (!fetchToken && yarnAccessFSEnabled) {
+ throw new IllegalConfigurationException(
+ String.format(
+ "When %s is disabled, %s must be disabled as well.",
+ }
+ }
+ isReadyForDeployment(clusterSpecification);
+ // ------------------ Check if the specified queue exists --------------------
+ checkYarnQueues(yarnClient);
+ // ------------------ Check if the YARN ClusterClient has the requested resources
+ // --------------
+ // Create application via yarnClient
+ final YarnClientApplication yarnApplication = yarnClient.createApplication();
+ final GetNewApplicationResponse appResponse = yarnApplication.getNewApplicationResponse();
+ Resource maxRes = appResponse.getMaximumResourceCapability();
+ final ClusterResourceDescription freeClusterMem;
+ try {
+ freeClusterMem = getCurrentFreeClusterResources(yarnClient);
+ } catch (YarnException | IOException e) {
+ failSessionDuringDeployment(yarnClient, yarnApplication);
+ throw new YarnDeploymentException(
+ "Could not retrieve information about free cluster resources.", e);
+ }
+ final int yarnMinAllocationMB =
+ yarnConfiguration.getInt(
+ if (yarnMinAllocationMB <= 0) {
+ throw new YarnDeploymentException(
+ "The minimum allocation memory "
+ + "("
+ + yarnMinAllocationMB
+ + " MB) configured via '"
+ + "' should be greater than 0.");
+ }
+ final ClusterSpecification validClusterSpecification;
+ try {
+ validClusterSpecification =
+ validateClusterResources(
+ clusterSpecification, yarnMinAllocationMB, maxRes, freeClusterMem);
+ } catch (YarnDeploymentException yde) {
+ failSessionDuringDeployment(yarnClient, yarnApplication);
+ throw yde;
+ }
+"Cluster specification: {}", validClusterSpecification);
+ final ClusterEntrypoint.ExecutionMode executionMode =
+ detached
+ ? ClusterEntrypoint.ExecutionMode.DETACHED
+ : ClusterEntrypoint.ExecutionMode.NORMAL;
+ flinkConfiguration.setString(
+ ClusterEntrypoint.INTERNAL_CLUSTER_EXECUTION_MODE, executionMode.toString());
+ ApplicationReport report =
+ startAppMaster(
+ flinkConfiguration,
+ applicationName,
+ yarnClusterEntrypoint,
+ jobGraph,
+ yarnClient,
+ yarnApplication,
+ validClusterSpecification);
+ // print the application id for user to cancel themselves.
+ if (detached) {
+ final ApplicationId yarnApplicationId = report.getApplicationId();
+ logDetachedClusterInformation(yarnApplicationId, LOG);
+ }
+ setClusterEntrypointInfoToConfig(report);
+ return () -> {
+ try {
+ return new RestClusterClient<>(flinkConfiguration, report.getApplicationId());
+ } catch (Exception e) {
+ throw new RuntimeException("Error while creating RestClusterClient.", e);
+ }
+ };
+ }
+ private ClusterSpecification validateClusterResources(
+ ClusterSpecification clusterSpecification,
+ int yarnMinAllocationMB,
+ Resource maximumResourceCapability,
+ ClusterResourceDescription freeClusterResources)
+ throws YarnDeploymentException {
+ int jobManagerMemoryMb = clusterSpecification.getMasterMemoryMB();
+ final int taskManagerMemoryMb = clusterSpecification.getTaskManagerMemoryMB();
+ logIfComponentMemNotIntegerMultipleOfYarnMinAllocation(
+ "JobManager", jobManagerMemoryMb, yarnMinAllocationMB);
+ logIfComponentMemNotIntegerMultipleOfYarnMinAllocation(
+ "TaskManager", taskManagerMemoryMb, yarnMinAllocationMB);
+ // set the memory to minAllocationMB to do the next checks correctly
+ if (jobManagerMemoryMb < yarnMinAllocationMB) {
+ jobManagerMemoryMb = yarnMinAllocationMB;
+ }
+ final String note =
+ "Please check the 'yarn.scheduler.maximum-allocation-mb' and the 'yarn.nodemanager.resource.memory-mb' configuration values\n";
+ if (jobManagerMemoryMb > maximumResourceCapability.getMemory()) {
+ throw new YarnDeploymentException(
+ "The cluster does not have the requested resources for the JobManager available!\n"
+ + "Maximum Memory: "
+ + maximumResourceCapability.getMemory()
+ + "MB Requested: "
+ + jobManagerMemoryMb
+ + "MB. "
+ + note);
+ }
+ if (taskManagerMemoryMb > maximumResourceCapability.getMemory()) {
+ throw new YarnDeploymentException(
+ "The cluster does not have the requested resources for the TaskManagers available!\n"
+ + "Maximum Memory: "
+ + maximumResourceCapability.getMemory()
+ + " Requested: "
+ + taskManagerMemoryMb
+ + "MB. "
+ + note);
+ }
+ final String noteRsc =
+ "\nThe Flink YARN client will try to allocate the YARN session, but maybe not all TaskManagers are "
+ + "connecting from the beginning because the resources are currently not available in the cluster. "
+ + "The allocation might take more time than usual because the Flink YARN client needs to wait until "
+ + "the resources become available.";
+ if (taskManagerMemoryMb > freeClusterResources.containerLimit) {
+ LOG.warn(
+ "The requested amount of memory for the TaskManagers ("
+ + taskManagerMemoryMb
+ + "MB) is more than "
+ + "the largest possible YARN container: "
+ + freeClusterResources.containerLimit
+ + noteRsc);
+ }
+ if (jobManagerMemoryMb > freeClusterResources.containerLimit) {
+ LOG.warn(
+ "The requested amount of memory for the JobManager ("
+ + jobManagerMemoryMb
+ + "MB) is more than "
+ + "the largest possible YARN container: "
+ + freeClusterResources.containerLimit
+ + noteRsc);
+ }
+ return new ClusterSpecification.ClusterSpecificationBuilder()
+ .setMasterMemoryMB(jobManagerMemoryMb)
+ .setTaskManagerMemoryMB(taskManagerMemoryMb)
+ .setSlotsPerTaskManager(clusterSpecification.getSlotsPerTaskManager())
+ .createClusterSpecification();
+ }
+ private void logIfComponentMemNotIntegerMultipleOfYarnMinAllocation(
+ String componentName, int componentMemoryMB, int yarnMinAllocationMB) {
+ int normalizedMemMB =
+ (componentMemoryMB + (yarnMinAllocationMB - 1))
+ / yarnMinAllocationMB
+ * yarnMinAllocationMB;
+ if (normalizedMemMB <= 0) {
+ normalizedMemMB = yarnMinAllocationMB;
+ }
+ if (componentMemoryMB != normalizedMemMB) {
+ "The configured {} memory is {} MB. YARN will allocate {} MB to make up an integer multiple of its "
+ + "minimum allocation memory ({} MB, configured via 'yarn.scheduler.minimum-allocation-mb'). The extra {} MB "
+ + "may not be used by Flink.",
+ componentName,
+ componentMemoryMB,
+ normalizedMemMB,
+ yarnMinAllocationMB,
+ normalizedMemMB - componentMemoryMB);
+ }
+ }
+ private void checkYarnQueues(YarnClient yarnClient) {
+ try {
+ List queues = yarnClient.getAllQueues();
+ if (queues.size() > 0
+ && this.yarnQueue
+ != null) { // check only if there are queues configured in yarn and for
+ // this session.
+ boolean queueFound = false;
+ for (QueueInfo queue : queues) {
+ if (queue.getQueueName().equals(this.yarnQueue)
+ || queue.getQueueName().equals("root." + this.yarnQueue)) {
+ queueFound = true;
+ break;
+ }
+ }
+ if (!queueFound) {
+ String queueNames = StringUtils.toQuotedListString(queues.toArray());
+ LOG.warn(
+ "The specified queue '"
+ + this.yarnQueue
+ + "' does not exist. "
+ + "Available queues: "
+ + queueNames);
+ }
+ } else {
+ LOG.debug("The YARN cluster does not have any queues configured");
+ }
+ } catch (Throwable e) {
+ LOG.warn("Error while getting queue information from YARN: " + e.getMessage());
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Error details", e);
+ }
+ }
+ }
+ /** 只修改 方法 从 private --> public 逻辑变更放到扩展类里面 */
+ public ApplicationReport startAppMaster(
+ Configuration configuration,
+ String applicationName,
+ String yarnClusterEntrypoint,
+ JobGraph jobGraph,
+ YarnClient yarnClient,
+ YarnClientApplication yarnApplication,
+ ClusterSpecification clusterSpecification)
+ throws Exception {
+ // ------------------ Initialize the file systems -------------------------
+ org.apache.flink.core.fs.FileSystem.initialize(
+ configuration, PluginUtils.createPluginManagerFromRootFolder(configuration));
+ final FileSystem fs = FileSystem.get(yarnConfiguration);
+ // hard coded check for the GoogleHDFS client because its not overriding the getScheme()
+ // method.
+ if (!fs.getClass().getSimpleName().equals("GoogleHadoopFileSystem")
+ && fs.getScheme().startsWith("file")) {
+ LOG.warn(
+ "The file system scheme is '"
+ + fs.getScheme()
+ + "'. This indicates that the "
+ + "specified Hadoop configuration path is wrong and the system is using the default Hadoop configuration values."
+ + "The Flink YARN client needs to store its files in a distributed file system");
+ }
+ ApplicationSubmissionContext appContext = yarnApplication.getApplicationSubmissionContext();
+ final List providedLibDirs =
+ Utils.getQualifiedRemoteProvidedLibDirs(configuration, yarnConfiguration);
+ final Optional providedUsrLibDir =
+ Utils.getQualifiedRemoteProvidedUsrLib(configuration, yarnConfiguration);
+ Path stagingDirPath = getStagingDir(fs);
+ FileSystem stagingDirFs = stagingDirPath.getFileSystem(yarnConfiguration);
+ final YarnApplicationFileUploader fileUploader =
+ YarnApplicationFileUploader.from(
+ stagingDirFs,
+ stagingDirPath,
+ providedLibDirs,
+ appContext.getApplicationId(),
+ getFileReplication());
+ String cjDistPathStr = configuration.get(ChunJunServerOptions.CHUNJUN_DIST_PATH);
+ Path cjDistPath = new Path(cjDistPathStr);
+ Collection cjDistPaths = getCJUploadPath(cjDistPath);
+ List cjDistUploadPath =
+ fileUploader.registerMultipleLocalResources(
+ cjDistPaths, Path.CUR_DIR, LocalResourceType.FILE);
+ // The files need to be shipped and added to classpath.
+ Set systemShipFiles = new HashSet<>(shipFiles.size());
+ for (File file : shipFiles) {
+ systemShipFiles.add(file.getAbsoluteFile());
+ }
+ // Set-up ApplicationSubmissionContext for the application
+ final ApplicationId appId = appContext.getApplicationId();
+ // ------------------ Add Zookeeper namespace to local flinkConfiguraton ------
+ setHAClusterIdIfNotSet(configuration, appId);
+ if (HighAvailabilityMode.isHighAvailabilityModeActivated(configuration)) {
+ // activate re-execution of failed applications
+ appContext.setMaxAppAttempts(
+ configuration.getInteger(
+ YarnConfigOptions.APPLICATION_ATTEMPTS.key(),
+ YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS));
+ activateHighAvailabilitySupport(appContext);
+ } else {
+ // set number of application retries to 1 in the default case
+ appContext.setMaxAppAttempts(
+ configuration.getInteger(YarnConfigOptions.APPLICATION_ATTEMPTS.key(), 1));
+ }
+ final Set userJarFiles = new HashSet<>();
+ if (jobGraph != null) {
+ userJarFiles.addAll(
+ jobGraph.getUserJars().stream()
+ .map(f -> f.toUri())
+ .map(Path::new)
+ .collect(Collectors.toSet()));
+ }
+ final List jarUrls =
+ ConfigUtils.decodeListFromConfig(configuration, PipelineOptions.JARS, URI::create);
+ if (jarUrls != null
+ && YarnApplicationClusterEntryPoint.class.getName().equals(yarnClusterEntrypoint)) {
+ userJarFiles.addAll(;
+ }
+ // only for per job mode
+ if (jobGraph != null) {
+ for (Map.Entry entry :
+ jobGraph.getUserArtifacts().entrySet()) {
+ // only upload local files
+ if (!Utils.isRemotePath(entry.getValue().filePath)) {
+ Path localPath = new Path(entry.getValue().filePath);
+ Tuple2 remoteFileInfo =
+ fileUploader.uploadLocalFileToRemote(localPath, entry.getKey());
+ jobGraph.setUserArtifactRemotePath(
+ entry.getKey(), remoteFileInfo.f0.toString());
+ }
+ }
+ jobGraph.writeUserArtifactEntriesToConfiguration();
+ }
+ if (providedLibDirs == null || providedLibDirs.isEmpty()) {
+ addLibFoldersToShipFiles(systemShipFiles);
+ }
+ // Register all files in provided lib dirs as local resources with public visibility
+ // and upload the remaining dependencies as local resources with APPLICATION visibility.
+ final List systemClassPaths = fileUploader.registerProvidedLocalResources();
+ String logConfigFilePath =
+ configuration.getString(YarnConfigOptionsInternal.APPLICATION_LOG_CONFIG_FILE);
+ if (logConfigFilePath != null) {
+ logConfigFilePath = logConfigFilePath + File.separator + "";
+ File logFile = new File(logConfigFilePath);
+ systemShipFiles.add(logFile);
+ String relatePath = "." + File.separator + logFile.getName();
+ systemClassPaths.add(relatePath);
+ configuration.setString(
+ YarnConfigOptionsInternal.APPLICATION_LOG_CONFIG_FILE.key(), relatePath);
+ }
+ final List uploadedDependencies =
+ fileUploader.registerMultipleLocalResources(
+ .map(e -> new Path(e.toURI()))
+ .collect(Collectors.toSet()),
+ Path.CUR_DIR,
+ LocalResourceType.FILE);
+ systemClassPaths.addAll(uploadedDependencies);
+ // upload and register ship-only files
+ // Plugin files only need to be shipped and should not be added to classpath.
+ if (providedLibDirs == null || providedLibDirs.isEmpty()) {
+ Set shipOnlyFiles = new HashSet<>();
+ addPluginsFoldersToShipFiles(shipOnlyFiles);
+ fileUploader.registerMultipleLocalResources(
+ .map(e -> new Path(e.toURI()))
+ .collect(Collectors.toSet()),
+ Path.CUR_DIR,
+ LocalResourceType.FILE);
+ }
+ if (!shipArchives.isEmpty()) {
+ fileUploader.registerMultipleLocalResources(
+ -> new Path(e.toURI())).collect(Collectors.toSet()),
+ Path.CUR_DIR,
+ LocalResourceType.ARCHIVE);
+ }
+ // only for application mode
+ // Python jar file only needs to be shipped and should not be added to classpath.
+ if (YarnApplicationClusterEntryPoint.class.getName().equals(yarnClusterEntrypoint)
+ && PackagedProgramUtils.isPython(configuration.get(APPLICATION_MAIN_CLASS))) {
+ fileUploader.registerMultipleLocalResources(
+ Collections.singletonList(
+ new Path(PackagedProgramUtils.getPythonJar().toURI())),
+ ConfigConstants.DEFAULT_FLINK_OPT_DIR,
+ LocalResourceType.FILE);
+ }
+ // Upload and register user jars
+ final List userClassPaths =
+ fileUploader.registerMultipleLocalResources(
+ userJarFiles,
+ userJarInclusion == YarnConfigOptions.UserJarInclusion.DISABLED
+ : Path.CUR_DIR,
+ LocalResourceType.FILE);
+ // add cjlib to classpath
+ if (cjDistUploadPath != null) {
+ userClassPaths.addAll(cjDistUploadPath);
+ }
+ // usrlib in remote will be used first.
+ if (providedUsrLibDir.isPresent()) {
+ final List usrLibClassPaths =
+ fileUploader.registerMultipleLocalResources(
+ Collections.singletonList(providedUsrLibDir.get()),
+ Path.CUR_DIR,
+ LocalResourceType.FILE);
+ userClassPaths.addAll(usrLibClassPaths);
+ } else if (ClusterEntrypointUtils.tryFindUserLibDirectory().isPresent()) {
+ // local usrlib will be automatically shipped if it exists and there is no remote
+ // usrlib.
+ final Set usrLibShipFiles = new HashSet<>();
+ addUsrLibFolderToShipFiles(usrLibShipFiles);
+ final List usrLibClassPaths =
+ fileUploader.registerMultipleLocalResources(
+ .map(e -> new Path(e.toURI()))
+ .collect(Collectors.toSet()),
+ Path.CUR_DIR,
+ LocalResourceType.FILE);
+ userClassPaths.addAll(usrLibClassPaths);
+ }
+ if (userJarInclusion == YarnConfigOptions.UserJarInclusion.ORDER) {
+ systemClassPaths.addAll(userClassPaths);
+ }
+ // normalize classpath by sorting
+ Collections.sort(systemClassPaths);
+ Collections.sort(userClassPaths);
+ // classpath assembler
+ StringBuilder classPathBuilder = new StringBuilder();
+ if (userJarInclusion == YarnConfigOptions.UserJarInclusion.FIRST) {
+ for (String userClassPath : userClassPaths) {
+ classPathBuilder.append(userClassPath).append(File.pathSeparator);
+ }
+ }
+ for (String classPath : systemClassPaths) {
+ classPathBuilder.append(classPath).append(File.pathSeparator);
+ }
+ // Setup jar for ApplicationMaster
+ final YarnLocalResourceDescriptor localResourceDescFlinkJar =
+ fileUploader.uploadFlinkDist(flinkJarPath);
+ classPathBuilder
+ .append(localResourceDescFlinkJar.getResourceKey())
+ .append(File.pathSeparator);
+ // write job graph to tmp file and add it to local resource
+ // TODO: server use user main method to generate job graph
+ if (jobGraph != null) {
+ File tmpJobGraphFile = null;
+ try {
+ tmpJobGraphFile = File.createTempFile(appId.toString(), null);
+ try (FileOutputStream output = new FileOutputStream(tmpJobGraphFile);
+ ObjectOutputStream obOutput = new ObjectOutputStream(output)) {
+ obOutput.writeObject(jobGraph);
+ }
+ final String jobGraphFilename = "job.graph";
+ configuration.setString(JOB_GRAPH_FILE_PATH, jobGraphFilename);
+ fileUploader.registerSingleLocalResource(
+ jobGraphFilename,
+ new Path(tmpJobGraphFile.toURI()),
+ "",
+ LocalResourceType.FILE,
+ true,
+ false);
+ classPathBuilder.append(jobGraphFilename).append(File.pathSeparator);
+ } catch (Exception e) {
+ LOG.warn("Add job graph to local resource fail.");
+ throw e;
+ } finally {
+ if (tmpJobGraphFile != null && !tmpJobGraphFile.delete()) {
+ LOG.warn("Fail to delete temporary file {}.", tmpJobGraphFile.toPath());
+ }
+ }
+ }
+ // Upload the flink configuration
+ // write out configuration file
+ File tmpConfigurationFile = null;
+ try {
+ tmpConfigurationFile = File.createTempFile(appId + "-flink-conf.yaml", null);
+ // remove localhost bind hosts as they render production clusters unusable
+ removeLocalhostBindHostSetting(configuration, JobManagerOptions.BIND_HOST);
+ removeLocalhostBindHostSetting(configuration, TaskManagerOptions.BIND_HOST);
+ // this setting is unconditionally overridden anyway, so we remove it for clarity
+ configuration.removeConfig(TaskManagerOptions.HOST);
+ BootstrapTools.writeConfiguration(configuration, tmpConfigurationFile);
+ String flinkConfigKey = "flink-conf.yaml";
+ fileUploader.registerSingleLocalResource(
+ flinkConfigKey,
+ new Path(tmpConfigurationFile.getAbsolutePath()),
+ "",
+ LocalResourceType.FILE,
+ true,
+ true);
+ classPathBuilder.append("flink-conf.yaml").append(File.pathSeparator);
+ } finally {
+ if (tmpConfigurationFile != null && !tmpConfigurationFile.delete()) {
+ LOG.warn("Fail to delete temporary file {}.", tmpConfigurationFile.toPath());
+ }
+ }
+ if (userJarInclusion == YarnConfigOptions.UserJarInclusion.LAST) {
+ for (String userClassPath : userClassPaths) {
+ classPathBuilder.append(userClassPath).append(File.pathSeparator);
+ }
+ }
+ // To support Yarn Secure Integration Test Scenario
+ // In Integration test setup, the Yarn containers created by YarnMiniCluster does not have
+ // the Yarn site XML
+ // and KRB5 configuration files. We are adding these files as container local resources for
+ // the container
+ // applications (JM/TMs) to have proper secure cluster setup
+ Path remoteYarnSiteXmlPath = null;
+ if (System.getenv("IN_TESTS") != null) {
+ File f = new File(System.getenv("YARN_CONF_DIR"), Utils.YARN_SITE_FILE_NAME);
+ "Adding Yarn configuration {} to the AM container local resource bucket",
+ f.getAbsolutePath());
+ Path yarnSitePath = new Path(f.getAbsolutePath());
+ remoteYarnSiteXmlPath =
+ fileUploader
+ .registerSingleLocalResource(
+ yarnSitePath,
+ "",
+ LocalResourceType.FILE,
+ false,
+ false)
+ .getPath();
+ if (System.getProperty("") != null) {
+ configuration.set(
+ SecurityOptions.KERBEROS_KRB5_PATH,
+ System.getProperty(""));
+ }
+ }
+ Path remoteKrb5Path = null;
+ boolean hasKrb5 = false;
+ String krb5Config = configuration.get(SecurityOptions.KERBEROS_KRB5_PATH);
+ if (!StringUtils.isNullOrWhitespaceOnly(krb5Config)) {
+ final File krb5 = new File(krb5Config);
+ "Adding KRB5 configuration {} to the AM container local resource bucket",
+ krb5.getAbsolutePath());
+ final Path krb5ConfPath = new Path(krb5.getAbsolutePath());
+ remoteKrb5Path =
+ fileUploader
+ .registerSingleLocalResource(
+ krb5ConfPath,
+ "",
+ LocalResourceType.FILE,
+ false,
+ false)
+ .getPath();
+ hasKrb5 = true;
+ }
+ Path remotePathKeytab = null;
+ String localizedKeytabPath = null;
+ String keytab = configuration.getString(SecurityOptions.KERBEROS_LOGIN_KEYTAB);
+ if (keytab != null) {
+ boolean localizeKeytab =
+ flinkConfiguration.getBoolean(YarnConfigOptions.SHIP_LOCAL_KEYTAB);
+ localizedKeytabPath =
+ flinkConfiguration.getString(YarnConfigOptions.LOCALIZED_KEYTAB_PATH);
+ if (localizeKeytab) {
+ // Localize the keytab to YARN containers via local resource.
+"Adding keytab {} to the AM container local resource bucket", keytab);
+ remotePathKeytab =
+ fileUploader
+ .registerSingleLocalResource(
+ localizedKeytabPath,
+ new Path(keytab),
+ "",
+ LocalResourceType.FILE,
+ false,
+ false)
+ .getPath();
+ } else {
+ // // Assume Keytab is pre-installed in the container.
+ localizedKeytabPath =
+ flinkConfiguration.getString(YarnConfigOptions.LOCALIZED_KEYTAB_PATH);
+ }
+ }
+ final JobManagerProcessSpec processSpec =
+ JobManagerProcessUtils.processSpecFromConfigWithNewOptionToInterpretLegacyHeap(
+ flinkConfiguration, JobManagerOptions.TOTAL_PROCESS_MEMORY);
+ final ContainerLaunchContext amContainer =
+ setupApplicationMasterContainer(yarnClusterEntrypoint, hasKrb5, processSpec);
+ // New delegation token framework
+ if (configuration.getBoolean(SecurityOptions.KERBEROS_FETCH_DELEGATION_TOKEN)) {
+ setTokensFor(amContainer);
+ }
+ // Old delegation token framework
+ if (UserGroupInformation.isSecurityEnabled()) {
+"Adding delegation token to the AM container.");
+ final List pathsToObtainToken = new ArrayList<>();
+ boolean fetchToken =
+ configuration.getBoolean(SecurityOptions.KERBEROS_FETCH_DELEGATION_TOKEN);
+ if (fetchToken) {
+ List yarnAccessList =
+ ConfigUtils.decodeListFromConfig(
+ configuration,
+ Path::new);
+ pathsToObtainToken.addAll(yarnAccessList);
+ pathsToObtainToken.addAll(fileUploader.getRemotePaths());
+ }
+ Utils.setTokensFor(amContainer, pathsToObtainToken, yarnConfiguration, fetchToken);
+ }
+ amContainer.setLocalResources(fileUploader.getRegisteredLocalResources());
+ fileUploader.close();
+ // Setup CLASSPATH and environment variables for ApplicationMaster
+ final Map appMasterEnv =
+ generateApplicationMasterEnv(
+ fileUploader,
+ classPathBuilder.toString(),
+ localResourceDescFlinkJar.toString(),
+ appId.toString());
+ if (localizedKeytabPath != null) {
+ appMasterEnv.put(YarnConfigKeys.LOCAL_KEYTAB_PATH, localizedKeytabPath);
+ String principal = configuration.getString(SecurityOptions.KERBEROS_LOGIN_PRINCIPAL);
+ appMasterEnv.put(YarnConfigKeys.KEYTAB_PRINCIPAL, principal);
+ if (remotePathKeytab != null) {
+ appMasterEnv.put(YarnConfigKeys.REMOTE_KEYTAB_PATH, remotePathKeytab.toString());
+ }
+ }
+ // To support Yarn Secure Integration Test Scenario
+ if (remoteYarnSiteXmlPath != null) {
+ appMasterEnv.put(
+ YarnConfigKeys.ENV_YARN_SITE_XML_PATH, remoteYarnSiteXmlPath.toString());
+ }
+ if (remoteKrb5Path != null) {
+ appMasterEnv.put(YarnConfigKeys.ENV_KRB5_PATH, remoteKrb5Path.toString());
+ }
+ amContainer.setEnvironment(appMasterEnv);
+ // Set up resource type requirements for ApplicationMaster
+ Resource capability = Records.newRecord(Resource.class);
+ capability.setMemory(clusterSpecification.getMasterMemoryMB());
+ capability.setVirtualCores(
+ flinkConfiguration.getInteger(YarnConfigOptions.APP_MASTER_VCORES));
+ final String customApplicationName = customName != null ? customName : applicationName;
+ appContext.setApplicationName(customApplicationName);
+ appContext.setApplicationType(applicationType != null ? applicationType : "Apache Flink");
+ appContext.setAMContainerSpec(amContainer);
+ appContext.setResource(capability);
+ // Set priority for application
+ int priorityNum = flinkConfiguration.getInteger(YarnConfigOptions.APPLICATION_PRIORITY);
+ if (priorityNum >= 0) {
+ Priority priority = Priority.newInstance(priorityNum);
+ appContext.setPriority(priority);
+ }
+ if (yarnQueue != null) {
+ appContext.setQueue(yarnQueue);
+ }
+ setApplicationNodeLabel(appContext);
+ setApplicationTags(appContext);
+ // add a hook to clean up in case deployment fails
+ Thread deploymentFailureHook =
+ new DeploymentFailureHook(yarnApplication, fileUploader.getApplicationDir());
+ Runtime.getRuntime().addShutdownHook(deploymentFailureHook);
+"Submitting application master " + appId);
+ yarnClient.submitApplication(appContext);
+"Waiting for the cluster to be allocated");
+ final long startTime = System.currentTimeMillis();
+ ApplicationReport report;
+ YarnApplicationState lastAppState = YarnApplicationState.NEW;
+ loop:
+ while (true) {
+ try {
+ report = yarnClient.getApplicationReport(appId);
+ } catch (IOException e) {
+ throw new YarnDeploymentException("Failed to deploy the cluster.", e);
+ }
+ YarnApplicationState appState = report.getYarnApplicationState();
+ LOG.debug("Application State: {}", appState);
+ switch (appState) {
+ case FAILED:
+ case KILLED:
+ throw new YarnDeploymentException(
+ "The YARN application unexpectedly switched to state "
+ + appState
+ + " during deployment. \n"
+ + "Diagnostics from YARN: "
+ + report.getDiagnostics()
+ + "\n"
+ + "If log aggregation is enabled on your cluster, use this command to further investigate the issue:\n"
+ + "yarn logs -applicationId "
+ + appId);
+ // break ..
+ case RUNNING:
+"YARN application has been deployed successfully.");
+ break loop;
+ case FINISHED:
+"YARN application has been finished successfully.");
+ break loop;
+ default:
+ if (appState != lastAppState) {
+"Deploying cluster, current state " + appState);
+ }
+ if (System.currentTimeMillis() - startTime > 60000) {
+ "Deployment took more than 60 seconds. Please check if the requested resources are available in the YARN cluster");
+ }
+ }
+ lastAppState = appState;
+ Thread.sleep(250);
+ }
+ // since deployment was successful, remove the hook
+ ShutdownHookUtil.removeShutdownHook(deploymentFailureHook, getClass().getSimpleName(), LOG);
+ return report;
+ }
+ /**
+ * chunjun server 包含在chunjun 目录下,不需要上传
+ *
+ * @param path
+ * @return
+ */
+ public List getCJUploadPath(Path path) {
+ File cjDir = new File(path.toUri().getPath());
+ if (cjDir.exists() && cjDir.isDirectory()) {
+ List paths = new ArrayList<>();
+ .filter(
+ file ->
+ !(file.toURI().getPath().contains("/server/")
+ || file.toURI().getPath().contains("/logs/")
+ || file.toURI().getPath().contains("/conf/")
+ || file.toURI().getPath().contains("/run/")
+ || file.toURI().getPath().contains("/bin/")
+ || file.toURI().getPath().contains("nohup")))
+ .forEach(ele -> paths.add(new Path(ele.getPath())));
+ return paths;
+ } else {
+ throw new RuntimeException(
+ "chunjun server not found or not a dir path " + path.toUri().getPath());
+ }
+ }
+ private void removeLocalhostBindHostSetting(
+ Configuration configuration, ConfigOption> option) {
+ configuration
+ .getOptional(option)
+ .filter(bindHost -> bindHost.equals("localhost"))
+ .ifPresent(
+ bindHost -> {
+ "Removing 'localhost' {} setting from effective configuration; using '' instead.",
+ option);
+ configuration.removeConfig(option);
+ });
+ }
+ private void setTokensFor(ContainerLaunchContext containerLaunchContext) throws Exception {
+"Adding delegation tokens to the AM container.");
+ Credentials credentials = new Credentials();
+ DelegationTokenManager delegationTokenManager =
+ new KerberosDelegationTokenManager(flinkConfiguration, null, null);
+ delegationTokenManager.obtainDelegationTokens(credentials);
+ ByteBuffer tokens = ByteBuffer.wrap(DelegationTokenConverter.serialize(credentials));
+ containerLaunchContext.setTokens(tokens);
+"Delegation tokens added to the AM container.");
+ }
+ /**
+ * Returns the configured remote target home directory if set, otherwise returns the default
+ * home directory.
+ *
+ * @param defaultFileSystem default file system used
+ * @return the remote target home directory
+ */
+ @VisibleForTesting
+ Path getStagingDir(FileSystem defaultFileSystem) throws IOException {
+ final String configuredStagingDir =
+ flinkConfiguration.getString(YarnConfigOptions.STAGING_DIRECTORY);
+ if (configuredStagingDir == null) {
+ return defaultFileSystem.getHomeDirectory();
+ }
+ FileSystem stagingDirFs =
+ new Path(configuredStagingDir).getFileSystem(defaultFileSystem.getConf());
+ return stagingDirFs.makeQualified(new Path(configuredStagingDir));
+ }
+ private int getFileReplication() {
+ final int yarnFileReplication =
+ yarnConfiguration.getInt(
+ final int fileReplication =
+ flinkConfiguration.getInteger(YarnConfigOptions.FILE_REPLICATION);
+ return fileReplication > 0 ? fileReplication : yarnFileReplication;
+ }
+ private static String encodeYarnLocalResourceDescriptorListToString(
+ List resources) {
+ return String.join(
+ .map(YarnLocalResourceDescriptor::toString)
+ .collect(Collectors.toList()));
+ }
+ /**
+ * Kills YARN application and stops YARN client.
+ *
+ * Use this method to kill the App before it has been properly deployed
+ */
+ private void failSessionDuringDeployment(
+ YarnClient yarnClient, YarnClientApplication yarnApplication) {
+"Killing YARN application");
+ try {
+ yarnClient.killApplication(
+ yarnApplication.getNewApplicationResponse().getApplicationId());
+ } catch (Exception e) {
+ // we only log a debug message here because the "killApplication" call is a best-effort
+ // call (we don't know if the application has been deployed when the error occurred).
+ LOG.debug("Error while killing YARN application", e);
+ }
+ }
+ private static class ClusterResourceDescription {
+ public final int totalFreeMemory;
+ public final int containerLimit;
+ public final int[] nodeManagersFree;
+ public ClusterResourceDescription(
+ int totalFreeMemory, int containerLimit, int[] nodeManagersFree) {
+ this.totalFreeMemory = totalFreeMemory;
+ this.containerLimit = containerLimit;
+ this.nodeManagersFree = nodeManagersFree;
+ }
+ }
+ private ClusterResourceDescription getCurrentFreeClusterResources(YarnClient yarnClient)
+ throws YarnException, IOException {
+ List nodes = yarnClient.getNodeReports(NodeState.RUNNING);
+ int totalFreeMemory = 0;
+ int containerLimit = 0;
+ int[] nodeManagersFree = new int[nodes.size()];
+ for (int i = 0; i < nodes.size(); i++) {
+ NodeReport rep = nodes.get(i);
+ int free =
+ rep.getCapability().getMemory()
+ - (rep.getUsed() != null ? rep.getUsed().getMemory() : 0);
+ nodeManagersFree[i] = free;
+ totalFreeMemory += free;
+ if (free > containerLimit) {
+ containerLimit = free;
+ }
+ }
+ return new ClusterResourceDescription(totalFreeMemory, containerLimit, nodeManagersFree);
+ }
+ @Override
+ public String getClusterDescription() {
+ try {
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ PrintStream ps = new PrintStream(baos);
+ YarnClusterMetrics metrics = yarnClient.getYarnClusterMetrics();
+ ps.append("NodeManagers in the ClusterClient " + metrics.getNumNodeManagers());
+ List nodes = yarnClient.getNodeReports(NodeState.RUNNING);
+ final String format = "|%-16s |%-16s %n";
+ ps.printf("|Property |Value %n");
+ ps.println("+---------------------------------------+");
+ int totalMemory = 0;
+ int totalCores = 0;
+ for (NodeReport rep : nodes) {
+ final Resource res = rep.getCapability();
+ totalMemory += res.getMemory();
+ totalCores += res.getVirtualCores();
+ ps.format(format, "NodeID", rep.getNodeId());
+ ps.format(format, "Memory", res.getMemory() + " MB");
+ ps.format(format, "vCores", res.getVirtualCores());
+ ps.format(format, "HealthReport", rep.getHealthReport());
+ ps.format(format, "Containers", rep.getNumContainers());
+ ps.println("+---------------------------------------+");
+ }
+ ps.println("Summary: totalMemory " + totalMemory + " totalCores " + totalCores);
+ List qInfo = yarnClient.getAllQueues();
+ for (QueueInfo q : qInfo) {
+ ps.println(
+ "Queue: "
+ + q.getQueueName()
+ + ", Current Capacity: "
+ + q.getCurrentCapacity()
+ + " Max Capacity: "
+ + q.getMaximumCapacity()
+ + " Applications: "
+ + q.getApplications().size());
+ }
+ return baos.toString();
+ } catch (Exception e) {
+ throw new RuntimeException("Couldn't get cluster description", e);
+ }
+ }
+ private void activateHighAvailabilitySupport(ApplicationSubmissionContext appContext)
+ throws InvocationTargetException, IllegalAccessException {
+ ApplicationSubmissionContextReflector reflector =
+ ApplicationSubmissionContextReflector.getInstance();
+ reflector.setKeepContainersAcrossApplicationAttempts(appContext, true);
+ reflector.setAttemptFailuresValidityInterval(
+ appContext,
+ flinkConfiguration.getLong(
+ }
+ private void setApplicationTags(final ApplicationSubmissionContext appContext)
+ throws InvocationTargetException, IllegalAccessException {
+ final ApplicationSubmissionContextReflector reflector =
+ ApplicationSubmissionContextReflector.getInstance();
+ final String tagsString = flinkConfiguration.getString(YarnConfigOptions.APPLICATION_TAGS);
+ final Set applicationTags = new HashSet<>();
+ // Trim whitespace and cull empty tags
+ for (final String tag : tagsString.split(",")) {
+ final String trimmedTag = tag.trim();
+ if (!trimmedTag.isEmpty()) {
+ applicationTags.add(trimmedTag);
+ }
+ }
+ reflector.setApplicationTags(appContext, applicationTags);
+ }
+ private void setApplicationNodeLabel(final ApplicationSubmissionContext appContext)
+ throws InvocationTargetException, IllegalAccessException {
+ if (nodeLabel != null) {
+ final ApplicationSubmissionContextReflector reflector =
+ ApplicationSubmissionContextReflector.getInstance();
+ reflector.setApplicationNodeLabel(appContext, nodeLabel);
+ }
+ }
+ /**
+ * Singleton object which uses reflection to determine whether the {@link
+ * ApplicationSubmissionContext} supports various methods which, depending on the Hadoop
+ * version, may or may not be supported.
+ *
+ * If an unsupported method is invoked, nothing happens.
+ *
+ *
Currently three methods are proxied: - setApplicationTags (>= 2.4.0) -
+ * setAttemptFailuresValidityInterval (>= 2.6.0) - setKeepContainersAcrossApplicationAttempts
+ * (>= 2.4.0) - setNodeLabelExpression (>= 2.6.0)
+ */
+ private static class ApplicationSubmissionContextReflector {
+ private static final Logger LOG =
+ LoggerFactory.getLogger(ApplicationSubmissionContextReflector.class);
+ private static final ApplicationSubmissionContextReflector instance =
+ new ApplicationSubmissionContextReflector(ApplicationSubmissionContext.class);
+ public static ApplicationSubmissionContextReflector getInstance() {
+ return instance;
+ }
+ private static final String APPLICATION_TAGS_METHOD_NAME = "setApplicationTags";
+ private static final String ATTEMPT_FAILURES_METHOD_NAME =
+ "setAttemptFailuresValidityInterval";
+ private static final String KEEP_CONTAINERS_METHOD_NAME =
+ "setKeepContainersAcrossApplicationAttempts";
+ private static final String NODE_LABEL_EXPRESSION_NAME = "setNodeLabelExpression";
+ private final Method applicationTagsMethod;
+ private final Method attemptFailuresValidityIntervalMethod;
+ private final Method keepContainersMethod;
+ @Nullable private final Method nodeLabelExpressionMethod;
+ private ApplicationSubmissionContextReflector(Class clazz) {
+ Method applicationTagsMethod;
+ Method attemptFailuresValidityIntervalMethod;
+ Method keepContainersMethod;
+ Method nodeLabelExpressionMethod;
+ try {
+ // this method is only supported by Hadoop 2.4.0 onwards
+ applicationTagsMethod = clazz.getMethod(APPLICATION_TAGS_METHOD_NAME, Set.class);
+ LOG.debug(
+ "{} supports method {}.",
+ clazz.getCanonicalName(),
+ } catch (NoSuchMethodException e) {
+ LOG.debug(
+ "{} does not support method {}.",
+ clazz.getCanonicalName(),
+ // assign null because the Hadoop version apparently does not support this call.
+ applicationTagsMethod = null;
+ }
+ this.applicationTagsMethod = applicationTagsMethod;
+ try {
+ // this method is only supported by Hadoop 2.6.0 onwards
+ attemptFailuresValidityIntervalMethod =
+ clazz.getMethod(ATTEMPT_FAILURES_METHOD_NAME, long.class);
+ LOG.debug(
+ "{} supports method {}.",
+ clazz.getCanonicalName(),
+ } catch (NoSuchMethodException e) {
+ LOG.debug(
+ "{} does not support method {}.",
+ clazz.getCanonicalName(),
+ // assign null because the Hadoop version apparently does not support this call.
+ attemptFailuresValidityIntervalMethod = null;
+ }
+ this.attemptFailuresValidityIntervalMethod = attemptFailuresValidityIntervalMethod;
+ try {
+ // this method is only supported by Hadoop 2.4.0 onwards
+ keepContainersMethod = clazz.getMethod(KEEP_CONTAINERS_METHOD_NAME, boolean.class);
+ LOG.debug(
+ "{} supports method {}.",
+ clazz.getCanonicalName(),
+ } catch (NoSuchMethodException e) {
+ LOG.debug(
+ "{} does not support method {}.",
+ clazz.getCanonicalName(),
+ // assign null because the Hadoop version apparently does not support this call.
+ keepContainersMethod = null;
+ }
+ this.keepContainersMethod = keepContainersMethod;
+ try {
+ nodeLabelExpressionMethod =
+ clazz.getMethod(NODE_LABEL_EXPRESSION_NAME, String.class);
+ LOG.debug(
+ "{} supports method {}.",
+ clazz.getCanonicalName(),
+ } catch (NoSuchMethodException e) {
+ LOG.debug(
+ "{} does not support method {}.",
+ clazz.getCanonicalName(),
+ nodeLabelExpressionMethod = null;
+ }
+ this.nodeLabelExpressionMethod = nodeLabelExpressionMethod;
+ }
+ public void setApplicationTags(
+ ApplicationSubmissionContext appContext, Set applicationTags)
+ throws InvocationTargetException, IllegalAccessException {
+ if (applicationTagsMethod != null) {
+ LOG.debug(
+ "Calling method {} of {}.",
+ applicationTagsMethod.getName(),
+ appContext.getClass().getCanonicalName());
+ applicationTagsMethod.invoke(appContext, applicationTags);
+ } else {
+ LOG.debug(
+ "{} does not support method {}. Doing nothing.",
+ appContext.getClass().getCanonicalName(),
+ }
+ }
+ public void setApplicationNodeLabel(
+ ApplicationSubmissionContext appContext, String nodeLabel)
+ throws InvocationTargetException, IllegalAccessException {
+ if (nodeLabelExpressionMethod != null) {
+ LOG.debug(
+ "Calling method {} of {}.",
+ nodeLabelExpressionMethod.getName(),
+ appContext.getClass().getCanonicalName());
+ nodeLabelExpressionMethod.invoke(appContext, nodeLabel);
+ } else {
+ LOG.debug(
+ "{} does not support method {}. Doing nothing.",
+ appContext.getClass().getCanonicalName(),
+ }
+ }
+ public void setAttemptFailuresValidityInterval(
+ ApplicationSubmissionContext appContext, long validityInterval)
+ throws InvocationTargetException, IllegalAccessException {
+ if (attemptFailuresValidityIntervalMethod != null) {
+ LOG.debug(
+ "Calling method {} of {}.",
+ attemptFailuresValidityIntervalMethod.getName(),
+ appContext.getClass().getCanonicalName());
+ attemptFailuresValidityIntervalMethod.invoke(appContext, validityInterval);
+ } else {
+ LOG.debug(
+ "{} does not support method {}. Doing nothing.",
+ appContext.getClass().getCanonicalName(),
+ }
+ }
+ public void setKeepContainersAcrossApplicationAttempts(
+ ApplicationSubmissionContext appContext, boolean keepContainers)
+ throws InvocationTargetException, IllegalAccessException {
+ if (keepContainersMethod != null) {
+ LOG.debug(
+ "Calling method {} of {}.",
+ keepContainersMethod.getName(),
+ appContext.getClass().getCanonicalName());
+ keepContainersMethod.invoke(appContext, keepContainers);
+ } else {
+ LOG.debug(
+ "{} does not support method {}. Doing nothing.",
+ appContext.getClass().getCanonicalName(),
+ }
+ }
+ }
+ private static class YarnDeploymentException extends RuntimeException {
+ private static final long serialVersionUID = -812040641215388943L;
+ public YarnDeploymentException(String message) {
+ super(message);
+ }
+ public YarnDeploymentException(String message, Throwable cause) {
+ super(message, cause);
+ }
+ }
+ private class DeploymentFailureHook extends Thread {
+ private final YarnClient yarnClient;
+ private final YarnClientApplication yarnApplication;
+ private final Path yarnFilesDir;
+ DeploymentFailureHook(YarnClientApplication yarnApplication, Path yarnFilesDir) {
+ this.yarnApplication = Preconditions.checkNotNull(yarnApplication);
+ this.yarnFilesDir = Preconditions.checkNotNull(yarnFilesDir);
+ // A new yarn client need to be created in shutdown hook in order to avoid
+ // the yarn client has been closed by YarnClusterDescriptor.
+ this.yarnClient = YarnClient.createYarnClient();
+ this.yarnClient.init(yarnConfiguration);
+ }
+ @Override
+ public void run() {
+"Cancelling deployment from Deployment Failure Hook");
+ yarnClient.start();
+ failSessionDuringDeployment(yarnClient, yarnApplication);
+ yarnClient.stop();
+"Deleting files in {}.", yarnFilesDir);
+ try {
+ FileSystem fs = FileSystem.get(yarnConfiguration);
+ if (!fs.delete(yarnFilesDir, true)) {
+ throw new IOException(
+ "Deleting files in " + yarnFilesDir + " was unsuccessful");
+ }
+ fs.close();
+ } catch (IOException e) {
+ LOG.error("Failed to delete Flink Jar and configuration files in HDFS", e);
+ }
+ }
+ }
+ @VisibleForTesting
+ void addLibFoldersToShipFiles(Collection effectiveShipFiles) {
+ // Add lib folder to the ship files if the environment variable is set.
+ // This is for convenience when running from the command-line.
+ // (for other files users explicitly set the ship files)
+ String libDir = System.getenv().get(ENV_FLINK_LIB_DIR);
+ if (libDir != null) {
+ File directoryFile = new File(libDir);
+ if (directoryFile.isDirectory()) {
+ effectiveShipFiles.add(directoryFile);
+ } else {
+ throw new YarnDeploymentException(
+ "The environment variable '"
+ + "' is set to '"
+ + libDir
+ + "' but the directory doesn't exist.");
+ }
+ } else if (shipFiles.isEmpty()) {
+ LOG.warn(
+ "Environment variable '{}' not set and ship files have not been provided manually. "
+ + "Not shipping any library files.",
+ }
+ }
+ @VisibleForTesting
+ void addUsrLibFolderToShipFiles(Collection effectiveShipFiles) {
+ // Add usrlib folder to the ship files if it exists
+ // Classes in the folder will be loaded by UserClassLoader if CLASSPATH_INCLUDE_USER_JAR is
+ ClusterEntrypointUtils.tryFindUserLibDirectory()
+ .ifPresent(
+ usrLibDirFile -> {
+ effectiveShipFiles.add(usrLibDirFile);
+ "usrlib: {} will be shipped automatically.",
+ usrLibDirFile.getAbsolutePath());
+ });
+ }
+ @VisibleForTesting
+ void addPluginsFoldersToShipFiles(Collection effectiveShipFiles) {
+ final Optional pluginsDir = PluginConfig.getPluginsDir();
+ pluginsDir.ifPresent(effectiveShipFiles::add);
+ }
+ ContainerLaunchContext setupApplicationMasterContainer(
+ String yarnClusterEntrypoint, boolean hasKrb5, JobManagerProcessSpec processSpec) {
+ // ------------------ Prepare Application Master Container ------------------------------
+ // respect custom JVM options in the YAML file
+ String javaOpts = flinkConfiguration.getString(CoreOptions.FLINK_JVM_OPTIONS);
+ if (flinkConfiguration.getString(CoreOptions.FLINK_JM_JVM_OPTIONS).length() > 0) {
+ javaOpts += " " + flinkConfiguration.getString(CoreOptions.FLINK_JM_JVM_OPTIONS);
+ }
+ // krb5.conf file will be available as local resource in JM/TM container
+ if (hasKrb5) {
+ javaOpts += "";
+ }
+ // Set up the container launch context for the application master
+ ContainerLaunchContext amContainer = Records.newRecord(ContainerLaunchContext.class);
+ final Map startCommandValues = new HashMap<>();
+ startCommandValues.put("java", "$JAVA_HOME/bin/java");
+ String jvmHeapMem =
+ JobManagerProcessUtils.generateJvmParametersStr(processSpec, flinkConfiguration);
+ startCommandValues.put("jvmmem", jvmHeapMem);
+ startCommandValues.put("jvmopts", javaOpts);
+ startCommandValues.put(
+ "logging", YarnLogConfigUtil.getLoggingYarnCommand(flinkConfiguration));
+ startCommandValues.put("class", yarnClusterEntrypoint);
+ startCommandValues.put(
+ "redirects",
+ "1> "
+ + ApplicationConstants.LOG_DIR_EXPANSION_VAR
+ + "/jobmanager.out "
+ + "2> "
+ + ApplicationConstants.LOG_DIR_EXPANSION_VAR
+ + "/jobmanager.err");
+ String dynamicParameterListStr =
+ JobManagerProcessUtils.generateDynamicConfigsStr(processSpec);
+ startCommandValues.put("args", dynamicParameterListStr);
+ final String commandTemplate =
+ flinkConfiguration.getString(
+ final String amCommand =
+ BootstrapTools.getStartCommand(commandTemplate, startCommandValues);
+ amContainer.setCommands(Collections.singletonList(amCommand));
+ LOG.debug("Application Master start command: " + amCommand);
+ return amContainer;
+ }
+ private static YarnConfigOptions.UserJarInclusion getUserJarInclusionMode(
+ org.apache.flink.configuration.Configuration config) {
+ return config.get(YarnConfigOptions.CLASSPATH_INCLUDE_USER_JAR);
+ }
+ private static boolean isUsrLibDirIncludedInShipFiles(List shipFiles) {
+ return
+ .filter(File::isDirectory)
+ .map(File::getName)
+ .anyMatch(name -> name.equals(DEFAULT_FLINK_USR_LIB_DIR));
+ }
+ private void setClusterEntrypointInfoToConfig(final ApplicationReport report) {
+ checkNotNull(report);
+ final ApplicationId appId = report.getApplicationId();
+ final String host = report.getHost();
+ final int port = report.getRpcPort();
+"Found Web Interface {}:{} of application '{}'.", host, port, appId);
+ flinkConfiguration.setString(JobManagerOptions.ADDRESS, host);
+ flinkConfiguration.setInteger(JobManagerOptions.PORT, port);
+ flinkConfiguration.setString(RestOptions.ADDRESS, host);
+ flinkConfiguration.setInteger(RestOptions.PORT, port);
+ flinkConfiguration.set(YarnConfigOptions.APPLICATION_ID, ConverterUtils.toString(appId));
+ setHAClusterIdIfNotSet(flinkConfiguration, appId);
+ }
+ private void setHAClusterIdIfNotSet(Configuration configuration, ApplicationId appId) {
+ // set cluster-id to app id if not specified
+ if (!configuration.contains(HighAvailabilityOptions.HA_CLUSTER_ID)) {
+ configuration.set(
+ HighAvailabilityOptions.HA_CLUSTER_ID, ConverterUtils.toString(appId));
+ }
+ }
+ public static void logDetachedClusterInformation(
+ ApplicationId yarnApplicationId, Logger logger) {
+ "The Flink YARN session cluster has been started in detached mode. In order to "
+ + "stop Flink gracefully, use the following command:\n"
+ + "$ echo \"stop\" | ./bin/ -id {}\n"
+ + "If this should not be possible, then you can also kill Flink via YARN's web interface or via:\n"
+ + "$ yarn application -kill {}\n"
+ + "Note that killing Flink might not clean up all job artifacts and temporary files.",
+ yarnApplicationId,
+ yarnApplicationId);
+ }
+ @VisibleForTesting
+ Map generateApplicationMasterEnv(
+ final YarnApplicationFileUploader fileUploader,
+ final String classPathStr,
+ final String localFlinkJarStr,
+ final String appIdStr)
+ throws IOException {
+ final Map env = new HashMap<>();
+ // set user specified app master environment variables
+ env.putAll(
+ ConfigurationUtils.getPrefixedKeyValuePairs(
+ this.flinkConfiguration));
+ // set Flink app class path
+ env.put(ENV_FLINK_CLASSPATH, classPathStr);
+ // Set FLINK_LIB_DIR to `lib` folder under working dir in container
+ env.put(ENV_FLINK_LIB_DIR, Path.CUR_DIR + "/" + ConfigConstants.DEFAULT_FLINK_LIB_DIR);
+ // Set FLINK_OPT_DIR to `opt` folder under working dir in container
+ env.put(ENV_FLINK_OPT_DIR, Path.CUR_DIR + "/" + ConfigConstants.DEFAULT_FLINK_OPT_DIR);
+ // set Flink on YARN internal configuration values
+ env.put(YarnConfigKeys.FLINK_DIST_JAR, localFlinkJarStr);
+ env.put(YarnConfigKeys.ENV_APP_ID, appIdStr);
+ env.put(YarnConfigKeys.ENV_CLIENT_HOME_DIR, fileUploader.getHomeDir().toString());
+ env.put(
+ encodeYarnLocalResourceDescriptorListToString(
+ fileUploader.getEnvShipResourceList()));
+ env.put(
+ YarnConfigKeys.FLINK_YARN_FILES,
+ fileUploader.getApplicationDir().toUri().toString());
+ //
+ env.put(
+ UserGroupInformation.getCurrentUser().getUserName());
+ // set classpath from YARN configuration
+ Utils.setupYarnClassPath(this.yarnConfiguration, env);
+ return env;
+ }
diff --git a/chunjun-server/src/main/resources/ b/chunjun-server/src/main/resources/
new file mode 100644
index 0000000000..2dc1b8ca50
--- /dev/null
+++ b/chunjun-server/src/main/resources/
@@ -0,0 +1,42 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# See the License for the specific language governing permissions and
+# limitations under the License.
+rootLogger.level = INFO
+rootLogger.appenderRef.consoleStdout.ref = consoleStdoutAppender
+rootLogger.appenderRef.consoleStderr.ref = consoleStderrAppender
+ = consoleStdoutAppender
+appender.consoleStdout.type = CONSOLE = SYSTEM_OUT
+appender.consoleStdout.layout.type = PatternLayout
+appender.consoleStdout.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %c - %m%n
+appender.consoleStdout.filter.acceptLtWarn.type = ThresholdFilter
+appender.consoleStdout.filter.acceptLtWarn.level = WARN
+appender.consoleStdout.filter.acceptLtWarn.onMatch = DENY
+appender.consoleStdout.filter.acceptLtWarn.onMismatch = ACCEPT
+ = consoleStderrAppender
+appender.consoleStderr.type = CONSOLE = SYSTEM_ERR
+appender.consoleStderr.layout.type = PatternLayout
+appender.consoleStderr.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %c - %m%n
+appender.consoleStderr.filter.acceptGteWarn.type = ThresholdFilter
+appender.consoleStderr.filter.acceptGteWarn.level = WARN
+appender.consoleStderr.filter.acceptGteWarn.onMatch = ACCEPT
+appender.consoleStderr.filter.acceptGteWarn.onMismatch = DENY
diff --git a/conf/ b/conf/
new file mode 100644
index 0000000000..dee097dca7
--- /dev/null
+++ b/conf/
@@ -0,0 +1,28 @@
+#server port
+#server.port = 18081
+#open session check
+#flink session check,default for true
+#chunjun.session.check = true
+#chunjun.session.deploy = true
+#chunjun.restful.enable = true
+#yarn config
+yarn.queue = default
+#log config
+log.level = INFO
+flink.home.path = /Users/xuchao/MyPrograms/Flink/flink-1.16
+#flink.conf.path,default get from flink.home.path
+#flink.conf.path = /Users/xuchao/MyPrograms/Flink/flink-1.16/conf/
+#flink.lib.path,default get from flink.home.path
+#flink.lib.path = /Users/xuchao/MyPrograms/Flink/flink-1.16/lib/
+chunjun.home.path = /Users/xuchao/IdeaProjects/chunjun
+#chunjun.conf.path,default get from chunjun.home.path
+#chunjun.lib.path,default get from chunjun.home.path
+hadoop.conf.path = /Users/xuchao/conf/hadoopconf/dev_hadoop_new_flinkx01
diff --git a/conf/logconf/debug/ b/conf/logconf/debug/
new file mode 100644
index 0000000000..7124272246
--- /dev/null
+++ b/conf/logconf/debug/
@@ -0,0 +1,51 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# This affects logging for both user code and Flink
+log4j.rootLogger=DEBUG, rollingFile
+# Uncomment this if you want to _only_ change Flink's logging
+# The following lines keep the log level of common libraries/connectors on
+# log level INFO. The root logger does not override this. You have to manually
+# change the log levels here.
+# Log all infos in the given file
+log4j.appender.file.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
+log4j.appender.rollingFile.layout.ConversionPattern=%d{yyyy-MM-dd HH\:mm\:ss,SSS} %-5p %-60c %x - %m%n
+# Suppress the irrelevant (wrong) warnings from the Netty channel handler, file
diff --git a/conf/logconf/debug/logback.xml b/conf/logconf/debug/logback.xml
new file mode 100644
index 0000000000..931a1fc6d5
--- /dev/null
+++ b/conf/logconf/debug/logback.xml
@@ -0,0 +1,76 @@
+ ${log.file}
+ false
+ %d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{60} %X{sourceThread} - %msg%n
+ ${log.file}
+ false
+ %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
+ 3
+ ${log.file}.%i
+ 30MB
diff --git a/conf/logconf/error/ b/conf/logconf/error/
new file mode 100644
index 0000000000..209806df69
--- /dev/null
+++ b/conf/logconf/error/
@@ -0,0 +1,51 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# This affects logging for both user code and Flink
+log4j.rootLogger=ERROR, rollingFile
+# Uncomment this if you want to _only_ change Flink's logging
+# The following lines keep the log level of common libraries/connectors on
+# log level INFO. The root logger does not override this. You have to manually
+# change the log levels here.
+# Log all infos in the given file
+log4j.appender.file.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
+log4j.appender.rollingFile.layout.ConversionPattern=%d{yyyy-MM-dd HH\:mm\:ss,SSS} %-5p %-60c %x - %m%n
+# Suppress the irrelevant (wrong) warnings from the Netty channel handler, file
diff --git a/conf/logconf/error/logback.xml b/conf/logconf/error/logback.xml
new file mode 100644
index 0000000000..d4de1b4773
--- /dev/null
+++ b/conf/logconf/error/logback.xml
@@ -0,0 +1,76 @@
+ ${log.file}
+ false
+ %d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{60} %X{sourceThread} - %msg%n
+ ${log.file}
+ false
+ %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
+ 3
+ ${log.file}.%i
+ 30MB
diff --git a/conf/logconf/fatal/ b/conf/logconf/fatal/
new file mode 100644
index 0000000000..b5500fe4ef
--- /dev/null
+++ b/conf/logconf/fatal/
@@ -0,0 +1,52 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# This affects logging for both user code and Flink
+log4j.rootLogger=FATAL, rollingFile
+# Uncomment this if you want to _only_ change Flink's logging
+# The following lines keep the log level of common libraries/connectors on
+# log level INFO. The root logger does not override this. You have to manually
+# change the log levels here.
+# Log all infos in the given file
+log4j.appender.file.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
+log4j.appender.rollingFile.layout.ConversionPattern=%d{yyyy-MM-dd HH\:mm\:ss,SSS} %-5p %-60c %x - %m%n
+# Suppress the irrelevant (wrong) warnings from the Netty channel handler, file
diff --git a/conf/logconf/fatal/logback.xml b/conf/logconf/fatal/logback.xml
new file mode 100644
index 0000000000..67f908b5d1
--- /dev/null
+++ b/conf/logconf/fatal/logback.xml
@@ -0,0 +1,76 @@
+ ${log.file}
+ false
+ %d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{60} %X{sourceThread} - %msg%n
+ ${log.file}
+ false
+ %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
+ 3
+ ${log.file}.%i
+ 30MB
diff --git a/conf/logconf/info/ b/conf/logconf/info/
new file mode 100644
index 0000000000..24fe206bbb
--- /dev/null
+++ b/conf/logconf/info/
@@ -0,0 +1,43 @@
+# Allows this configuration to be modified at runtime. The file will be checked every 30 seconds.
+# This affects logging for both user code and Flink
+rootLogger.level = INFO
+rootLogger.appenderRef.file.ref = MainAppender
+# Uncomment this if you want to _only_ change Flink's logging = org.apache.flink
+#logger.flink.level = INFO
+# The following lines keep the log level of common libraries/connectors on
+# log level INFO. The root logger does not override this. You have to manually
+# change the log levels here. = akka
+logger.akka.level = INFO org.apache.kafka
+logger.kafka.level = INFO = org.apache.hadoop
+logger.hadoop.level = INFO = org.apache.zookeeper
+logger.zookeeper.level = INFO = org.apache.flink.shaded.zookeeper3
+logger.shaded_zookeeper.level = INFO
+# Log all infos in the given file = MainAppender
+appender.main.type = RollingFile
+appender.main.append = true
+appender.main.fileName = ${sys:log.file}
+appender.main.filePattern = ${sys:log.file}.%i
+appender.main.layout.type = PatternLayout
+appender.main.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
+appender.main.policies.type = Policies
+appender.main.policies.size.type = SizeBasedTriggeringPolicy
+appender.main.policies.size.size = 100MB
+appender.main.policies.startup.type = OnStartupTriggeringPolicy
+appender.main.strategy.type = DefaultRolloverStrategy
+appender.main.strategy.max = ${env:MAX_LOG_FILE_NUMBER:-10}
+# Suppress the irrelevant (wrong) warnings from the Netty channel handler =
+logger.netty.level = OFF
diff --git a/conf/logconf/info/logback.xml b/conf/logconf/info/logback.xml
new file mode 100644
index 0000000000..5e6214d2c7
--- /dev/null
+++ b/conf/logconf/info/logback.xml
@@ -0,0 +1,76 @@
+ ${log.file}
+ false
+ %d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{60} %X{sourceThread} - %msg%n
+ ${log.file}
+ false
+ %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
+ 3
+ ${log.file}.%i
+ 30MB
diff --git a/conf/logconf/ b/conf/logconf/
new file mode 100644
index 0000000000..a8e5bfb457
--- /dev/null
+++ b/conf/logconf/
@@ -0,0 +1,54 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# This affects logging for both user code and Flink
+log4j.rootLogger=INFO, rollingFile
+# Uncomment this if you want to _only_ change Flink's logging
+# The following lines keep the log level of common libraries/connectors on
+# log level INFO. The root logger does not override this. You have to manually
+# change the log levels here.
+# Log all infos in the given file
+log4j.appender.file.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
+log4j.appender.rollingFile.layout.ConversionPattern=%d{yyyy-MM-dd HH\:mm\:ss,SSS} %-5p %-60c %x - %m%n
+# Suppress the irrelevant (wrong) warnings from the Netty channel handler, file
diff --git a/conf/logconf/warn/ b/conf/logconf/warn/
new file mode 100644
index 0000000000..082a3163eb
--- /dev/null
+++ b/conf/logconf/warn/
@@ -0,0 +1,51 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# This affects logging for both user code and Flink
+log4j.rootLogger=WARN, rollingFile
+# Uncomment this if you want to _only_ change Flink's logging
+# The following lines keep the log level of common libraries/connectors on
+# log level INFO. The root logger does not override this. You have to manually
+# change the log levels here.
+# Log all infos in the given file
+log4j.appender.file.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
+log4j.appender.rollingFile.layout.ConversionPattern=%d{yyyy-MM-dd HH\:mm\:ss,SSS} %-5p %-60c %x - %m%n
+# Suppress the irrelevant (wrong) warnings from the Netty channel handler, file
diff --git a/conf/logconf/warn/logback.xml b/conf/logconf/warn/logback.xml
new file mode 100644
index 0000000000..b337c44eef
--- /dev/null
+++ b/conf/logconf/warn/logback.xml
@@ -0,0 +1,76 @@
+ ${log.file}
+ false
+ %d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{60} %X{sourceThread} - %msg%n
+ ${log.file}
+ false
+ %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
+ 3
+ ${log.file}.%i
+ 30MB
diff --git a/pom.xml b/pom.xml
index 5ce3ae4940..dc31571dc4 100755
--- a/pom.xml
+++ b/pom.xml
@@ -42,6 +42,7 @@
+ chunjun-server