diff --git a/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/runner/FlinkStubbedRunner.scala b/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/runner/FlinkStubbedRunner.scala index 295d0a70daf..a68ba94f844 100644 --- a/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/runner/FlinkStubbedRunner.scala +++ b/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/runner/FlinkStubbedRunner.scala @@ -76,11 +76,11 @@ final class FlinkStubbedRunner(modelClassLoader: ModelClassLoader, configuration modelClassLoader.urls match { // FIXME abr: is it necessary? case Nil => - ConfigUtils.decodeListFromConfig[String, URL, MalformedURLException]( - configuration, - PipelineOptions.CLASSPATHS, - new URL(_) - ) +// ConfigUtils.decodeListFromConfig[String, URL, MalformedURLException]( +// configuration, +// PipelineOptions.CLASSPATHS, +// new URL(_) +// ) case list => list.asJava } } diff --git a/engine/flink/executor/src/test/scala/pl/touk/nussknacker/engine/process/runner/FlinkTestMainSpec.scala b/engine/flink/executor/src/test/scala/pl/touk/nussknacker/engine/process/runner/FlinkTestMainSpec.scala index 13db1d66a46..187ba78fa46 100644 --- a/engine/flink/executor/src/test/scala/pl/touk/nussknacker/engine/process/runner/FlinkTestMainSpec.scala +++ b/engine/flink/executor/src/test/scala/pl/touk/nussknacker/engine/process/runner/FlinkTestMainSpec.scala @@ -2,6 +2,7 @@ package pl.touk.nussknacker.engine.process.runner import com.typesafe.config.{Config, ConfigFactory, ConfigValueFactory} import io.circe.Json +import org.apache.flink.configuration.Configuration import org.apache.flink.runtime.client.JobExecutionException import org.scalatest.matchers.should.Matchers import org.scalatest.wordspec.AnyWordSpec @@ -770,7 +771,7 @@ class FlinkTestMainSpec extends AnyWordSpec with Matchers with Inside with Befor ModelConfigs(config, AdditionalModelConfigs(additionalConfigsFromProvider)) ) 
ThreadUtils.withThisAsContextClassLoader(getClass.getClassLoader) { - FlinkTestMain.run(modelData, process, scenarioTestData, FlinkTestConfiguration.configuration()) + FlinkTestMain.run(modelData, process, scenarioTestData, FlinkTestConfiguration.setupMemory(new Configuration)) } } diff --git a/engine/flink/kafka-components-utils/src/test/scala/pl/touk/nussknacker/engine/kafka/source/flink/TestFromFileSpec.scala b/engine/flink/kafka-components-utils/src/test/scala/pl/touk/nussknacker/engine/kafka/source/flink/TestFromFileSpec.scala index 615d0000a44..19ffbbfacc7 100644 --- a/engine/flink/kafka-components-utils/src/test/scala/pl/touk/nussknacker/engine/kafka/source/flink/TestFromFileSpec.scala +++ b/engine/flink/kafka-components-utils/src/test/scala/pl/touk/nussknacker/engine/kafka/source/flink/TestFromFileSpec.scala @@ -5,6 +5,7 @@ import com.typesafe.config.ConfigValueFactory.fromAnyRef import com.typesafe.scalalogging.LazyLogging import io.circe.Json import io.circe.Json.{Null, fromString, obj} +import org.apache.flink.configuration.Configuration import org.apache.kafka.common.record.TimestampType import org.scalatest.OptionValues import org.scalatest.funsuite.AnyFunSuite @@ -51,12 +52,7 @@ class TestFromFileSpec inputConfig = config, components = List.empty, configCreator = new KafkaSourceFactoryProcessConfigCreator(() => TestFromFileSpec.resultsHolders), - // This is a work around for a behaviour added in https://issues.apache.org/jira/browse/FLINK-32265 - // Flink overwrite user classloader by the AppClassLoader if classpaths parameter is empty - // (implementation in org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager) - // which holds all needed jars/classes in case of running from Scala plugin in IDE. 
- // but in case of running from sbt it contains only sbt-launcher.jar - modelClassLoader = new ModelClassLoader(getClass.getClassLoader, List(new URL("http://dummy-classpath.invalid"))) + modelClassLoader = new ModelClassLoader(getClass.getClassLoader, FlinkTestConfiguration.classpathWorkaround) ) test("Should pass correct timestamp from test data") { @@ -141,7 +137,7 @@ class TestFromFileSpec modelData, process, scenarioTestData, - FlinkTestConfiguration.configuration(), + FlinkTestConfiguration.setupMemory(new Configuration), ) } } diff --git a/engine/flink/schemed-kafka-components-utils/src/test/scala/pl/touk/nussknacker/engine/schemedkafka/source/flink/TestWithTestDataSpec.scala b/engine/flink/schemed-kafka-components-utils/src/test/scala/pl/touk/nussknacker/engine/schemedkafka/source/flink/TestWithTestDataSpec.scala index 9ca30564187..0e9f4f160cb 100644 --- a/engine/flink/schemed-kafka-components-utils/src/test/scala/pl/touk/nussknacker/engine/schemedkafka/source/flink/TestWithTestDataSpec.scala +++ b/engine/flink/schemed-kafka-components-utils/src/test/scala/pl/touk/nussknacker/engine/schemedkafka/source/flink/TestWithTestDataSpec.scala @@ -6,6 +6,7 @@ import com.typesafe.scalalogging.LazyLogging import io.circe.Json import io.circe.Json._ import org.apache.avro.Schema +import org.apache.flink.configuration.Configuration import org.apache.kafka.common.record.TimestampType import org.scalatest.{LoneElement, OptionValues} import org.scalatest.funsuite.AnyFunSuite @@ -37,8 +38,10 @@ import pl.touk.nussknacker.engine.testing.LocalModelData import pl.touk.nussknacker.engine.testmode.TestProcess._ import pl.touk.nussknacker.engine.util.ThreadUtils import pl.touk.nussknacker.engine.util.json.ToJsonEncoder +import pl.touk.nussknacker.engine.util.loader.ModelClassLoader import pl.touk.nussknacker.test.{EitherValuesDetailedMessage, KafkaConfigProperties} +import java.net.URL import java.util.Collections class TestWithTestDataSpec @@ -200,10 +203,15 @@ class 
TestWithTestDataSpec private def run(process: CanonicalProcess, scenarioTestData: ScenarioTestData): TestResults[Json] = { ThreadUtils.withThisAsContextClassLoader(getClass.getClassLoader) { FlinkTestMain.run( - LocalModelData(config, List.empty, configCreator = creator), + LocalModelData( + config, + List.empty, + configCreator = creator, + modelClassLoader = new ModelClassLoader(getClass.getClassLoader, FlinkTestConfiguration.classpathWorkaround) + ), process, scenarioTestData, - FlinkTestConfiguration.configuration(), + FlinkTestConfiguration.setupMemory(new Configuration), ) } } diff --git a/engine/flink/test-utils/src/main/scala/pl/touk/nussknacker/engine/flink/test/FlinkTestConfiguration.scala b/engine/flink/test-utils/src/main/scala/pl/touk/nussknacker/engine/flink/test/FlinkTestConfiguration.scala index 48b05889f6d..6c92d6e70fd 100644 --- a/engine/flink/test-utils/src/main/scala/pl/touk/nussknacker/engine/flink/test/FlinkTestConfiguration.scala +++ b/engine/flink/test-utils/src/main/scala/pl/touk/nussknacker/engine/flink/test/FlinkTestConfiguration.scala @@ -3,6 +3,8 @@ package pl.touk.nussknacker.engine.flink.test import com.github.ghik.silencer.silent import org.apache.flink.configuration._ +import java.net.URL + object FlinkTestConfiguration { // better to create each time because is mutable @@ -12,10 +14,26 @@ object FlinkTestConfiguration { val config = new Configuration config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, taskManagersCount) config.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, taskSlotsCount) + + config.set(PipelineOptions.CLASSPATHS, classpathWorkaround.map(_.toString).asJava) + + setupMemory(config) + } + + // FIXME: better describe which classpath is used in this case + // This is a workaround for a behaviour added in https://issues.apache.org/jira/browse/FLINK-32265 + // Flink overwrites user classloader by the AppClassLoader if classpaths parameter is empty + // (implementation in
org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager) + // which holds all needed jars/classes in case of running from Scala plugin in IDE. + // But in case of running from sbt it contains only sbt-launcher.jar + def classpathWorkaround: List[URL] = { + List(new URL("http://dummy-classpath.invalid")) + } + + def setupMemory(config: Configuration): Configuration = { // to prevent OutOfMemoryError: Could not allocate enough memory segments for NetworkBufferPool on low memory env (like Travis) config.set(TaskManagerOptions.NETWORK_MEMORY_MIN, MemorySize.parse("16m")) config.set(TaskManagerOptions.NETWORK_MEMORY_MAX, MemorySize.parse("16m")) - // This is to prevent memory problem in tests with mutliple Table API based aggregations. An IllegalArgExceptionon // is thrown with message "The minBucketMemorySize is not valid!" in // org.apache.flink.table.runtime.util.collections.binary.AbstractBytesHashMap.java:121 where memorySize is set