Commit: [Improve] flink on yarn-per-job mode bug fixed #3761 (#3834)
* [Improve] flink on yarn-per-job mode bug fixed #3761

* [Improve] FlinkClientTrait improvement

* [Improve] SubmitRequest minor improvement
wolfboys authored Jul 5, 2024
1 parent 2d04bed commit 50ab5b6
Showing 4 changed files with 117 additions and 105 deletions.
File 1 of 4 (Java e2e test):

@@ -213,9 +213,8 @@ void testReleaseFlinkApplicationOnYarnPerJobMode() {
                 .anyMatch(it -> it.contains("SUCCESS")));
     }

-    // This test cannot be executed due to a bug, and will be put online after issue #3761 fixed
-    // @Test
-    // @Order(70)
+    @Test
+    @Order(70)
     void testStartFlinkApplicationOnYarnPerJobMode() {
         final ApplicationsPage applicationsPage = new ApplicationsPage(browser);

File 2 of 4 (Java e2e test):

@@ -214,9 +214,8 @@ void testReleaseFlinkApplicationOnYarnPerJobMode() {
                 .anyMatch(it -> it.contains("SUCCESS")));
     }

-    // This test cannot be executed due to a bug, and will be put online after issue #3761 fixed
-    // @Test
-    // @Order(70)
+    @Test
+    @Order(70)
     void testStartFlinkApplicationOnYarnPerJobMode() {
         final ApplicationsPage applicationsPage = new ApplicationsPage(browser);

File 3 of 4 (SubmitRequest.scala):

@@ -21,7 +21,7 @@ import org.apache.streampark.common.Constant
 import org.apache.streampark.common.conf.{FlinkVersion, Workspace}
 import org.apache.streampark.common.conf.ConfigKeys._
 import org.apache.streampark.common.enums._
-import org.apache.streampark.common.util.{AssertUtils, DeflaterUtils, HdfsUtils, PropertiesUtils}
+import org.apache.streampark.common.util._
 import org.apache.streampark.flink.packer.pipeline.{BuildResult, ShadedBuildResponse}
 import org.apache.streampark.flink.util.FlinkUtils
 import org.apache.streampark.shaded.com.fasterxml.jackson.databind.ObjectMapper

@@ -111,6 +111,8 @@ case class SubmitRequest(
     }
   }

+  def hasProp(key: String): Boolean = properties.containsKey(key)
+
   private[this] def getParameterMap(prefix: String = ""): Map[String, String] = {
     if (this.appConf == null) {
       return Map.empty[String, String]
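
The new hasProp helper shortens the many properties.containsKey(...) call sites touched later in this commit. A minimal, self-contained sketch of the pattern (Request is a simplified stand-in for SubmitRequest, not the real class):

import java.util.{HashMap => JHashMap, Map => JMap}

object HasPropSketch {
  // Simplified stand-in for SubmitRequest; the real case class has many more fields.
  final case class Request(properties: JMap[String, AnyRef]) {
    def hasProp(key: String): Boolean = properties.containsKey(key)
  }

  def main(args: Array[String]): Unit = {
    val props = new JHashMap[String, AnyRef]()
    props.put("state.checkpoints.num-retained", "5")
    val req = Request(props)
    // before: req.properties.containsKey("state.checkpoints.num-retained")
    // after:  req.hasProp("state.checkpoints.num-retained")
    println(req.hasProp("state.checkpoints.num-retained")) // true
    println(req.hasProp("parallelism.default")) // false
  }
}
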
File 4 of 4 (FlinkClientTrait.scala):

@@ -23,7 +23,7 @@ import org.apache.streampark.common.conf.Workspace
 import org.apache.streampark.common.enums._
 import org.apache.streampark.common.fs.FsOperator
 import org.apache.streampark.common.util._
-import org.apache.streampark.flink.client.bean.{SubmitResponse, _}
+import org.apache.streampark.flink.client.bean._
 import org.apache.streampark.flink.core.FlinkClusterClient
 import org.apache.streampark.flink.core.conf.FlinkRunOption

@@ -39,7 +39,6 @@ import org.apache.flink.client.program.{ClusterClient, PackagedProgram, Packaged
 import org.apache.flink.configuration._
 import org.apache.flink.python.PythonOptions
 import org.apache.flink.runtime.jobgraph.{JobGraph, SavepointConfigOptions}
-import org.apache.flink.util.FlinkException
 import org.apache.flink.util.Preconditions.checkNotNull

 import java.io.File
@@ -81,10 +80,42 @@ trait FlinkClientTrait extends Logger {
         |-------------------------------------------------------------------------------------------
         |""".stripMargin)

+    // prepare flink config
+    val flinkConfig = prepareConfig(submitRequest)
+
+    // set JVMOptions..
+    setJvmOptions(submitRequest, flinkConfig)
+
+    setConfig(submitRequest, flinkConfig)
+
+    Try(doSubmit(submitRequest, flinkConfig)) match {
+      case Success(resp) => resp
+      case Failure(e) =>
+        logError(
+          s"flink job ${submitRequest.appName} start failed, " +
+            s"executionMode: ${submitRequest.executionMode.getName}, " +
+            s"detail: ${ExceptionUtils.stringifyException(e)}")
+        throw e
+    }
+  }
+
+  private[this] def prepareConfig(submitRequest: SubmitRequest): Configuration = {
+
     val (commandLine, flinkConfig) = getCommandLineAndFlinkConfig(submitRequest)

     submitRequest.developmentMode match {
       case FlinkDevelopmentMode.PYFLINK =>
+        val pythonVenv: String = Workspace.local.APP_PYTHON_VENV
+        AssertUtils.required(FsOperator.lfs.exists(pythonVenv), s"$pythonVenv File does not exist")
+
+        flinkConfig
+          // python.archives
+          .safeSet(PythonOptions.PYTHON_ARCHIVES, pythonVenv)
+          // python.client.executable
+          .safeSet(PythonOptions.PYTHON_CLIENT_EXECUTABLE, Constant.PYTHON_EXECUTABLE)
+          // python.executable
+          .safeSet(PythonOptions.PYTHON_EXECUTABLE, Constant.PYTHON_EXECUTABLE)
+
         val flinkOptPath: String = System.getenv(ConfigConstants.ENV_FLINK_OPT_DIR)
         if (StringUtils.isBlank(flinkOptPath)) {
           logWarn(s"Get environment variable ${ConfigConstants.ENV_FLINK_OPT_DIR} fail")
@@ -113,9 +144,7 @@ trait FlinkClientTrait extends Logger {
       .safeSet(ApplicationConfiguration.APPLICATION_ARGS, extractProgramArgs(submitRequest))
       .safeSet(PipelineOptionsInternal.PIPELINE_FIXED_JOB_ID, submitRequest.jobId)

-    if (
-      !submitRequest.properties.containsKey(CheckpointingOptions.MAX_RETAINED_CHECKPOINTS.key())
-    ) {
+    if (!submitRequest.hasProp(CheckpointingOptions.MAX_RETAINED_CHECKPOINTS.key())) {
       val flinkDefaultConfiguration = getFlinkDefaultConfiguration(
         submitRequest.flinkVersion.flinkHome)
       // state.checkpoints.num-retained

@@ -124,31 +153,20 @@ trait FlinkClientTrait extends Logger {
     }

     // set savepoint parameter
-    if (submitRequest.savePoint != null) {
+    if (StringUtils.isNotBlank(submitRequest.savePoint)) {
       flinkConfig.safeSet(SavepointConfigOptions.SAVEPOINT_PATH, submitRequest.savePoint)
       flinkConfig.setBoolean(
         SavepointConfigOptions.SAVEPOINT_IGNORE_UNCLAIMED_STATE,
         submitRequest.allowNonRestoredState)
-      val enableRestoreModeState = submitRequest.flinkVersion.checkVersion(
-        FlinkRestoreMode.SINCE_FLINK_VERSION) && submitRequest.restoreMode != null
-      if (enableRestoreModeState) {
+      val enableRestoreMode =
+        submitRequest.restoreMode != null && submitRequest.flinkVersion.checkVersion(
+          FlinkRestoreMode.SINCE_FLINK_VERSION)
+      if (enableRestoreMode) {
         flinkConfig.setString(FlinkRestoreMode.RESTORE_MODE, submitRequest.restoreMode.getName);
       }
     }

-    // set JVMOptions..
-    setJvmOptions(submitRequest, flinkConfig)
-
-    setConfig(submitRequest, flinkConfig)
-
-    Try(doSubmit(submitRequest, flinkConfig)) match {
-      case Success(resp) => resp
-      case Failure(e) =>
-        logError(
-          s"flink job ${submitRequest.appName} start failed, executionMode: ${submitRequest.executionMode.getName}, detail: ${ExceptionUtils
-              .stringifyException(e)}")
-        throw e
-    }
+    flinkConfig
   }

   private[this] def setJvmOptions(
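
Two behavioral nits are fixed here: an empty-string savepoint path no longer counts as "restore from savepoint" (isNotBlank instead of a bare null check), and the restore mode is applied only when it was both requested and is supported by the target Flink version (Flink's CLAIM/NO_CLAIM restore modes arrived in 1.15). A self-contained sketch of that gating, with toy stand-ins for FlinkVersion and the restore-mode value:

object RestoreModeGateSketch {
  // Toy stand-in for org.apache.streampark.common.conf.FlinkVersion.
  final case class FlinkVersion(major: Int, minor: Int) {
    def checkVersion(minMajor: Int, minMinor: Int): Boolean =
      major > minMajor || (major == minMajor && minor >= minMinor)
  }

  // Mirrors: restoreMode != null && flinkVersion.checkVersion(SINCE_FLINK_VERSION)
  def restoreModeSetting(version: FlinkVersion, restoreMode: Option[String]): Option[String] =
    restoreMode.filter(_ => version.checkVersion(1, 15))

  def main(args: Array[String]): Unit = {
    println(restoreModeSetting(FlinkVersion(1, 14), Some("CLAIM"))) // None: version too old
    println(restoreModeSetting(FlinkVersion(1, 17), Some("CLAIM"))) // Some(CLAIM)
    println(restoreModeSetting(FlinkVersion(1, 17), None))          // None: not requested
  }
}
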
@@ -262,45 +280,41 @@ trait FlinkClientTrait extends Logger {
       submitRequest: SubmitRequest,
       jarFile: File): (PackagedProgram, JobGraph) = {

-    val pkgBuilder = PackagedProgram.newBuilder
-      .setEntryPointClassName(
-        flinkConfig.getOptional(ApplicationConfiguration.APPLICATION_MAIN_CLASS).get()
-      )
-      .setArguments(
-        flinkConfig
-          .getOptional(ApplicationConfiguration.APPLICATION_ARGS)
-          .orElse(Lists.newArrayList()): _*
-      )
-      .setSavepointRestoreSettings(submitRequest.savepointRestoreSettings)
-
-    submitRequest.developmentMode match {
-      case FlinkDevelopmentMode.PYFLINK =>
-        val pythonVenv: String = Workspace.local.APP_PYTHON_VENV
-        AssertUtils.required(FsOperator.lfs.exists(pythonVenv), s"$pythonVenv File does not exist")
+    val packagedProgramBuilder = {
+      val builder = PackagedProgram.newBuilder
+        .setSavepointRestoreSettings(submitRequest.savepointRestoreSettings)
+        .setEntryPointClassName(
+          flinkConfig.getOptional(ApplicationConfiguration.APPLICATION_MAIN_CLASS).get()
+        )
+        .setArguments(
+          flinkConfig
+            .getOptional(ApplicationConfiguration.APPLICATION_ARGS)
+            .orElse(Lists.newArrayList()): _*
+        )

-        flinkConfig
-          // python.archives
-          .safeSet(PythonOptions.PYTHON_ARCHIVES, pythonVenv)
-          // python.client.executable
-          .safeSet(PythonOptions.PYTHON_CLIENT_EXECUTABLE, Constant.PYTHON_EXECUTABLE)
-          // python.executable
-          .safeSet(PythonOptions.PYTHON_EXECUTABLE, Constant.PYTHON_EXECUTABLE)
-        if (submitRequest.libs.nonEmpty) {
-          pkgBuilder.setUserClassPaths(submitRequest.libs)
-        }
-      case _ =>
-        pkgBuilder
-          .setUserClassPaths(submitRequest.classPaths)
-          .setJarFile(jarFile)
+      submitRequest.developmentMode match {
+        case FlinkDevelopmentMode.PYFLINK =>
+          if (submitRequest.libs.nonEmpty) {
+            // BUG: https://github.com/apache/incubator-streampark/issues/3761
+            // builder.setUserClassPaths(Lists.newArrayList(submitRequest.libs: _*))
+          }
+        case _ =>
+          builder
+            .setJarFile(jarFile)
+          // BUG: https://github.com/apache/incubator-streampark/issues/3761
+          // .setUserClassPaths(Lists.newArrayList(submitRequest.classPaths: _*))
+      }
+      builder
+    }

-    val packageProgram = pkgBuilder.build()
+    val packageProgram = packagedProgramBuilder.build()
     val jobGraph = PackagedProgramUtils.createJobGraph(
       packageProgram,
       flinkConfig,
       getParallelism(submitRequest),
       null,
       false)

     packageProgram -> jobGraph
   }

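This hunk is the heart of the #3761 fix: the builder is now assembled in one expression, and the setUserClassPaths calls are disabled (left as commented-out lines referencing the issue) because, per the commit title and the linked issue, passing user classpaths here broke yarn-per-job submission. For orientation, a hedged, minimal example of the flink-clients builder API used above, outside StreamPark (jar path and entry class are placeholders):

import java.io.File

import org.apache.flink.client.program.{PackagedProgram, PackagedProgramUtils}
import org.apache.flink.configuration.Configuration

object JobGraphSketch {
  def main(args: Array[String]): Unit = {
    // Placeholder jar and entry point; replace with a real user jar.
    val program = PackagedProgram.newBuilder
      .setJarFile(new File("/path/to/user-job.jar"))
      .setEntryPointClassName("com.example.StreamingJob")
      .setArguments("--input", "demo")
      .build()

    // Same shape as the call in getJobGraph: program, config, parallelism,
    // fixed JobID (null = generate one), suppressOutput.
    val jobGraph =
      PackagedProgramUtils.createJobGraph(program, new Configuration(), 1, null, false)
    println(jobGraph.getJobID)
  }
}
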
@@ -337,16 +351,13 @@ trait FlinkClientTrait extends Logger {
   private[this] def getCustomCommandLines(flinkHome: String): JavaList[CustomCommandLine] = {
     val flinkDefaultConfiguration: Configuration = getFlinkDefaultConfiguration(flinkHome)
     // 1. find the configuration directory
-    val configurationDirectory = s"$flinkHome/conf"
+    val confDir = s"$flinkHome/conf"
     // 2. load the custom command lines
-    val customCommandLines =
-      loadCustomCommandLines(flinkDefaultConfiguration, configurationDirectory)
-    new CliFrontend(flinkDefaultConfiguration, customCommandLines)
-    customCommandLines
+    loadCustomCommandLines(flinkDefaultConfiguration, confDir)
   }

   private[client] def getParallelism(submitRequest: SubmitRequest): Integer = {
-    if (submitRequest.properties.containsKey(KEY_FLINK_PARALLELISM())) {
+    if (submitRequest.hasProp(KEY_FLINK_PARALLELISM())) {
       Integer.valueOf(submitRequest.properties.get(KEY_FLINK_PARALLELISM()).toString)
     } else {
       getFlinkDefaultConfiguration(submitRequest.flinkVersion.flinkHome)
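
getCustomCommandLines also drops a stray new CliFrontend(...) whose result was never used, and getParallelism now reads as a plain two-level fallback: an explicitly submitted parallelism wins, otherwise the Flink home's default configuration decides. The same shape in miniature (assuming the StreamPark key resolves to Flink's parallelism.default):

object ParallelismFallbackSketch {
  def parallelism(props: Map[String, String], flinkDefaultConf: Map[String, String]): Int =
    props
      .get("parallelism.default")
      .orElse(flinkDefaultConf.get("parallelism.default"))
      .map(_.toInt)
      .getOrElse(1) // Flink's built-in default when nothing is configured

  def main(args: Array[String]): Unit = {
    println(parallelism(Map("parallelism.default" -> "4"), Map.empty)) // 4
    println(parallelism(Map.empty, Map("parallelism.default" -> "2"))) // 2
    println(parallelism(Map.empty, Map.empty))                         // 1
  }
}
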
@@ -363,21 +374,22 @@ trait FlinkClientTrait extends Logger {
     val cliArgs = {
       val optionMap = new mutable.HashMap[String, Any]()
       submitRequest.appOption
-        .filter(
-          x => {
-            val verify = commandLineOptions.hasOption(x._1)
-            if (!verify) logWarn(s"param:${x._1} is error,skip it.")
-            verify
-          })
-        .foreach(
-          x => {
-            val opt = commandLineOptions.getOption(x._1.trim).getOpt
-            Try(x._2.toBoolean).getOrElse(x._2) match {
-              case b if b.isInstanceOf[Boolean] =>
-                if (b.asInstanceOf[Boolean]) optionMap += s"-$opt" -> true
-              case v => optionMap += s"-$opt" -> v
+        .foreach {
+          opt =>
+            val verify = commandLineOptions.hasOption(opt._1)
+            if (!verify) {
+              logWarn(s"param:${opt._1} is error,skip it.")
+            } else {
+              val option = commandLineOptions.getOption(opt._1.trim).getOpt
+              Try(opt._2.toBoolean).getOrElse(opt._2) match {
+                case b if b.isInstanceOf[Boolean] =>
+                  if (b.asInstanceOf[Boolean]) {
+                    optionMap += s"-$option" -> true
+                  }
+                case v => optionMap += s"-$option" -> v
+              }
             }
-          })
+        }

       // fromSavePoint
       if (submitRequest.savePoint != null) {
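
The filter-then-foreach pair collapses into a single pass: unknown option keys are logged and skipped, boolean-valued options become bare flags, and everything else becomes flag plus value. A runnable miniature of that conversion (knownOptions stands in for Flink's commandLineOptions):

import scala.collection.mutable
import scala.util.Try

object CliOptionSketch {
  // Stand-in for commandLineOptions: maps long names to short flags.
  private val knownOptions = Map("detached" -> "d", "savepoint" -> "s")

  def toCliArgs(appOption: Map[String, String]): Seq[String] = {
    val optionMap = new mutable.LinkedHashMap[String, Any]()
    appOption.foreach {
      opt =>
        knownOptions.get(opt._1) match {
          case None => println(s"param:${opt._1} is error,skip it.")
          case Some(short) =>
            Try(opt._2.toBoolean).getOrElse(opt._2) match {
              case b: Boolean => if (b) optionMap += s"-$short" -> true
              case v => optionMap += s"-$short" -> v
            }
        }
    }
    optionMap.toSeq.flatMap {
      case (flag, _: Boolean) => Seq(flag)   // boolean options become bare flags
      case (flag, value) => Seq(flag, value.toString)
    }
  }

  def main(args: Array[String]): Unit =
    println(toCliArgs(Map("detached" -> "true", "savepoint" -> "/tmp/sp", "bogus" -> "x")))
  // List(-d, -s, /tmp/sp); "bogus" is logged and skipped
}
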
@@ -391,22 +403,23 @@ trait FlinkClientTrait extends Logger {

     val array = new ArrayBuffer[String]()
     optionMap.foreach(
-      x => {
-        array += x._1
-        x._2 match {
+      opt => {
+        array += opt._1
+        opt._2 match {
           case v: String => array += v
           case _ =>
         }
       })

     // app properties
     if (MapUtils.isNotEmpty(submitRequest.properties)) {
-      submitRequest.properties.foreach(
-        x => {
-          if (!x._1.startsWith(CoreOptions.FLINK_JVM_OPTIONS.key())) {
-            array += s"-D${x._1}=${x._2}"
+      submitRequest.properties.foreach {
+        key =>
+          if (!key._1.startsWith(CoreOptions.FLINK_JVM_OPTIONS.key())) {
+            logInfo(s"submit application dynamicProperties: ${key._1} :${key._2}")
+            array += s"-D${key._1}=${key._2}"
           }
-        })
+      }
     }
     array.toArray
   }
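
Two small changes in the dynamic-properties loop: each forwarded property is now logged, and properties under env.java.opts (CoreOptions.FLINK_JVM_OPTIONS) are still excluded because setJvmOptions handles them separately. The same filter-and-format step in isolation:

object DynamicPropsSketch {
  // "env.java.opts" is CoreOptions.FLINK_JVM_OPTIONS.key() in Flink.
  private val jvmOptionsPrefix = "env.java.opts"

  def toDynamicArgs(props: Map[String, String]): Seq[String] =
    props.collect {
      case (k, v) if !k.startsWith(jvmOptionsPrefix) =>
        println(s"submit application dynamicProperties: $k :$v")
        s"-D$k=$v"
    }.toSeq

  def main(args: Array[String]): Unit =
    println(toDynamicArgs(Map(
      "taskmanager.numberOfTaskSlots" -> "2",
      "env.java.opts.all" -> "-XX:+UseG1GC" // excluded: forwarded as a JVM option instead
    )))
  // List(-Dtaskmanager.numberOfTaskSlots=2)
}
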
@@ -480,21 +493,22 @@ trait FlinkClientTrait extends Logger {
     }

     // execution.runtime-mode
-    val addRuntimeModeState =
-      submitRequest.properties.nonEmpty && submitRequest.properties.containsKey(
-        ExecutionOptions.RUNTIME_MODE.key())
-    if (addRuntimeModeState) {
-      programArgs += s"--${ExecutionOptions.RUNTIME_MODE.key()}"
-      programArgs += submitRequest.properties.get(ExecutionOptions.RUNTIME_MODE.key()).toString
+    Try(submitRequest.properties(ExecutionOptions.RUNTIME_MODE.key()).toString) match {
+      case Success(runtimeMode) =>
+        programArgs += s"--${ExecutionOptions.RUNTIME_MODE.key()}"
+        programArgs += runtimeMode
+      case _ =>
     }

-    val addUserJarFileState =
-      submitRequest.developmentMode == FlinkDevelopmentMode.PYFLINK && submitRequest.executionMode != FlinkExecutionMode.YARN_APPLICATION
-    if (addUserJarFileState) {
-      // python file
-      programArgs.add("-py")
-      programArgs.add(submitRequest.userJarFile.getAbsolutePath)
+    if (submitRequest.developmentMode == FlinkDevelopmentMode.PYFLINK) {
+      // TODO why executionMode is not yarn-application ???
+      if (submitRequest.executionMode != FlinkExecutionMode.YARN_APPLICATION) {
+        // python file
+        programArgs.add("-py")
+        programArgs.add(submitRequest.userJarFile.getAbsolutePath)
+      }
    }

     Lists.newArrayList(programArgs: _*)
   }

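Wrapping the lookup in Try collapses the old two-step presence check: a missing key becomes a Failure and the match simply falls through to case _. A compact demonstration of the pattern, with a plain Scala Map in place of the properties map:

import scala.collection.mutable
import scala.util.{Success, Try}

object RuntimeModeArgSketch {
  def appendRuntimeMode(props: Map[String, String], programArgs: mutable.Buffer[String]): Unit =
    // props("...") throws NoSuchElementException when the key is absent;
    // Try turns that into a Failure, which the match ignores.
    Try(props("execution.runtime-mode")) match {
      case Success(runtimeMode) =>
        programArgs += "--execution.runtime-mode"
        programArgs += runtimeMode
      case _ => // key missing: append nothing
    }

  def main(args: Array[String]): Unit = {
    val withMode = mutable.Buffer[String]()
    appendRuntimeMode(Map("execution.runtime-mode" -> "BATCH"), withMode)
    println(withMode) // ArrayBuffer(--execution.runtime-mode, BATCH)

    val withoutMode = mutable.Buffer[String]()
    appendRuntimeMode(Map.empty, withoutMode)
    println(withoutMode) // ArrayBuffer()
  }
}
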
@@ -507,9 +521,9 @@ trait FlinkClientTrait extends Logger {
     val configuration = new Configuration()
     val flinkDefaultConfiguration = getFlinkDefaultConfiguration(flinkHome)
     flinkDefaultConfiguration.keySet.foreach(
-      x => {
-        flinkDefaultConfiguration.getString(x, null) match {
-          case v if v != null => configuration.setString(x, v)
+      key => {
+        flinkDefaultConfiguration.getString(key, null) match {
+          case v if v != null => configuration.setString(key, v)
           case _ =>
         }
       })
@@ -537,9 +551,7 @@ trait FlinkClientTrait extends Logger {
     val withSavepoint = Try(cancelRequest.withSavepoint).getOrElse(false)
     val withDrain = Try(cancelRequest.withDrain).getOrElse(false)

-    (
-      Try(cancelRequest.withSavepoint).getOrElse(false),
-      Try(cancelRequest.withDrain).getOrElse(false)) match {
+    (withSavepoint, withDrain) match {
       case (false, false) =>
         client.cancel(jobID).get()
         null
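
Naming the two flags once removes the duplicated Try expressions and turns the cancel dispatch into a flat decision table. Only the (false, false) branch is visible in this hunk; a sketch of the dispatch shape, with stub actions standing in for the real ClusterClient calls:

object CancelDispatchSketch {
  def cancelAction(withSavepoint: Boolean, withDrain: Boolean): String =
    (withSavepoint, withDrain) match {
      case (false, false) => "plain cancel" // client.cancel(jobID).get(); returns null
      case other => s"savepoint/drain variant $other" // handled by branches below this hunk
    }

  def main(args: Array[String]): Unit =
    Seq((false, false), (true, false), (false, true), (true, true)).foreach {
      p => println(s"$p -> ${cancelAction(p._1, p._2)}")
    }
}
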
