diff --git a/streampark-e2e/streampark-e2e-case/src/test/java/org/apache/streampark/e2e/cases/ApplicationsFlink116OnYarnWithFlinkSQLTest.java b/streampark-e2e/streampark-e2e-case/src/test/java/org/apache/streampark/e2e/cases/ApplicationsFlink116OnYarnWithFlinkSQLTest.java
index f768bde3de..ad75ff64c3 100644
--- a/streampark-e2e/streampark-e2e-case/src/test/java/org/apache/streampark/e2e/cases/ApplicationsFlink116OnYarnWithFlinkSQLTest.java
+++ b/streampark-e2e/streampark-e2e-case/src/test/java/org/apache/streampark/e2e/cases/ApplicationsFlink116OnYarnWithFlinkSQLTest.java
@@ -213,9 +213,8 @@ void testReleaseFlinkApplicationOnYarnPerJobMode() {
             .anyMatch(it -> it.contains("SUCCESS")));
   }

-  // This test cannot be executed due to a bug, and will be put online after issue #3761 fixed
-  // @Test
-  // @Order(70)
+  @Test
+  @Order(70)
   void testStartFlinkApplicationOnYarnPerJobMode() {
     final ApplicationsPage applicationsPage = new ApplicationsPage(browser);
diff --git a/streampark-e2e/streampark-e2e-case/src/test/java/org/apache/streampark/e2e/cases/ApplicationsFlink117OnYarnWithFlinkSQLTest.java b/streampark-e2e/streampark-e2e-case/src/test/java/org/apache/streampark/e2e/cases/ApplicationsFlink117OnYarnWithFlinkSQLTest.java
index e3b904589c..1f8940221c 100644
--- a/streampark-e2e/streampark-e2e-case/src/test/java/org/apache/streampark/e2e/cases/ApplicationsFlink117OnYarnWithFlinkSQLTest.java
+++ b/streampark-e2e/streampark-e2e-case/src/test/java/org/apache/streampark/e2e/cases/ApplicationsFlink117OnYarnWithFlinkSQLTest.java
@@ -214,9 +214,8 @@ void testReleaseFlinkApplicationOnYarnPerJobMode() {
             .anyMatch(it -> it.contains("SUCCESS")));
   }

-  // This test cannot be executed due to a bug, and will be put online after issue #3761 fixed
-  // @Test
-  // @Order(70)
+  @Test
+  @Order(70)
   void testStartFlinkApplicationOnYarnPerJobMode() {
     final ApplicationsPage applicationsPage = new ApplicationsPage(browser);
diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/SubmitRequest.scala b/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/SubmitRequest.scala
index 0b97e92f23..a1ab7df309 100644
--- a/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/SubmitRequest.scala
+++ b/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/SubmitRequest.scala
@@ -21,7 +21,7 @@ import org.apache.streampark.common.Constant
 import org.apache.streampark.common.conf.{FlinkVersion, Workspace}
 import org.apache.streampark.common.conf.ConfigKeys._
 import org.apache.streampark.common.enums._
-import org.apache.streampark.common.util.{AssertUtils, DeflaterUtils, HdfsUtils, PropertiesUtils}
+import org.apache.streampark.common.util._
 import org.apache.streampark.flink.packer.pipeline.{BuildResult, ShadedBuildResponse}
 import org.apache.streampark.flink.util.FlinkUtils
 import org.apache.streampark.shaded.com.fasterxml.jackson.databind.ObjectMapper
@@ -111,6 +111,8 @@ case class SubmitRequest(
     }
   }

+  def hasProp(key: String): Boolean = properties.containsKey(key)
+
   private[this] def getParameterMap(prefix: String = ""): Map[String, String] = {
     if (this.appConf == null) {
       return Map.empty[String, String]
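The SubmitRequest hunk above adds a small `hasProp` convenience so call sites can check for a user-supplied property without reaching into the backing `java.util.Map`; the FlinkClientTrait hunks below switch their `containsKey` checks over to it. A minimal, self-contained sketch of the shape (the `Request` case class and its null guard are illustrative stand-ins, not the real `SubmitRequest`):

```scala
import java.util.{HashMap => JHashMap, Map => JMap}

object HasPropSketch {
  final case class Request(properties: JMap[String, AnyRef]) {
    // same shape as the new SubmitRequest.hasProp, plus a null guard
    def hasProp(key: String): Boolean =
      properties != null && properties.containsKey(key)
  }

  def main(args: Array[String]): Unit = {
    val props = new JHashMap[String, AnyRef]()
    props.put("parallelism.default", Integer.valueOf(4))
    val request = Request(props)
    println(request.hasProp("parallelism.default"))          // true
    println(request.hasProp("state.checkpoints.num-retained")) // false
  }
}
```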
diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
index 7ff741163c..4cebe8d4e5 100644
--- a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
+++ b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
@@ -23,7 +23,7 @@ import org.apache.streampark.common.conf.Workspace
 import org.apache.streampark.common.enums._
 import org.apache.streampark.common.fs.FsOperator
 import org.apache.streampark.common.util._
-import org.apache.streampark.flink.client.bean.{SubmitResponse, _}
+import org.apache.streampark.flink.client.bean._
 import org.apache.streampark.flink.core.FlinkClusterClient
 import org.apache.streampark.flink.core.conf.FlinkRunOption

@@ -39,7 +39,6 @@ import org.apache.flink.client.program.{ClusterClient, PackagedProgram, Packaged
 import org.apache.flink.configuration._
 import org.apache.flink.python.PythonOptions
 import org.apache.flink.runtime.jobgraph.{JobGraph, SavepointConfigOptions}
-import org.apache.flink.util.FlinkException
 import org.apache.flink.util.Preconditions.checkNotNull

 import java.io.File
@@ -81,10 +80,42 @@ trait FlinkClientTrait extends Logger {
         |-------------------------------------------------------------------------------------------
         |""".stripMargin)

+    // prepare flink config
+    val flinkConfig = prepareConfig(submitRequest)
+
+    // set JVMOptions..
+    setJvmOptions(submitRequest, flinkConfig)
+
+    setConfig(submitRequest, flinkConfig)
+
+    Try(doSubmit(submitRequest, flinkConfig)) match {
+      case Success(resp) => resp
+      case Failure(e) =>
+        logError(
+          s"flink job ${submitRequest.appName} start failed, " +
+            s"executionMode: ${submitRequest.executionMode.getName}, " +
+            s"detail: ${ExceptionUtils.stringifyException(e)}")
+        throw e
+    }
+  }
+
+  private[this] def prepareConfig(submitRequest: SubmitRequest): Configuration = {
+    val (commandLine, flinkConfig) = getCommandLineAndFlinkConfig(submitRequest)
     submitRequest.developmentMode match {
       case FlinkDevelopmentMode.PYFLINK =>
+        val pythonVenv: String = Workspace.local.APP_PYTHON_VENV
+        AssertUtils.required(FsOperator.lfs.exists(pythonVenv), s"$pythonVenv File does not exist")
+
+        flinkConfig
+          // python.archives
+          .safeSet(PythonOptions.PYTHON_ARCHIVES, pythonVenv)
+          // python.client.executable
+          .safeSet(PythonOptions.PYTHON_CLIENT_EXECUTABLE, Constant.PYTHON_EXECUTABLE)
+          // python.executable
+          .safeSet(PythonOptions.PYTHON_EXECUTABLE, Constant.PYTHON_EXECUTABLE)
+
         val flinkOptPath: String = System.getenv(ConfigConstants.ENV_FLINK_OPT_DIR)
         if (StringUtils.isBlank(flinkOptPath)) {
           logWarn(s"Get environment variable ${ConfigConstants.ENV_FLINK_OPT_DIR} fail")
@@ -113,9 +144,7 @@ trait FlinkClientTrait extends Logger {
       .safeSet(ApplicationConfiguration.APPLICATION_ARGS, extractProgramArgs(submitRequest))
       .safeSet(PipelineOptionsInternal.PIPELINE_FIXED_JOB_ID, submitRequest.jobId)

-    if (
-      !submitRequest.properties.containsKey(CheckpointingOptions.MAX_RETAINED_CHECKPOINTS.key())
-    ) {
+    if (!submitRequest.hasProp(CheckpointingOptions.MAX_RETAINED_CHECKPOINTS.key())) {
       val flinkDefaultConfiguration = getFlinkDefaultConfiguration(
         submitRequest.flinkVersion.flinkHome)
       // state.checkpoints.num-retained
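The refactor above splits configuration assembly out of `submit()` into `prepareConfig`, leaving `submit()` with three steps: build the config, apply JVM options and overrides, then run `doSubmit` inside a `Try` so a failure is logged once with the app name and execution mode before being rethrown. A simplified sketch of that control flow (the generic types and `println` stand in for the real request/response beans and `logError`):

```scala
import scala.util.{Failure, Success, Try}

object SubmitFlowSketch {
  // Generic stand-in for submit(): run the submission, log once on failure, rethrow.
  def submit[Req, Resp](request: Req)(doSubmit: Req => Resp): Resp =
    Try(doSubmit(request)) match {
      case Success(resp) => resp
      case Failure(e) =>
        System.err.println(s"submit of $request failed: ${e.getMessage}")
        throw e
    }

  def main(args: Array[String]): Unit = {
    println(submit("job-a")((name: String) => s"accepted: $name"))
  }
}
```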
@@ -124,31 +153,20 @@
     }

     // set savepoint parameter
-    if (submitRequest.savePoint != null) {
+    if (StringUtils.isNotBlank(submitRequest.savePoint)) {
       flinkConfig.safeSet(SavepointConfigOptions.SAVEPOINT_PATH, submitRequest.savePoint)
       flinkConfig.setBoolean(
         SavepointConfigOptions.SAVEPOINT_IGNORE_UNCLAIMED_STATE,
         submitRequest.allowNonRestoredState)
-      val enableRestoreModeState = submitRequest.flinkVersion.checkVersion(
-        FlinkRestoreMode.SINCE_FLINK_VERSION) && submitRequest.restoreMode != null
-      if (enableRestoreModeState) {
+      val enableRestoreMode =
+        submitRequest.restoreMode != null && submitRequest.flinkVersion.checkVersion(
+          FlinkRestoreMode.SINCE_FLINK_VERSION)
+      if (enableRestoreMode) {
         flinkConfig.setString(FlinkRestoreMode.RESTORE_MODE, submitRequest.restoreMode.getName);
       }
     }

-    // set JVMOptions..
-    setJvmOptions(submitRequest, flinkConfig)
-
-    setConfig(submitRequest, flinkConfig)
-
-    Try(doSubmit(submitRequest, flinkConfig)) match {
-      case Success(resp) => resp
-      case Failure(e) =>
-        logError(
-          s"flink job ${submitRequest.appName} start failed, executionMode: ${submitRequest.executionMode.getName}, detail: ${ExceptionUtils
-            .stringifyException(e)}")
-        throw e
-    }
+    flinkConfig
   }

   private[this] def setJvmOptions(
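Two guard changes land in the savepoint hunk above: a blank savepoint path is now treated like a missing one (`StringUtils.isNotBlank` instead of a bare null check), and the restore-mode condition tests `restoreMode != null` before the version check. A small sketch of both guards (`Version` and `sinceRestoreMode` are placeholders for `FlinkVersion` and `FlinkRestoreMode.SINCE_FLINK_VERSION`; the 1.15 threshold is an assumption based on when Flink introduced savepoint restore modes):

```scala
object SavepointGuardSketch {
  // Mirrors StringUtils.isNotBlank: null and whitespace-only paths are skipped.
  def isNotBlank(s: String): Boolean = s != null && s.trim.nonEmpty

  // Placeholder version type; the real check is FlinkVersion.checkVersion(...).
  final case class Version(major: Int, minor: Int) {
    def atLeast(o: Version): Boolean =
      major > o.major || (major == o.major && minor >= o.minor)
  }

  // Assumed threshold; the real constant is FlinkRestoreMode.SINCE_FLINK_VERSION.
  val sinceRestoreMode: Version = Version(1, 15)

  def shouldSetRestoreMode(restoreMode: Option[String], v: Version): Boolean =
    restoreMode.isDefined && v.atLeast(sinceRestoreMode)

  def main(args: Array[String]): Unit = {
    println(isNotBlank("  "))                                    // false
    println(shouldSetRestoreMode(Some("CLAIM"), Version(1, 16))) // true
    println(shouldSetRestoreMode(None, Version(1, 17)))          // false
  }
}
```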
@@ -262,45 +280,41 @@ trait FlinkClientTrait extends Logger {
       submitRequest: SubmitRequest,
       jarFile: File): (PackagedProgram, JobGraph) = {

-    val pkgBuilder = PackagedProgram.newBuilder
-      .setEntryPointClassName(
-        flinkConfig.getOptional(ApplicationConfiguration.APPLICATION_MAIN_CLASS).get()
-      )
-      .setArguments(
-        flinkConfig
-          .getOptional(ApplicationConfiguration.APPLICATION_ARGS)
-          .orElse(Lists.newArrayList()): _*
-      )
-      .setSavepointRestoreSettings(submitRequest.savepointRestoreSettings)
-
-    submitRequest.developmentMode match {
-      case FlinkDevelopmentMode.PYFLINK =>
-        val pythonVenv: String = Workspace.local.APP_PYTHON_VENV
-        AssertUtils.required(FsOperator.lfs.exists(pythonVenv), s"$pythonVenv File does not exist")
+    val packagedProgramBuilder = {
+      val builder = PackagedProgram.newBuilder
+        .setSavepointRestoreSettings(submitRequest.savepointRestoreSettings)
+        .setEntryPointClassName(
+          flinkConfig.getOptional(ApplicationConfiguration.APPLICATION_MAIN_CLASS).get()
+        )
+        .setArguments(
+          flinkConfig
+            .getOptional(ApplicationConfiguration.APPLICATION_ARGS)
+            .orElse(Lists.newArrayList()): _*
+        )

-        flinkConfig
-          // python.archives
-          .safeSet(PythonOptions.PYTHON_ARCHIVES, pythonVenv)
-          // python.client.executable
-          .safeSet(PythonOptions.PYTHON_CLIENT_EXECUTABLE, Constant.PYTHON_EXECUTABLE)
-          // python.executable
-          .safeSet(PythonOptions.PYTHON_EXECUTABLE, Constant.PYTHON_EXECUTABLE)
-        if (submitRequest.libs.nonEmpty) {
-          pkgBuilder.setUserClassPaths(submitRequest.libs)
-        }
-      case _ =>
-        pkgBuilder
-          .setUserClassPaths(submitRequest.classPaths)
-          .setJarFile(jarFile)
+      submitRequest.developmentMode match {
+        case FlinkDevelopmentMode.PYFLINK =>
+          if (submitRequest.libs.nonEmpty) {
+            // BUG: https://github.com/apache/incubator-streampark/issues/3761
+            // builder.setUserClassPaths(Lists.newArrayList(submitRequest.libs: _*))
+          }
+        case _ =>
+          builder
+            .setJarFile(jarFile)
+          // BUG: https://github.com/apache/incubator-streampark/issues/3761
+          // .setUserClassPaths(Lists.newArrayList(submitRequest.classPaths: _*))
+      }
+      builder
     }

-    val packageProgram = pkgBuilder.build()
+    val packageProgram = packagedProgramBuilder.build()
     val jobGraph = PackagedProgramUtils.createJobGraph(
       packageProgram,
       flinkConfig,
       getParallelism(submitRequest),
       null,
       false)
+
     packageProgram -> jobGraph
   }

@@ -337,16 +351,13 @@ trait FlinkClientTrait extends Logger {
   private[this] def getCustomCommandLines(flinkHome: String): JavaList[CustomCommandLine] = {
     val flinkDefaultConfiguration: Configuration = getFlinkDefaultConfiguration(flinkHome)
     // 1. find the configuration directory
-    val configurationDirectory = s"$flinkHome/conf"
+    val confDir = s"$flinkHome/conf"
     // 2. load the custom command lines
-    val customCommandLines =
-      loadCustomCommandLines(flinkDefaultConfiguration, configurationDirectory)
-    new CliFrontend(flinkDefaultConfiguration, customCommandLines)
-    customCommandLines
+    loadCustomCommandLines(flinkDefaultConfiguration, confDir)
   }

   private[client] def getParallelism(submitRequest: SubmitRequest): Integer = {
-    if (submitRequest.properties.containsKey(KEY_FLINK_PARALLELISM())) {
+    if (submitRequest.hasProp(KEY_FLINK_PARALLELISM())) {
       Integer.valueOf(submitRequest.properties.get(KEY_FLINK_PARALLELISM()).toString)
     } else {
       getFlinkDefaultConfiguration(submitRequest.flinkVersion.flinkHome)
@@ -363,21 +374,22 @@ trait FlinkClientTrait extends Logger {
     val cliArgs = {
       val optionMap = new mutable.HashMap[String, Any]()
       submitRequest.appOption
-        .filter(
-          x => {
-            val verify = commandLineOptions.hasOption(x._1)
-            if (!verify) logWarn(s"param:${x._1} is error,skip it.")
-            verify
-          })
-        .foreach(
-          x => {
-            val opt = commandLineOptions.getOption(x._1.trim).getOpt
-            Try(x._2.toBoolean).getOrElse(x._2) match {
-              case b if b.isInstanceOf[Boolean] =>
-                if (b.asInstanceOf[Boolean]) optionMap += s"-$opt" -> true
-              case v => optionMap += s"-$opt" -> v
+        .foreach {
+          opt =>
+            val verify = commandLineOptions.hasOption(opt._1)
+            if (!verify) {
+              logWarn(s"param:${opt._1} is error,skip it.")
+            } else {
+              val option = commandLineOptions.getOption(opt._1.trim).getOpt
+              Try(opt._2.toBoolean).getOrElse(opt._2) match {
+                case b if b.isInstanceOf[Boolean] =>
+                  if (b.asInstanceOf[Boolean]) {
+                    optionMap += s"-$option" -> true
+                  }
+                case v => optionMap += s"-$option" -> v
+              }
             }
-          })
+        }

       // fromSavePoint
       if (submitRequest.savePoint != null) {
@@ -391,9 +403,9 @@

       val array = new ArrayBuffer[String]()
       optionMap.foreach(
-        x => {
-          array += x._1
-          x._2 match {
+        opt => {
+          array += opt._1
+          opt._2 match {
             case v: String => array += v
             case _ =>
           }
@@ -401,12 +413,13 @@ trait FlinkClientTrait extends Logger {

       // app properties
       if (MapUtils.isNotEmpty(submitRequest.properties)) {
-        submitRequest.properties.foreach(
-          x => {
-            if (!x._1.startsWith(CoreOptions.FLINK_JVM_OPTIONS.key())) {
-              array += s"-D${x._1}=${x._2}"
+        submitRequest.properties.foreach {
+          key =>
+            if (!key._1.startsWith(CoreOptions.FLINK_JVM_OPTIONS.key())) {
+              logInfo(s"submit application dynamicProperties: ${key._1} :${key._2}")
+              array += s"-D${key._1}=${key._2}"
             }
-          })
+        }
       }
       array.toArray
     }
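The app-properties hunk above also gains a `logInfo` per dynamic property; functionally, each non-JVM-option entry is flattened into a `-Dkey=value` program argument. A sketch of that flattening (`jvmOptionsPrefix` approximates `CoreOptions.FLINK_JVM_OPTIONS.key()`, which this sketch assumes to be `"env.java.opts"`):

```scala
import scala.collection.mutable.ArrayBuffer

object DynamicPropsSketch {
  // Assumed value of CoreOptions.FLINK_JVM_OPTIONS.key(); JVM options are
  // excluded from the -D program arguments and handled by setJvmOptions.
  val jvmOptionsPrefix = "env.java.opts"

  def toDynamicArgs(properties: Map[String, Any]): Array[String] = {
    val array = new ArrayBuffer[String]()
    properties.foreach {
      case (key, value) if !key.startsWith(jvmOptionsPrefix) =>
        array += s"-D$key=$value"
      case _ => // skipped: JVM options are not passed as -D arguments
    }
    array.toArray
  }

  def main(args: Array[String]): Unit = {
    val args0 = toDynamicArgs(
      Map("parallelism.default" -> 4, "env.java.opts.all" -> "-Xmx1g"))
    println(args0.mkString(" ")) // -Dparallelism.default=4
  }
}
```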
@@ -480,21 +493,22 @@ trait FlinkClientTrait extends Logger {
     }

     // execution.runtime-mode
-    val addRuntimeModeState =
-      submitRequest.properties.nonEmpty && submitRequest.properties.containsKey(
-        ExecutionOptions.RUNTIME_MODE.key())
-    if (addRuntimeModeState) {
-      programArgs += s"--${ExecutionOptions.RUNTIME_MODE.key()}"
-      programArgs += submitRequest.properties.get(ExecutionOptions.RUNTIME_MODE.key()).toString
+    Try(submitRequest.properties(ExecutionOptions.RUNTIME_MODE.key()).toString) match {
+      case Success(runtimeMode) =>
+        programArgs += s"--${ExecutionOptions.RUNTIME_MODE.key()}"
+        programArgs += runtimeMode
+      case _ =>
     }

-    val addUserJarFileState =
-      submitRequest.developmentMode == FlinkDevelopmentMode.PYFLINK && submitRequest.executionMode != FlinkExecutionMode.YARN_APPLICATION
-    if (addUserJarFileState) {
-      // python file
-      programArgs.add("-py")
-      programArgs.add(submitRequest.userJarFile.getAbsolutePath)
+    if (submitRequest.developmentMode == FlinkDevelopmentMode.PYFLINK) {
+      // TODO why executionMode is not yarn-application ???
+      if (submitRequest.executionMode != FlinkExecutionMode.YARN_APPLICATION) {
+        // python file
+        programArgs.add("-py")
+        programArgs.add(submitRequest.userJarFile.getAbsolutePath)
+      }
     }
+
     Lists.newArrayList(programArgs: _*)
   }

@@ -507,9 +521,9 @@ trait FlinkClientTrait extends Logger {
     val configuration = new Configuration()
     val flinkDefaultConfiguration = getFlinkDefaultConfiguration(flinkHome)
     flinkDefaultConfiguration.keySet.foreach(
-      x => {
-        flinkDefaultConfiguration.getString(x, null) match {
-          case v if v != null => configuration.setString(x, v)
+      key => {
+        flinkDefaultConfiguration.getString(key, null) match {
+          case v if v != null => configuration.setString(key, v)
           case _ =>
         }
       })
@@ -537,9 +551,7 @@ trait FlinkClientTrait extends Logger {
     val withSavepoint = Try(cancelRequest.withSavepoint).getOrElse(false)
     val withDrain = Try(cancelRequest.withDrain).getOrElse(false)

-    (
-      Try(cancelRequest.withSavepoint).getOrElse(false),
-      Try(cancelRequest.withDrain).getOrElse(false)) match {
+    (withSavepoint, withDrain) match {
       case (false, false) =>
         client.cancel(jobID).get()
         null
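The final hunk computes `withSavepoint` and `withDrain` once and matches on them as a tuple instead of rebuilding the `Try` expressions inline. A sketch of the four-way dispatch that pattern enables (the `Action` objects are illustrative labels for the client calls, not StreamPark types):

```scala
object CancelDispatchSketch {
  sealed trait Action
  case object Cancel extends Action
  case object CancelWithSavepoint extends Action
  case object StopWithDrain extends Action
  case object StopWithSavepointAndDrain extends Action

  // The two flags are read once (each already defaulted to false via Try in
  // the patch) and dispatched as a tuple, one case per combination.
  def dispatch(withSavepoint: Boolean, withDrain: Boolean): Action =
    (withSavepoint, withDrain) match {
      case (false, false) => Cancel
      case (true, false)  => CancelWithSavepoint
      case (false, true)  => StopWithDrain
      case (true, true)   => StopWithSavepointAndDrain
    }

  def main(args: Array[String]): Unit = {
    println(dispatch(withSavepoint = true, withDrain = false)) // CancelWithSavepoint
  }
}
```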