Skip to content

Commit

Permalink
[NU-1941] Evaluate value every time for count > 1 in sample-generator (
Browse files Browse the repository at this point in the history
…#7376)

evaluate value every time for count > 1
  • Loading branch information
mslabek authored Jan 3, 2025
1 parent a3aa89c commit 8fdf5b6
Show file tree
Hide file tree
Showing 3 changed files with 72 additions and 33 deletions.
7 changes: 5 additions & 2 deletions docs/Changelog.md
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,11 @@
this fixes issue with unexpected low scale when performing division on BigDecimals which were created in such conversion.
* [#7379](https://github.com/TouK/nussknacker/pull/7379) Removed CustomAction mechanism.
* Changes to `periodic` component (renamed to `sample-generator`):
* [#7368](https://github.com/TouK/nussknacker/pull/7368) Component rename: `periodic` to `sample-generator`
* [#7373](https://github.com/TouK/nussknacker/pull/7373) Improvements to `period` editor
* [#7368](https://github.com/TouK/nussknacker/pull/7368) Component rename: `periodic` to `sample-generator`
* [#7373](https://github.com/TouK/nussknacker/pull/7373) Improvements to `period` editor
* [#7376](https://github.com/TouK/nussknacker/pull/7376) Previously, when count was > 1, the value was evaluated once
and emitted times count. For example: if the value was evaluated to be a random UUID and count was 5, one UUID was
generated and emitted 5 times. Now in one count batch each value is evaluated separately.

## 1.18

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,12 @@ import pl.touk.nussknacker.engine.testing.LocalModelData
import pl.touk.nussknacker.engine.testmode.ResultsCollectingListenerHolder
import pl.touk.nussknacker.test.PatientScalaFutures

class SampleGeneratorSourceFactorySpec extends AnyFunSuite with FlinkSpec with PatientScalaFutures with Matchers with Inside {
class SampleGeneratorSourceFactorySpec
extends AnyFunSuite
with FlinkSpec
with PatientScalaFutures
with Matchers
with Inside {

test("should produce results for each element in list") {
val sinkId = "sinkId"
Expand Down Expand Up @@ -52,4 +57,40 @@ class SampleGeneratorSourceFactorySpec extends AnyFunSuite with FlinkSpec with P

}

test("should produce n individually evaluated results for n count") {
val sinkId = "sinkId"

val collectingListener = ResultsCollectingListenerHolder.registerListener
val model = LocalModelData(
ConfigFactory.empty(),
FlinkBaseComponentProvider.Components ::: FlinkBaseUnboundedComponentProvider.Components,
configCreator = new ConfigCreatorWithCollectingListener(collectingListener),
)
val scenario = ScenarioBuilder
.streaming("test")
.source(
"sample-generator",
"sample-generator",
"period" -> "T(java.time.Duration).ofSeconds(1)".spel,
"count" -> "2".spel,
"value" -> s"T(java.util.UUID).randomUUID".spel
)
.emptySink(sinkId, "dead-end")

val stoppableEnv = flinkMiniCluster.createExecutionEnvironment()
UnitTestsFlinkRunner.registerInEnvironmentWithModel(stoppableEnv, model)(scenario)

val id = stoppableEnv.executeAndWaitForStart(scenario.name.value)
try {
eventually {
val results = collectingListener.results.nodeResults.get(sinkId)
val emittedResults = results.toList.flatten.flatMap(_.variableTyped("input"))
emittedResults.size should be > 1
emittedResults.distinct.size shouldBe emittedResults.size
}
} finally {
stoppableEnv.cancel(id.getJobID)
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,12 @@ import org.apache.flink.util.Collector
import pl.touk.nussknacker.engine.api._
import pl.touk.nussknacker.engine.api.component.UnboundedStreamComponent
import pl.touk.nussknacker.engine.api.editor.{DualEditor, DualEditorMode, SimpleEditor, SimpleEditorType}
import pl.touk.nussknacker.engine.api.process.{BasicContextInitializer, Source, SourceFactory}
import pl.touk.nussknacker.engine.api.typed.typing.Unknown
import pl.touk.nussknacker.engine.api.process.{Source, SourceFactory}
import pl.touk.nussknacker.engine.api.typed.{ReturningType, typing}
import pl.touk.nussknacker.engine.flink.api.process.{
FlinkContextInitializingFunction,
CustomizableTimestampWatermarkHandlerSource,
FlinkCustomNodeContext,
FlinkSource
StandardFlinkSource
}
import pl.touk.nussknacker.engine.flink.api.timestampwatermark.{
StandardTimestampWatermarkHandler,
Expand Down Expand Up @@ -43,12 +42,10 @@ object SampleGeneratorSourceFactory
)
)

class SampleGeneratorSourceFactory(timestampAssigner: TimestampWatermarkHandler[AnyRef])
class SampleGeneratorSourceFactory(customTimestampAssigner: TimestampWatermarkHandler[AnyRef])
extends SourceFactory
with UnboundedStreamComponent {

import pl.touk.nussknacker.engine.flink.api.datastream.DataStreamImplicits._

@silent("deprecated")
@MethodToInvoke
def create(
Expand All @@ -65,36 +62,34 @@ class SampleGeneratorSourceFactory(timestampAssigner: TimestampWatermarkHandler[
@ParamName("count") @Nullable @Min(1) nullableCount: Integer,
@ParamName("value") value: LazyParameter[AnyRef]
): Source = {
new FlinkSource with ReturningType {

override def contextStream(env: StreamExecutionEnvironment, ctx: FlinkCustomNodeContext): DataStream[Context] = {
val count = Option(nullableCount).map(_.toInt).getOrElse(1)
val processName = ctx.metaData.name
val stream = env
new StandardFlinkSource[AnyRef] with ReturningType with CustomizableTimestampWatermarkHandlerSource[AnyRef] {

override protected def sourceStream(
env: StreamExecutionEnvironment,
flinkNodeContext: FlinkCustomNodeContext
): DataStream[AnyRef] = {
val count = Option(nullableCount).map(_.toInt).getOrElse(1)
// Parameter evaluation requires context, so here we create an empty context just to evaluate the `value` param.
// Later the evaluated value is extracted from this temporary context and proper context is initialized.
env
.addSource(new PeriodicFunction(period))
.map(_ => Context(processName.value))
.flatMap(value)(ctx)
.flatMap(
(value: ValueWithContext[AnyRef], out: Collector[AnyRef]) =>
1.to(count).map(_ => value.value).foreach(out.collect),
TypeInformationDetection.instance.forType[AnyRef](value.returnType)
(_: Unit, out: Collector[Context]) => {
val temporaryContextForEvaluation = Context(flinkNodeContext.metaData.name.value)
(1 to count).foreach(_ => out.collect(temporaryContextForEvaluation))
},
TypeInformationDetection.instance.forClass[Context]
)

val rawSourceWithTimestamp = timestampAssigner.assignTimestampAndWatermarks(stream)

rawSourceWithTimestamp
.map(
new FlinkContextInitializingFunction[AnyRef](
new BasicContextInitializer[AnyRef](Unknown),
ctx.nodeId,
ctx.convertToEngineRuntimeContext
),
ctx.contextTypeInfo
.flatMap(flinkNodeContext.lazyParameterHelper.lazyMapFunction(value))
.flatMap(
(value: ValueWithContext[AnyRef], out: Collector[AnyRef]) => out.collect(value.value),
TypeInformationDetection.instance.forType[AnyRef](value.returnType)
)
}

override val returnType: typing.TypingResult = value.returnType
override def timestampAssigner: Option[TimestampWatermarkHandler[AnyRef]] = Some(customTimestampAssigner)

override val returnType: typing.TypingResult = value.returnType
}
}

Expand Down

0 comments on commit 8fdf5b6

Please sign in to comment.