Commit
reverted classpath fix in test classes
arkadius committed Jan 17, 2025
1 parent 7d7836c commit 3ac7e6e
Showing 5 changed files with 39 additions and 16 deletions.
@@ -76,11 +76,11 @@ final class FlinkStubbedRunner(modelClassLoader: ModelClassLoader, configuration
modelClassLoader.urls match {
// FIXME abr: is it necessary?
case Nil =>
ConfigUtils.decodeListFromConfig[String, URL, MalformedURLException](
configuration,
PipelineOptions.CLASSPATHS,
new URL(_)
)
// ConfigUtils.decodeListFromConfig[String, URL, MalformedURLException](
// configuration,
// PipelineOptions.CLASSPATHS,
// new URL(_)
// )
case list => list.asJava
}
}
@@ -2,6 +2,7 @@ package pl.touk.nussknacker.engine.process.runner

import com.typesafe.config.{Config, ConfigFactory, ConfigValueFactory}
import io.circe.Json
import org.apache.flink.configuration.Configuration
import org.apache.flink.runtime.client.JobExecutionException
import org.scalatest.matchers.should.Matchers
import org.scalatest.wordspec.AnyWordSpec
@@ -770,7 +771,7 @@ class FlinkTestMainSpec extends AnyWordSpec with Matchers with Inside with Befor
ModelConfigs(config, AdditionalModelConfigs(additionalConfigsFromProvider))
)
ThreadUtils.withThisAsContextClassLoader(getClass.getClassLoader) {
FlinkTestMain.run(modelData, process, scenarioTestData, FlinkTestConfiguration.configuration())
FlinkTestMain.run(modelData, process, scenarioTestData, FlinkTestConfiguration.setupMemory(new Configuration))
}
}

@@ -5,6 +5,7 @@ import com.typesafe.config.ConfigValueFactory.fromAnyRef
import com.typesafe.scalalogging.LazyLogging
import io.circe.Json
import io.circe.Json.{Null, fromString, obj}
import org.apache.flink.configuration.Configuration
import org.apache.kafka.common.record.TimestampType
import org.scalatest.OptionValues
import org.scalatest.funsuite.AnyFunSuite
@@ -51,12 +52,7 @@ class TestFromFileSpec
inputConfig = config,
components = List.empty,
configCreator = new KafkaSourceFactoryProcessConfigCreator(() => TestFromFileSpec.resultsHolders),
// This is a workaround for a behaviour added in https://issues.apache.org/jira/browse/FLINK-32265:
// Flink overwrites the user classloader with the AppClassLoader if the classpaths parameter is empty
// (implementation in org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager).
// The AppClassLoader holds all needed jars/classes when running from the Scala plugin in an IDE,
// but when running from sbt it contains only sbt-launcher.jar.
modelClassLoader = new ModelClassLoader(getClass.getClassLoader, List(new URL("http://dummy-classpath.invalid")))
modelClassLoader = new ModelClassLoader(getClass.getClassLoader, FlinkTestConfiguration.classpathWorkaround)
)

test("Should pass correct timestamp from test data") {
@@ -141,7 +137,7 @@ class TestFromFileSpec
modelData,
process,
scenarioTestData,
FlinkTestConfiguration.configuration(),
FlinkTestConfiguration.setupMemory(new Configuration),
)
}
}
@@ -6,6 +6,7 @@ import com.typesafe.scalalogging.LazyLogging
import io.circe.Json
import io.circe.Json._
import org.apache.avro.Schema
import org.apache.flink.configuration.Configuration
import org.apache.kafka.common.record.TimestampType
import org.scalatest.{LoneElement, OptionValues}
import org.scalatest.funsuite.AnyFunSuite
@@ -37,8 +38,10 @@ import pl.touk.nussknacker.engine.testing.LocalModelData
import pl.touk.nussknacker.engine.testmode.TestProcess._
import pl.touk.nussknacker.engine.util.ThreadUtils
import pl.touk.nussknacker.engine.util.json.ToJsonEncoder
import pl.touk.nussknacker.engine.util.loader.ModelClassLoader
import pl.touk.nussknacker.test.{EitherValuesDetailedMessage, KafkaConfigProperties}

import java.net.URL
import java.util.Collections

class TestWithTestDataSpec
@@ -200,10 +203,15 @@ class TestWithTestDataSpec
private def run(process: CanonicalProcess, scenarioTestData: ScenarioTestData): TestResults[Json] = {
ThreadUtils.withThisAsContextClassLoader(getClass.getClassLoader) {
FlinkTestMain.run(
LocalModelData(config, List.empty, configCreator = creator),
LocalModelData(
config,
List.empty,
configCreator = creator,
modelClassLoader = new ModelClassLoader(getClass.getClassLoader, FlinkTestConfiguration.classpathWorkaround)
),
process,
scenarioTestData,
FlinkTestConfiguration.configuration(),
FlinkTestConfiguration.setupMemory(new Configuration),
)
}
}
@@ -3,6 +3,8 @@ package pl.touk.nussknacker.engine.flink.test
import com.github.ghik.silencer.silent
import org.apache.flink.configuration._

import java.net.URL

object FlinkTestConfiguration {

// better to create each time because it is mutable
@@ -12,10 +14,26 @@
val config = new Configuration
config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, taskManagersCount)
config.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, taskSlotsCount)

config.set(PipelineOptions.CLASSPATHS, classpathWorkaround.map(_.toString).asJava)

setupMemory(config)
}

// FIXME: better describe which classpath is used in this case
// This is a workaround for a behaviour added in https://issues.apache.org/jira/browse/FLINK-32265:
// Flink overwrites the user classloader with the AppClassLoader if the classpaths parameter is empty
// (implementation in org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager).
// The AppClassLoader holds all needed jars/classes when running from the Scala plugin in an IDE,
// but when running from sbt it contains only sbt-launcher.jar.
def classpathWorkaround: List[URL] = {
List(new URL("http://dummy-classpath.invalid"))
}

def setupMemory(config: Configuration): Configuration = {
// to prevent OutOfMemoryError: Could not allocate enough memory segments for NetworkBufferPool on a low-memory env (like Travis)
config.set(TaskManagerOptions.NETWORK_MEMORY_MIN, MemorySize.parse("16m"))
config.set(TaskManagerOptions.NETWORK_MEMORY_MAX, MemorySize.parse("16m"))

// This is to prevent memory problems in tests with multiple Table API-based aggregations. An IllegalArgumentException
// is thrown with the message "The minBucketMemorySize is not valid!" in
// org.apache.flink.table.runtime.util.collections.binary.AbstractBytesHashMap.java:121 where memorySize is set
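For context only (not part of this commit): a minimal sketch of how a test could wire together the two helpers seen in this diff — classpathWorkaround for the FLINK-32265 classloader workaround and setupMemory for the low-memory network-buffer limits. The object name FlinkHelpersUsageSketch and the surrounding wiring are illustrative assumptions; only the two helper signatures and the ModelClassLoader constructor come from the changed files.

import org.apache.flink.configuration.Configuration
import pl.touk.nussknacker.engine.flink.test.FlinkTestConfiguration
import pl.touk.nussknacker.engine.util.loader.ModelClassLoader

// Hypothetical wiring, mirroring the pattern used in the changed test specs above.
object FlinkHelpersUsageSketch {

  // The dummy classpath entry keeps Flink (FLINK-32265) from replacing the user
  // classloader with the AppClassLoader when the classpaths list would otherwise be empty.
  val modelClassLoader: ModelClassLoader =
    new ModelClassLoader(getClass.getClassLoader, FlinkTestConfiguration.classpathWorkaround)

  // Network memory is capped at 16m so a local mini-cluster fits on low-memory CI machines.
  val flinkConfiguration: Configuration =
    FlinkTestConfiguration.setupMemory(new Configuration)
}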
