diff --git a/.github/workflows/e2e.yml b/.github/workflows/e2e.yml index f908b05d52..755e7e2755 100644 --- a/.github/workflows/e2e.yml +++ b/.github/workflows/e2e.yml @@ -124,12 +124,12 @@ jobs: class: org.apache.streampark.e2e.cases.TeamManagementTest - name: MemberManagementTest class: org.apache.streampark.e2e.cases.MemberManagementTest - - name: ApplicationsFlink116OnYarnWithFlinkSQLTest - class: org.apache.streampark.e2e.cases.ApplicationsFlink116OnYarnWithFlinkSQLTest - - name: ApplicationsFlink117OnYarnWithFlinkSQLTest - class: org.apache.streampark.e2e.cases.ApplicationsFlink117OnYarnWithFlinkSQLTest - - name: ApplicationsFlink118OnYarnWithFlinkSQLTest - class: org.apache.streampark.e2e.cases.ApplicationsFlink118OnYarnWithFlinkSQLTest + - name: FlinkSQL116OnYarnTest + class: org.apache.streampark.e2e.cases.FlinkSQL116OnYarnTest + - name: FlinkSQL117OnYarnTest + class: org.apache.streampark.e2e.cases.FlinkSQL117OnYarnTest + - name: FlinkSQL118OnYarnTest + class: org.apache.streampark.e2e.cases.FlinkSQL118OnYarnTest env: RECORDING_PATH: /tmp/recording-${{ matrix.case.name }} steps: diff --git a/streampark-e2e/streampark-e2e-case/src/test/java/org/apache/streampark/e2e/cases/ApplicationsFlink116OnYarnWithFlinkSQLTest.java b/streampark-e2e/streampark-e2e-case/src/test/java/org/apache/streampark/e2e/cases/FlinkSQL116OnYarnTest.java similarity index 98% rename from streampark-e2e/streampark-e2e-case/src/test/java/org/apache/streampark/e2e/cases/ApplicationsFlink116OnYarnWithFlinkSQLTest.java rename to streampark-e2e/streampark-e2e-case/src/test/java/org/apache/streampark/e2e/cases/FlinkSQL116OnYarnTest.java index 2b04270164..92d4c5daea 100644 --- a/streampark-e2e/streampark-e2e-case/src/test/java/org/apache/streampark/e2e/cases/ApplicationsFlink116OnYarnWithFlinkSQLTest.java +++ b/streampark-e2e/streampark-e2e-case/src/test/java/org/apache/streampark/e2e/cases/FlinkSQL116OnYarnTest.java @@ -38,7 +38,7 @@ import static org.assertj.core.api.Assertions.assertThat; @StreamPark(composeFiles = "docker/flink-1.16-on-yarn/docker-compose.yaml") -public class ApplicationsFlink116OnYarnWithFlinkSQLTest { +public class FlinkSQL116OnYarnTest { private static RemoteWebDriver browser; @@ -106,8 +106,8 @@ void testReleaseFlinkApplicationOnYarnApplicationMode() { .anyMatch(it -> it.contains("SUCCESS"))); } - // @Test - // @Order(30) + @Test + @Order(30) void testStartFlinkApplicationOnYarnApplicationMode() { final ApplicationsPage applicationsPage = new ApplicationsPage(browser); @@ -128,8 +128,8 @@ void testStartFlinkApplicationOnYarnApplicationMode() { .anyMatch(it -> it.contains("FINISHED"))); } - // @Test - // @Order(31) + @Test + @Order(31) @SneakyThrows void testRestartAndCancelFlinkApplicationOnYarnApplicationMode() { Thread.sleep(Constants.DEFAULT_SLEEP_MILLISECONDS); @@ -210,8 +210,8 @@ void testReleaseFlinkApplicationOnYarnPerJobMode() { .anyMatch(it -> it.contains("SUCCESS"))); } - // @Test - // @Order(70) + @Test + @Order(70) void testStartFlinkApplicationOnYarnPerJobMode() { final ApplicationsPage applicationsPage = new ApplicationsPage(browser); diff --git a/streampark-e2e/streampark-e2e-case/src/test/java/org/apache/streampark/e2e/cases/ApplicationsFlink117OnYarnWithFlinkSQLTest.java b/streampark-e2e/streampark-e2e-case/src/test/java/org/apache/streampark/e2e/cases/FlinkSQL117OnYarnTest.java similarity index 98% rename from streampark-e2e/streampark-e2e-case/src/test/java/org/apache/streampark/e2e/cases/ApplicationsFlink117OnYarnWithFlinkSQLTest.java rename to streampark-e2e/streampark-e2e-case/src/test/java/org/apache/streampark/e2e/cases/FlinkSQL117OnYarnTest.java index 9fb19880e4..b98fae390e 100644 --- a/streampark-e2e/streampark-e2e-case/src/test/java/org/apache/streampark/e2e/cases/ApplicationsFlink117OnYarnWithFlinkSQLTest.java +++ b/streampark-e2e/streampark-e2e-case/src/test/java/org/apache/streampark/e2e/cases/FlinkSQL117OnYarnTest.java @@ -38,7 +38,7 @@ import static org.assertj.core.api.Assertions.assertThat; @StreamPark(composeFiles = "docker/flink-1.17-on-yarn/docker-compose.yaml") -public class ApplicationsFlink117OnYarnWithFlinkSQLTest { +public class FlinkSQL117OnYarnTest { private static RemoteWebDriver browser; @@ -106,8 +106,8 @@ void testReleaseFlinkApplicationOnYarnApplicationMode() { .anyMatch(it -> it.contains("SUCCESS"))); } - // @Test - // @Order(30) + @Test + @Order(30) void testStartFlinkApplicationOnYarnApplicationMode() { final ApplicationsPage applicationsPage = new ApplicationsPage(browser); @@ -128,8 +128,8 @@ void testStartFlinkApplicationOnYarnApplicationMode() { .anyMatch(it -> it.contains("FINISHED"))); } - // @Test - // @Order(31) + @Test + @Order(31) @SneakyThrows void testRestartAndCancelFlinkApplicationOnYarnApplicationMode() { Thread.sleep(Constants.DEFAULT_SLEEP_MILLISECONDS); @@ -211,8 +211,8 @@ void testReleaseFlinkApplicationOnYarnPerJobMode() { .anyMatch(it -> it.contains("SUCCESS"))); } - // @Test - // @Order(70) + @Test + @Order(70) void testStartFlinkApplicationOnYarnPerJobMode() { final ApplicationsPage applicationsPage = new ApplicationsPage(browser); diff --git a/streampark-e2e/streampark-e2e-case/src/test/java/org/apache/streampark/e2e/cases/ApplicationsFlink118OnYarnWithFlinkSQLTest.java b/streampark-e2e/streampark-e2e-case/src/test/java/org/apache/streampark/e2e/cases/FlinkSQL118OnYarnTest.java similarity index 98% rename from streampark-e2e/streampark-e2e-case/src/test/java/org/apache/streampark/e2e/cases/ApplicationsFlink118OnYarnWithFlinkSQLTest.java rename to streampark-e2e/streampark-e2e-case/src/test/java/org/apache/streampark/e2e/cases/FlinkSQL118OnYarnTest.java index c252fb0897..a91da141aa 100644 --- a/streampark-e2e/streampark-e2e-case/src/test/java/org/apache/streampark/e2e/cases/ApplicationsFlink118OnYarnWithFlinkSQLTest.java +++ b/streampark-e2e/streampark-e2e-case/src/test/java/org/apache/streampark/e2e/cases/FlinkSQL118OnYarnTest.java @@ -38,7 +38,7 @@ import static org.assertj.core.api.Assertions.assertThat; @StreamPark(composeFiles = "docker/flink-1.18-on-yarn/docker-compose.yaml") -public class ApplicationsFlink118OnYarnWithFlinkSQLTest { +public class FlinkSQL118OnYarnTest { private static RemoteWebDriver browser; @@ -106,8 +106,8 @@ void testReleaseFlinkApplicationOnYarnApplicationMode() { .anyMatch(it -> it.contains("SUCCESS"))); } - // @Test - // @Order(30) + @Test + @Order(30) void testStartFlinkApplicationOnYarnApplicationMode() { final ApplicationsPage applicationsPage = new ApplicationsPage(browser); @@ -128,8 +128,8 @@ void testStartFlinkApplicationOnYarnApplicationMode() { .anyMatch(it -> it.contains("FINISHED"))); } - // @Test - // @Order(31) + @Test + @Order(31) @SneakyThrows void testRestartAndCancelFlinkApplicationOnYarnApplicationMode() { Thread.sleep(Constants.DEFAULT_SLEEP_MILLISECONDS); @@ -211,8 +211,8 @@ void testReleaseFlinkApplicationOnYarnPerJobMode() { .anyMatch(it -> it.contains("SUCCESS"))); } - // @Test - // @Order(70) + @Test + @Order(70) void testStartFlinkApplicationOnYarnPerJobMode() { final ApplicationsPage applicationsPage = new ApplicationsPage(browser); @@ -233,8 +233,8 @@ void testStartFlinkApplicationOnYarnPerJobMode() { .anyMatch(it -> it.contains("FINISHED"))); } - // @Test - // @Order(71) + @Test + @Order(71) @SneakyThrows void testRestartAndCancelFlinkApplicationOnYarnPerJobMode() { Thread.sleep(Constants.DEFAULT_SLEEP_MILLISECONDS); diff --git a/streampark-flink/streampark-flink-proxy/src/main/scala/org/apache/streampark/flink/proxy/ChildFirstClassLoader.scala b/streampark-flink/streampark-flink-proxy/src/main/scala/org/apache/streampark/flink/proxy/ChildFirstClassLoader.scala index 5e572cbaf0..ec8f360b59 100644 --- a/streampark-flink/streampark-flink-proxy/src/main/scala/org/apache/streampark/flink/proxy/ChildFirstClassLoader.scala +++ b/streampark-flink/streampark-flink-proxy/src/main/scala/org/apache/streampark/flink/proxy/ChildFirstClassLoader.scala @@ -20,9 +20,9 @@ package org.apache.streampark.flink.proxy import java.io.{File, IOException} import java.net.{URL, URLClassLoader} import java.util +import java.util.function.Consumer import java.util.regex.Pattern -import scala.language.existentials import scala.util.Try /** @@ -33,11 +33,25 @@ import scala.util.Try * we don't override that. */ -class ChildFirstClassLoader(urls: Array[URL], parent: ClassLoader, flinkResourcePattern: Pattern) +class ChildFirstClassLoader( + urls: Array[URL], + parent: ClassLoader, + flinkResourcePattern: Pattern, + classLoadingExceptionHandler: Consumer[Throwable]) extends URLClassLoader(urls, parent) { ClassLoader.registerAsParallelCapable() + def this(urls: Array[URL], parent: ClassLoader, flinkResourcePattern: Pattern) { + this( + urls, + parent, + flinkResourcePattern, + (t: Throwable) => throw t) + } + + ClassLoader.registerAsParallelCapable() + private val FLINK_PATTERN = Pattern.compile("flink-(.*).jar", Pattern.CASE_INSENSITIVE | Pattern.DOTALL) @@ -50,6 +64,7 @@ class ChildFirstClassLoader(urls: Array[URL], parent: ClassLoader, flinkResource "org.apache.log4j", "org.apache.logging", "org.apache.commons.logging", + "org.apache.commons.cli", "ch.qos.logback", "org.xml", "org.w3c", @@ -57,24 +72,27 @@ class ChildFirstClassLoader(urls: Array[URL], parent: ClassLoader, flinkResource @throws[ClassNotFoundException] override def loadClass(name: String, resolve: Boolean): Class[_] = { - this.synchronized { - // First, check if the class has already been loaded - val clazz = super.findLoadedClass(name) match { - case null => - // check whether the class should go parent-first - for (parentFirstPattern <- PARENT_FIRST_PATTERNS) { - if (name.startsWith(parentFirstPattern)) { - super.loadClass(name, resolve) + try { + this.synchronized { + // First, check if the class has already been loaded + super.findLoadedClass(name) match { + case null => + // check whether the class should go parent-first + PARENT_FIRST_PATTERNS.find(name.startsWith) match { + case Some(_) => super.loadClass(name, resolve) + case _ => Try(findClass(name)).getOrElse(super.loadClass(name, resolve)) + } + case c => + if (resolve) { + resolveClass(c) } - } - Try(findClass(name)).getOrElse(super.loadClass(name, resolve)) - case c: Class[_] => - if (resolve) { - resolveClass(c) - } - c + c + } } - clazz + } catch { + case e: Throwable => + classLoadingExceptionHandler.accept(e) + null } } @@ -99,9 +117,7 @@ class ChildFirstClassLoader(urls: Array[URL], parent: ClassLoader, flinkResource val spec = urlClassLoaderResource.getFile val filename = new File(spec.substring(0, spec.indexOf("!/"))).getName val matchState = - FLINK_PATTERN.matcher(filename).matches && !flinkResourcePattern - .matcher(filename) - .matches + FLINK_PATTERN.matcher(filename).matches && !flinkResourcePattern.matcher(filename).matches if (matchState) { return null } diff --git a/streampark-flink/streampark-flink-proxy/src/main/scala/org/apache/streampark/flink/proxy/FlinkShimsProxy.scala b/streampark-flink/streampark-flink-proxy/src/main/scala/org/apache/streampark/flink/proxy/FlinkShimsProxy.scala index 051f53fdf1..1d95303acb 100644 --- a/streampark-flink/streampark-flink-proxy/src/main/scala/org/apache/streampark/flink/proxy/FlinkShimsProxy.scala +++ b/streampark-flink/streampark-flink-proxy/src/main/scala/org/apache/streampark/flink/proxy/FlinkShimsProxy.scala @@ -33,13 +33,9 @@ object FlinkShimsProxy extends Logger { private[this] val SHIMS_CLASS_LOADER_CACHE = MutableMap[String, ClassLoader]() - private[this] val VERIFY_SQL_CLASS_LOADER_CACHE = - MutableMap[String, ClassLoader]() + private[this] val VERIFY_SQL_CLASS_LOADER_CACHE = MutableMap[String, ClassLoader]() - private[this] val INCLUDE_PATTERN: Pattern = - Pattern.compile( - "(streampark-shaded-jackson-)(.*).jar", - Pattern.CASE_INSENSITIVE | Pattern.DOTALL) + private[this] val INCLUDE_PATTERN: Pattern = Pattern.compile("(streampark-shaded-jackson-)(.*).jar", Pattern.CASE_INSENSITIVE | Pattern.DOTALL) private[this] def getFlinkShimsResourcePattern(majorVersion: String) = Pattern.compile(s"flink-(.*)-$majorVersion(.*).jar", Pattern.CASE_INSENSITIVE | Pattern.DOTALL) @@ -58,8 +54,7 @@ object FlinkShimsProxy extends Logger { */ def proxy[T](flinkVersion: FlinkVersion, func: ClassLoader => T): T = { val shimsClassLoader = getFlinkShimsClassLoader(flinkVersion) - ClassLoaderUtils - .runAsClassLoader[T](shimsClassLoader, () => func(shimsClassLoader)) + ClassLoaderUtils.runAsClassLoader[T](shimsClassLoader, () => func(shimsClassLoader)) } /** @@ -74,8 +69,7 @@ object FlinkShimsProxy extends Logger { */ def proxy[T](flinkVersion: FlinkVersion, func: JavaFunc[ClassLoader, T]): T = { val shimsClassLoader = getFlinkShimsClassLoader(flinkVersion) - ClassLoaderUtils - .runAsClassLoader[T](shimsClassLoader, () => func(shimsClassLoader)) + ClassLoaderUtils.runAsClassLoader[T](shimsClassLoader, () => func(shimsClassLoader)) } // need to load all flink-table dependencies compatible with different versions @@ -85,12 +79,10 @@ object FlinkShimsProxy extends Logger { s"${flinkVersion.fullVersion}", { val getFlinkTable: File => Boolean = _.getName.startsWith("flink-table") // 1) flink/lib/flink-table* - val libTableURL = - getFlinkHomeLib(flinkVersion.flinkHome, "lib", getFlinkTable) + val libTableURL = getFlinkHomeLib(flinkVersion.flinkHome, "lib", getFlinkTable) // 2) After version 1.15 need add flink/opt/flink-table* - val optTableURL = - getFlinkHomeLib(flinkVersion.flinkHome, "opt", getFlinkTable) + val optTableURL = getFlinkHomeLib(flinkVersion.flinkHome, "opt", getFlinkTable) val shimsUrls = ListBuffer[URL](libTableURL ++ optTableURL: _*) // 3) add only streampark shims jar @@ -158,8 +150,7 @@ object FlinkShimsProxy extends Logger { */ def proxyVerifySql[T](flinkVersion: FlinkVersion, func: JavaFunc[ClassLoader, T]): T = { val shimsClassLoader = getVerifySqlLibClassLoader(flinkVersion) - ClassLoaderUtils - .runAsClassLoader[T](shimsClassLoader, () => func(shimsClassLoader)) + ClassLoaderUtils.runAsClassLoader[T](shimsClassLoader, () => func(shimsClassLoader)) } private[this] def getFlinkShimsClassLoader(flinkVersion: FlinkVersion): ClassLoader = {