From 8fdf5b6c36de0396b0ba511d48ed665af80d37a3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mateusz=20S=C5=82abek?= Date: Fri, 3 Jan 2025 12:02:49 +0100 Subject: [PATCH] [NU-1941] Evaluate value every time for count > 1 in sample-generator (#7376) evaluate value every time for count > 1 --- docs/Changelog.md | 7 ++- .../SampleGeneratorSourceFactorySpec.scala | 43 ++++++++++++++- .../SampleGeneratorSourceFactory.scala | 55 +++++++++---------- 3 files changed, 72 insertions(+), 33 deletions(-) diff --git a/docs/Changelog.md b/docs/Changelog.md index e5b1692a6ad..80fbd463b6d 100644 --- a/docs/Changelog.md +++ b/docs/Changelog.md @@ -34,8 +34,11 @@ this fixes issue with unexpected low scale when performing division on BigDecimals which were created in such conversion. * [#7379](https://github.com/TouK/nussknacker/pull/7379) Removed CustomAction mechanism. * Changes to `periodic` component (renamed to `sample-generator`): - * [#7368](https://github.com/TouK/nussknacker/pull/7368) Component rename: `periodic` to `sample-generator` - * [#7373](https://github.com/TouK/nussknacker/pull/7373) Improvements to `period` editor + * [#7368](https://github.com/TouK/nussknacker/pull/7368) Component rename: `periodic` to `sample-generator` + * [#7373](https://github.com/TouK/nussknacker/pull/7373) Improvements to `period` editor + * [#7376](https://github.com/TouK/nussknacker/pull/7376) Previously, when count was > 1, the value was evaluated once + and emitted times count. For example: if the value was evaluated to be a random UUID and count was 5, one UUID was + generated and emitted 5 times. Now in one count batch each value is evaluated separately. ## 1.18 diff --git a/engine/flink/components/base-tests/src/test/scala/pl/touk/nussknacker/engine/flink/util/transformer/SampleGeneratorSourceFactorySpec.scala b/engine/flink/components/base-tests/src/test/scala/pl/touk/nussknacker/engine/flink/util/transformer/SampleGeneratorSourceFactorySpec.scala index 63fae3a05d7..2189dec4539 100644 --- a/engine/flink/components/base-tests/src/test/scala/pl/touk/nussknacker/engine/flink/util/transformer/SampleGeneratorSourceFactorySpec.scala +++ b/engine/flink/components/base-tests/src/test/scala/pl/touk/nussknacker/engine/flink/util/transformer/SampleGeneratorSourceFactorySpec.scala @@ -14,7 +14,12 @@ import pl.touk.nussknacker.engine.testing.LocalModelData import pl.touk.nussknacker.engine.testmode.ResultsCollectingListenerHolder import pl.touk.nussknacker.test.PatientScalaFutures -class SampleGeneratorSourceFactorySpec extends AnyFunSuite with FlinkSpec with PatientScalaFutures with Matchers with Inside { +class SampleGeneratorSourceFactorySpec + extends AnyFunSuite + with FlinkSpec + with PatientScalaFutures + with Matchers + with Inside { test("should produce results for each element in list") { val sinkId = "sinkId" @@ -52,4 +57,40 @@ class SampleGeneratorSourceFactorySpec extends AnyFunSuite with FlinkSpec with P } + test("should produce n individually evaluated results for n count") { + val sinkId = "sinkId" + + val collectingListener = ResultsCollectingListenerHolder.registerListener + val model = LocalModelData( + ConfigFactory.empty(), + FlinkBaseComponentProvider.Components ::: FlinkBaseUnboundedComponentProvider.Components, + configCreator = new ConfigCreatorWithCollectingListener(collectingListener), + ) + val scenario = ScenarioBuilder + .streaming("test") + .source( + "sample-generator", + "sample-generator", + "period" -> "T(java.time.Duration).ofSeconds(1)".spel, + "count" -> "2".spel, + "value" -> s"T(java.util.UUID).randomUUID".spel + ) + .emptySink(sinkId, "dead-end") + + val stoppableEnv = flinkMiniCluster.createExecutionEnvironment() + UnitTestsFlinkRunner.registerInEnvironmentWithModel(stoppableEnv, model)(scenario) + + val id = stoppableEnv.executeAndWaitForStart(scenario.name.value) + try { + eventually { + val results = collectingListener.results.nodeResults.get(sinkId) + val emittedResults = results.toList.flatten.flatMap(_.variableTyped("input")) + emittedResults.size should be > 1 + emittedResults.distinct.size shouldBe emittedResults.size + } + } finally { + stoppableEnv.cancel(id.getJobID) + } + } + } diff --git a/engine/flink/components/base-unbounded/src/main/scala/pl/touk/nussknacker/engine/flink/util/transformer/SampleGeneratorSourceFactory.scala b/engine/flink/components/base-unbounded/src/main/scala/pl/touk/nussknacker/engine/flink/util/transformer/SampleGeneratorSourceFactory.scala index 68f61e59228..478df94e44e 100644 --- a/engine/flink/components/base-unbounded/src/main/scala/pl/touk/nussknacker/engine/flink/util/transformer/SampleGeneratorSourceFactory.scala +++ b/engine/flink/components/base-unbounded/src/main/scala/pl/touk/nussknacker/engine/flink/util/transformer/SampleGeneratorSourceFactory.scala @@ -9,13 +9,12 @@ import org.apache.flink.util.Collector import pl.touk.nussknacker.engine.api._ import pl.touk.nussknacker.engine.api.component.UnboundedStreamComponent import pl.touk.nussknacker.engine.api.editor.{DualEditor, DualEditorMode, SimpleEditor, SimpleEditorType} -import pl.touk.nussknacker.engine.api.process.{BasicContextInitializer, Source, SourceFactory} -import pl.touk.nussknacker.engine.api.typed.typing.Unknown +import pl.touk.nussknacker.engine.api.process.{Source, SourceFactory} import pl.touk.nussknacker.engine.api.typed.{ReturningType, typing} import pl.touk.nussknacker.engine.flink.api.process.{ - FlinkContextInitializingFunction, + CustomizableTimestampWatermarkHandlerSource, FlinkCustomNodeContext, - FlinkSource + StandardFlinkSource } import pl.touk.nussknacker.engine.flink.api.timestampwatermark.{ StandardTimestampWatermarkHandler, @@ -43,12 +42,10 @@ object SampleGeneratorSourceFactory ) ) -class SampleGeneratorSourceFactory(timestampAssigner: TimestampWatermarkHandler[AnyRef]) +class SampleGeneratorSourceFactory(customTimestampAssigner: TimestampWatermarkHandler[AnyRef]) extends SourceFactory with UnboundedStreamComponent { - import pl.touk.nussknacker.engine.flink.api.datastream.DataStreamImplicits._ - @silent("deprecated") @MethodToInvoke def create( @@ -65,36 +62,34 @@ class SampleGeneratorSourceFactory(timestampAssigner: TimestampWatermarkHandler[ @ParamName("count") @Nullable @Min(1) nullableCount: Integer, @ParamName("value") value: LazyParameter[AnyRef] ): Source = { - new FlinkSource with ReturningType { - - override def contextStream(env: StreamExecutionEnvironment, ctx: FlinkCustomNodeContext): DataStream[Context] = { - val count = Option(nullableCount).map(_.toInt).getOrElse(1) - val processName = ctx.metaData.name - val stream = env + new StandardFlinkSource[AnyRef] with ReturningType with CustomizableTimestampWatermarkHandlerSource[AnyRef] { + + override protected def sourceStream( + env: StreamExecutionEnvironment, + flinkNodeContext: FlinkCustomNodeContext + ): DataStream[AnyRef] = { + val count = Option(nullableCount).map(_.toInt).getOrElse(1) + // Parameter evaluation requires context, so here we create an empty context just to evaluate the `value` param. + // Later the evaluated value is extracted from this temporary context and proper context is initialized. + env .addSource(new PeriodicFunction(period)) - .map(_ => Context(processName.value)) - .flatMap(value)(ctx) .flatMap( - (value: ValueWithContext[AnyRef], out: Collector[AnyRef]) => - 1.to(count).map(_ => value.value).foreach(out.collect), - TypeInformationDetection.instance.forType[AnyRef](value.returnType) + (_: Unit, out: Collector[Context]) => { + val temporaryContextForEvaluation = Context(flinkNodeContext.metaData.name.value) + (1 to count).foreach(_ => out.collect(temporaryContextForEvaluation)) + }, + TypeInformationDetection.instance.forClass[Context] ) - - val rawSourceWithTimestamp = timestampAssigner.assignTimestampAndWatermarks(stream) - - rawSourceWithTimestamp - .map( - new FlinkContextInitializingFunction[AnyRef]( - new BasicContextInitializer[AnyRef](Unknown), - ctx.nodeId, - ctx.convertToEngineRuntimeContext - ), - ctx.contextTypeInfo + .flatMap(flinkNodeContext.lazyParameterHelper.lazyMapFunction(value)) + .flatMap( + (value: ValueWithContext[AnyRef], out: Collector[AnyRef]) => out.collect(value.value), + TypeInformationDetection.instance.forType[AnyRef](value.returnType) ) } - override val returnType: typing.TypingResult = value.returnType + override def timestampAssigner: Option[TimestampWatermarkHandler[AnyRef]] = Some(customTimestampAssigner) + override val returnType: typing.TypingResult = value.returnType } }