Skip to content

Commit

Permalink
[Improve] Flink on YARN e2e test case (#3860)
Browse files Browse the repository at this point in the history
* [Improve] flink on yarn e2e testcase

* [Improve] Try to fix e2e CI failure

* [Bug] ChildFirstClassloader loadClass bug fixed

* [Improve] load commons-cli classes improvements

* [Improve] flink-sql on YARN test-case class name improvements
  • Loading branch information
wolfboys authored Jul 14, 2024
1 parent fc856b0 commit dda8730
Show file tree
Hide file tree
Showing 6 changed files with 73 additions and 66 deletions.
12 changes: 6 additions & 6 deletions .github/workflows/e2e.yml
Original file line number Diff line number Diff line change
Expand Up @@ -124,12 +124,12 @@ jobs:
class: org.apache.streampark.e2e.cases.TeamManagementTest
- name: MemberManagementTest
class: org.apache.streampark.e2e.cases.MemberManagementTest
- name: ApplicationsFlink116OnYarnWithFlinkSQLTest
class: org.apache.streampark.e2e.cases.ApplicationsFlink116OnYarnWithFlinkSQLTest
- name: ApplicationsFlink117OnYarnWithFlinkSQLTest
class: org.apache.streampark.e2e.cases.ApplicationsFlink117OnYarnWithFlinkSQLTest
- name: ApplicationsFlink118OnYarnWithFlinkSQLTest
class: org.apache.streampark.e2e.cases.ApplicationsFlink118OnYarnWithFlinkSQLTest
- name: FlinkSQL116OnYarnTest
class: org.apache.streampark.e2e.cases.FlinkSQL116OnYarnTest
- name: FlinkSQL117OnYarnTest
class: org.apache.streampark.e2e.cases.FlinkSQL117OnYarnTest
- name: FlinkSQL118OnYarnTest
class: org.apache.streampark.e2e.cases.FlinkSQL118OnYarnTest
env:
RECORDING_PATH: /tmp/recording-${{ matrix.case.name }}
steps:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@
import static org.assertj.core.api.Assertions.assertThat;

@StreamPark(composeFiles = "docker/flink-1.16-on-yarn/docker-compose.yaml")
public class ApplicationsFlink116OnYarnWithFlinkSQLTest {
public class FlinkSQL116OnYarnTest {

private static RemoteWebDriver browser;

Expand Down Expand Up @@ -106,8 +106,8 @@ void testReleaseFlinkApplicationOnYarnApplicationMode() {
.anyMatch(it -> it.contains("SUCCESS")));
}

// @Test
// @Order(30)
@Test
@Order(30)
void testStartFlinkApplicationOnYarnApplicationMode() {
final ApplicationsPage applicationsPage = new ApplicationsPage(browser);

Expand All @@ -128,8 +128,8 @@ void testStartFlinkApplicationOnYarnApplicationMode() {
.anyMatch(it -> it.contains("FINISHED")));
}

// @Test
// @Order(31)
@Test
@Order(31)
@SneakyThrows
void testRestartAndCancelFlinkApplicationOnYarnApplicationMode() {
Thread.sleep(Constants.DEFAULT_SLEEP_MILLISECONDS);
Expand Down Expand Up @@ -210,8 +210,8 @@ void testReleaseFlinkApplicationOnYarnPerJobMode() {
.anyMatch(it -> it.contains("SUCCESS")));
}

// @Test
// @Order(70)
@Test
@Order(70)
void testStartFlinkApplicationOnYarnPerJobMode() {
final ApplicationsPage applicationsPage = new ApplicationsPage(browser);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@
import static org.assertj.core.api.Assertions.assertThat;

@StreamPark(composeFiles = "docker/flink-1.17-on-yarn/docker-compose.yaml")
public class ApplicationsFlink117OnYarnWithFlinkSQLTest {
public class FlinkSQL117OnYarnTest {

private static RemoteWebDriver browser;

Expand Down Expand Up @@ -106,8 +106,8 @@ void testReleaseFlinkApplicationOnYarnApplicationMode() {
.anyMatch(it -> it.contains("SUCCESS")));
}

// @Test
// @Order(30)
@Test
@Order(30)
void testStartFlinkApplicationOnYarnApplicationMode() {
final ApplicationsPage applicationsPage = new ApplicationsPage(browser);

Expand All @@ -128,8 +128,8 @@ void testStartFlinkApplicationOnYarnApplicationMode() {
.anyMatch(it -> it.contains("FINISHED")));
}

// @Test
// @Order(31)
@Test
@Order(31)
@SneakyThrows
void testRestartAndCancelFlinkApplicationOnYarnApplicationMode() {
Thread.sleep(Constants.DEFAULT_SLEEP_MILLISECONDS);
Expand Down Expand Up @@ -211,8 +211,8 @@ void testReleaseFlinkApplicationOnYarnPerJobMode() {
.anyMatch(it -> it.contains("SUCCESS")));
}

// @Test
// @Order(70)
@Test
@Order(70)
void testStartFlinkApplicationOnYarnPerJobMode() {
final ApplicationsPage applicationsPage = new ApplicationsPage(browser);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@
import static org.assertj.core.api.Assertions.assertThat;

@StreamPark(composeFiles = "docker/flink-1.18-on-yarn/docker-compose.yaml")
public class ApplicationsFlink118OnYarnWithFlinkSQLTest {
public class FlinkSQL118OnYarnTest {

private static RemoteWebDriver browser;

Expand Down Expand Up @@ -106,8 +106,8 @@ void testReleaseFlinkApplicationOnYarnApplicationMode() {
.anyMatch(it -> it.contains("SUCCESS")));
}

// @Test
// @Order(30)
@Test
@Order(30)
void testStartFlinkApplicationOnYarnApplicationMode() {
final ApplicationsPage applicationsPage = new ApplicationsPage(browser);

Expand All @@ -128,8 +128,8 @@ void testStartFlinkApplicationOnYarnApplicationMode() {
.anyMatch(it -> it.contains("FINISHED")));
}

// @Test
// @Order(31)
@Test
@Order(31)
@SneakyThrows
void testRestartAndCancelFlinkApplicationOnYarnApplicationMode() {
Thread.sleep(Constants.DEFAULT_SLEEP_MILLISECONDS);
Expand Down Expand Up @@ -211,8 +211,8 @@ void testReleaseFlinkApplicationOnYarnPerJobMode() {
.anyMatch(it -> it.contains("SUCCESS")));
}

// @Test
// @Order(70)
@Test
@Order(70)
void testStartFlinkApplicationOnYarnPerJobMode() {
final ApplicationsPage applicationsPage = new ApplicationsPage(browser);

Expand All @@ -233,8 +233,8 @@ void testStartFlinkApplicationOnYarnPerJobMode() {
.anyMatch(it -> it.contains("FINISHED")));
}

// @Test
// @Order(71)
@Test
@Order(71)
@SneakyThrows
void testRestartAndCancelFlinkApplicationOnYarnPerJobMode() {
Thread.sleep(Constants.DEFAULT_SLEEP_MILLISECONDS);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,9 @@ package org.apache.streampark.flink.proxy
import java.io.{File, IOException}
import java.net.{URL, URLClassLoader}
import java.util
import java.util.function.Consumer
import java.util.regex.Pattern

import scala.language.existentials
import scala.util.Try

/**
Expand All @@ -33,11 +33,25 @@ import scala.util.Try
* we don't override that.
*/

class ChildFirstClassLoader(urls: Array[URL], parent: ClassLoader, flinkResourcePattern: Pattern)
class ChildFirstClassLoader(
urls: Array[URL],
parent: ClassLoader,
flinkResourcePattern: Pattern,
classLoadingExceptionHandler: Consumer[Throwable])
extends URLClassLoader(urls, parent) {

ClassLoader.registerAsParallelCapable()

/**
 * Auxiliary constructor preserving the original three-argument signature for
 * existing callers. Delegates to the primary constructor, supplying a default
 * `classLoadingExceptionHandler` that simply rethrows — i.e. the pre-existing
 * fail-fast behavior when class loading fails.
 *
 * @param urls                 classpath entries searched child-first
 * @param parent               parent class loader used for parent-first packages
 * @param flinkResourcePattern pattern identifying Flink resources to keep visible
 */
def this(urls: Array[URL], parent: ClassLoader, flinkResourcePattern: Pattern) {
  this(
    urls,
    parent,
    flinkResourcePattern,
    // Default handler: rethrow, matching behavior before the handler was introduced.
    (t: Throwable) => throw t)
}

ClassLoader.registerAsParallelCapable()

private val FLINK_PATTERN =
Pattern.compile("flink-(.*).jar", Pattern.CASE_INSENSITIVE | Pattern.DOTALL)

Expand All @@ -50,31 +64,35 @@ class ChildFirstClassLoader(urls: Array[URL], parent: ClassLoader, flinkResource
"org.apache.log4j",
"org.apache.logging",
"org.apache.commons.logging",
"org.apache.commons.cli",
"ch.qos.logback",
"org.xml",
"org.w3c",
"org.apache.hadoop")

@throws[ClassNotFoundException]
override def loadClass(name: String, resolve: Boolean): Class[_] = {
this.synchronized {
// First, check if the class has already been loaded
val clazz = super.findLoadedClass(name) match {
case null =>
// check whether the class should go parent-first
for (parentFirstPattern <- PARENT_FIRST_PATTERNS) {
if (name.startsWith(parentFirstPattern)) {
super.loadClass(name, resolve)
try {
this.synchronized {
// First, check if the class has already been loaded
super.findLoadedClass(name) match {
case null =>
// check whether the class should go parent-first
PARENT_FIRST_PATTERNS.find(name.startsWith) match {
case Some(_) => super.loadClass(name, resolve)
case _ => Try(findClass(name)).getOrElse(super.loadClass(name, resolve))
}
case c =>
if (resolve) {
resolveClass(c)
}
}
Try(findClass(name)).getOrElse(super.loadClass(name, resolve))
case c: Class[_] =>
if (resolve) {
resolveClass(c)
}
c
c
}
}
clazz
} catch {
case e: Throwable =>
classLoadingExceptionHandler.accept(e)
null
}
}

Expand All @@ -99,9 +117,7 @@ class ChildFirstClassLoader(urls: Array[URL], parent: ClassLoader, flinkResource
val spec = urlClassLoaderResource.getFile
val filename = new File(spec.substring(0, spec.indexOf("!/"))).getName
val matchState =
FLINK_PATTERN.matcher(filename).matches && !flinkResourcePattern
.matcher(filename)
.matches
FLINK_PATTERN.matcher(filename).matches && !flinkResourcePattern.matcher(filename).matches
if (matchState) {
return null
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,13 +33,9 @@ object FlinkShimsProxy extends Logger {

private[this] val SHIMS_CLASS_LOADER_CACHE = MutableMap[String, ClassLoader]()

private[this] val VERIFY_SQL_CLASS_LOADER_CACHE =
MutableMap[String, ClassLoader]()
private[this] val VERIFY_SQL_CLASS_LOADER_CACHE = MutableMap[String, ClassLoader]()

private[this] val INCLUDE_PATTERN: Pattern =
Pattern.compile(
"(streampark-shaded-jackson-)(.*).jar",
Pattern.CASE_INSENSITIVE | Pattern.DOTALL)
private[this] val INCLUDE_PATTERN: Pattern = Pattern.compile("(streampark-shaded-jackson-)(.*).jar", Pattern.CASE_INSENSITIVE | Pattern.DOTALL)

private[this] def getFlinkShimsResourcePattern(majorVersion: String) =
Pattern.compile(s"flink-(.*)-$majorVersion(.*).jar", Pattern.CASE_INSENSITIVE | Pattern.DOTALL)
Expand All @@ -58,8 +54,7 @@ object FlinkShimsProxy extends Logger {
*/
def proxy[T](flinkVersion: FlinkVersion, func: ClassLoader => T): T = {
val shimsClassLoader = getFlinkShimsClassLoader(flinkVersion)
ClassLoaderUtils
.runAsClassLoader[T](shimsClassLoader, () => func(shimsClassLoader))
ClassLoaderUtils.runAsClassLoader[T](shimsClassLoader, () => func(shimsClassLoader))
}

/**
Expand All @@ -74,8 +69,7 @@ object FlinkShimsProxy extends Logger {
*/
def proxy[T](flinkVersion: FlinkVersion, func: JavaFunc[ClassLoader, T]): T = {
val shimsClassLoader = getFlinkShimsClassLoader(flinkVersion)
ClassLoaderUtils
.runAsClassLoader[T](shimsClassLoader, () => func(shimsClassLoader))
ClassLoaderUtils.runAsClassLoader[T](shimsClassLoader, () => func(shimsClassLoader))
}

// need to load all flink-table dependencies compatible with different versions
Expand All @@ -85,12 +79,10 @@ object FlinkShimsProxy extends Logger {
s"${flinkVersion.fullVersion}", {
val getFlinkTable: File => Boolean = _.getName.startsWith("flink-table")
// 1) flink/lib/flink-table*
val libTableURL =
getFlinkHomeLib(flinkVersion.flinkHome, "lib", getFlinkTable)
val libTableURL = getFlinkHomeLib(flinkVersion.flinkHome, "lib", getFlinkTable)

// 2) After version 1.15 need add flink/opt/flink-table*
val optTableURL =
getFlinkHomeLib(flinkVersion.flinkHome, "opt", getFlinkTable)
val optTableURL = getFlinkHomeLib(flinkVersion.flinkHome, "opt", getFlinkTable)
val shimsUrls = ListBuffer[URL](libTableURL ++ optTableURL: _*)

// 3) add only streampark shims jar
Expand Down Expand Up @@ -158,8 +150,7 @@ object FlinkShimsProxy extends Logger {
*/
def proxyVerifySql[T](flinkVersion: FlinkVersion, func: JavaFunc[ClassLoader, T]): T = {
val shimsClassLoader = getVerifySqlLibClassLoader(flinkVersion)
ClassLoaderUtils
.runAsClassLoader[T](shimsClassLoader, () => func(shimsClassLoader))
ClassLoaderUtils.runAsClassLoader[T](shimsClassLoader, () => func(shimsClassLoader))
}

private[this] def getFlinkShimsClassLoader(flinkVersion: FlinkVersion): ClassLoader = {
Expand Down

0 comments on commit dda8730

Please sign in to comment.