diff --git a/defaultModel/src/main/scala/pl/touk/nussknacker/defaultmodel/DefaultModelMigrations.scala b/defaultModel/src/main/scala/pl/touk/nussknacker/defaultmodel/DefaultModelMigrations.scala index 733e641bc8a..0b333abad66 100644 --- a/defaultModel/src/main/scala/pl/touk/nussknacker/defaultmodel/DefaultModelMigrations.scala +++ b/defaultModel/src/main/scala/pl/touk/nussknacker/defaultmodel/DefaultModelMigrations.scala @@ -14,6 +14,7 @@ class DefaultModelMigrations extends ProcessMigrations { // 100 -> NewMigration, // Newly added migrations should be in the hundreds: 100, 200, 300 and so on. We do this because // many ProcessMigrations can be loaded using SPI, and we want to avoid overlapping numbers when merging. + 100 -> SampleGeneratorToEventGeneratorAndPeriodToScheduleParameter ) } diff --git a/defaultModel/src/main/scala/pl/touk/nussknacker/defaultmodel/migrations/SampleGeneratorToEventGeneratorAndPeriodToScheduleParameter.scala b/defaultModel/src/main/scala/pl/touk/nussknacker/defaultmodel/migrations/SampleGeneratorToEventGeneratorAndPeriodToScheduleParameter.scala new file mode 100644 index 00000000000..ae1a6a7d7a9 --- /dev/null +++ b/defaultModel/src/main/scala/pl/touk/nussknacker/defaultmodel/migrations/SampleGeneratorToEventGeneratorAndPeriodToScheduleParameter.scala @@ -0,0 +1,30 @@ +package pl.touk.nussknacker.defaultmodel.migrations + +import pl.touk.nussknacker.engine.api.MetaData +import pl.touk.nussknacker.engine.api.parameter.ParameterName +import pl.touk.nussknacker.engine.graph.node +import pl.touk.nussknacker.engine.graph.node.Source +import pl.touk.nussknacker.engine.graph.source.SourceRef +import pl.touk.nussknacker.engine.migration.NodeMigration + +object SampleGeneratorToEventGeneratorAndPeriodToScheduleParameter extends NodeMigration { + + override val description: String = "Change name of component: sample-generator -> event-generator " + + "and its parameter: period -> schedule" + + override def migrateNode(metaData: MetaData): PartialFunction[node.NodeData, node.NodeData] = { + case source @ Source(_, ref @ SourceRef("sample-generator", _), _) => + source.copy(ref = + ref.copy( + typ = "event-generator", + parameters = ref.parameters.map { param => + param.name.value match { + case "period" => param.copy(name = ParameterName("schedule")) + case _ => param + } + } + ) + ) + } + +} diff --git a/defaultModel/src/test/scala/pl/touk/nussknacker/defaultModel/migrations/SampleGeneratorToEventGeneratorAndPeriodicToScheduleSpec.scala b/defaultModel/src/test/scala/pl/touk/nussknacker/defaultModel/migrations/SampleGeneratorToEventGeneratorAndPeriodicToScheduleSpec.scala new file mode 100644 index 00000000000..109f26da44a --- /dev/null +++ b/defaultModel/src/test/scala/pl/touk/nussknacker/defaultModel/migrations/SampleGeneratorToEventGeneratorAndPeriodicToScheduleSpec.scala @@ -0,0 +1,45 @@ +package pl.touk.nussknacker.defaultModel.migrations + +import org.scalatest.freespec.AnyFreeSpecLike +import org.scalatest.matchers.should.Matchers +import pl.touk.nussknacker.defaultmodel.migrations.SampleGeneratorToEventGeneratorAndPeriodToScheduleParameter +import pl.touk.nussknacker.engine.api.parameter.ParameterName +import pl.touk.nussknacker.engine.api.{MetaData, StreamMetaData} +import pl.touk.nussknacker.engine.graph.evaluatedparam.Parameter +import pl.touk.nussknacker.engine.graph.node.Source +import pl.touk.nussknacker.engine.graph.source.SourceRef +import pl.touk.nussknacker.engine.spel.SpelExtension._ + +class SampleGeneratorToEventGeneratorAndPeriodicToScheduleSpec extends AnyFreeSpecLike with Matchers { + + "PeriodicToSampleGeneratorMigration should be applied" in { + val metaData = MetaData("test", StreamMetaData(Some(1))) + val beforeMigration = Source( + id = "sample-generator", + ref = SourceRef( + typ = "sample-generator", + parameters = List( + Parameter(ParameterName("period"), "T(java.time.Duration).parse('PT1M')".spel), + Parameter(ParameterName("count"), "1".spel), + Parameter(ParameterName("value"), "1".spel), + ) + ) + ) + val expectedAfterMigration = Source( + id = "sample-generator", + ref = SourceRef( + typ = "event-generator", + parameters = List( + Parameter(ParameterName("schedule"), "T(java.time.Duration).parse('PT1M')".spel), + Parameter(ParameterName("count"), "1".spel), + Parameter(ParameterName("value"), "1".spel), + ) + ) + ) + + val migrated = SampleGeneratorToEventGeneratorAndPeriodToScheduleParameter.migrateNode(metaData)(beforeMigration) + + migrated shouldBe expectedAfterMigration + } + +} diff --git a/designer/client/cypress/e2e/counts.cy.ts b/designer/client/cypress/e2e/counts.cy.ts index 33da16484f8..8c456a6b8e9 100644 --- a/designer/client/cypress/e2e/counts.cy.ts +++ b/designer/client/cypress/e2e/counts.cy.ts @@ -82,7 +82,7 @@ describe("Counts", () => { .click(); cy.get("[data-testid=window]").contains(/^ok$/i).click(); - cy.getNode("sample-generator") + cy.getNode("event-generator") .parent() .matchImage({ screenshotConfig: { padding: 16 } }); }); diff --git a/designer/client/cypress/e2e/nodeWindow.cy.ts b/designer/client/cypress/e2e/nodeWindow.cy.ts index c62d1f824ef..daa1a857439 100644 --- a/designer/client/cypress/e2e/nodeWindow.cy.ts +++ b/designer/client/cypress/e2e/nodeWindow.cy.ts @@ -13,13 +13,13 @@ describe("Node window", () => { cy.viewport(1600, 800); }); - it("should display sample-generator source", () => { + it("should display event-generator source", () => { cy.visitNewProcess(NAME).as("processName"); cy.contains(/^sources$/) .should("exist") .scrollIntoView(); cy.layoutScenario(); - cy.get("[data-testid='component:sample-generator']") + cy.get("[data-testid='component:event-generator']") .should("be.visible") .drag("#nk-graph-main", { target: { @@ -29,7 +29,7 @@ describe("Node window", () => { force: true, }); - cy.getNode("sample-generator").dblclick(); + cy.getNode("event-generator").dblclick(); // TODO: fix validation display in node windows cy.intercept("POST", "/api/nodes/*/validation").as("validation"); diff --git a/designer/client/cypress/fixtures/aggregations.json b/designer/client/cypress/fixtures/aggregations.json index a91cf8b8f04..808d33e05ed 100644 --- a/designer/client/cypress/fixtures/aggregations.json +++ b/designer/client/cypress/fixtures/aggregations.json @@ -15,12 +15,12 @@ }, "nodes": [ { - "id": "sample-generator", + "id": "event-generator", "ref": { - "typ": "sample-generator", + "typ": "event-generator", "parameters": [ { - "name": "period", + "name": "schedule", "expression": { "language": "spel", "expression": "T(java.time.Duration).parse('PT1M')" diff --git a/designer/client/cypress/fixtures/counts.json b/designer/client/cypress/fixtures/counts.json index 8c67ebf92d0..8f317f56fab 100644 --- a/designer/client/cypress/fixtures/counts.json +++ b/designer/client/cypress/fixtures/counts.json @@ -16,12 +16,12 @@ }, "nodes": [ { - "id": "sample-generator", + "id": "event-generator", "ref": { - "typ": "sample-generator", + "typ": "event-generator", "parameters": [ { - "name": "period", + "name": "schedule", "expression": { "language": "spel", "expression": "T(java.time.Duration).parse('PT1S')" diff --git a/designer/client/cypress/fixtures/docsAggregatesFullOuterJoin#0.json b/designer/client/cypress/fixtures/docsAggregatesFullOuterJoin#0.json index 2a8897c81f2..a5ca191e2ca 100644 --- a/designer/client/cypress/fixtures/docsAggregatesFullOuterJoin#0.json +++ b/designer/client/cypress/fixtures/docsAggregatesFullOuterJoin#0.json @@ -18,10 +18,10 @@ { "id": "subscriber alerts", "ref": { - "typ": "sample-generator", + "typ": "event-generator", "parameters": [ { - "name": "period", + "name": "schedule", "expression": { "language": "spel", "expression": "T(java.time.Duration).parse('PT1M')" @@ -65,10 +65,10 @@ { "id": "audit - all events", "ref": { - "typ": "sample-generator", + "typ": "event-generator", "parameters": [ { - "name": "period", + "name": "schedule", "expression": { "language": "spel", "expression": "T(java.time.Duration).parse('PT1M')" diff --git a/designer/client/cypress/fixtures/docsAggregatesSingleSideJoin#0.json b/designer/client/cypress/fixtures/docsAggregatesSingleSideJoin#0.json index a438179417e..f0cc165b76d 100644 --- a/designer/client/cypress/fixtures/docsAggregatesSingleSideJoin#0.json +++ b/designer/client/cypress/fixtures/docsAggregatesSingleSideJoin#0.json @@ -114,10 +114,10 @@ { "id": "subscriber alerts", "ref": { - "typ": "sample-generator", + "typ": "event-generator", "parameters": [ { - "name": "period", + "name": "schedule", "expression": { "language": "spel", "expression": "T(java.time.Duration).parse('PT1M')" @@ -160,10 +160,10 @@ { "id": "audit - all events", "ref": { - "typ": "sample-generator", + "typ": "event-generator", "parameters": [ { - "name": "period", + "name": "schedule", "expression": { "language": "spel", "expression": "T(java.time.Duration).parse('PT1M')" diff --git a/designer/client/cypress/fixtures/docsBasicComponentsChoice#0.json b/designer/client/cypress/fixtures/docsBasicComponentsChoice#0.json index cfdc0317069..4fb0629cbae 100644 --- a/designer/client/cypress/fixtures/docsBasicComponentsChoice#0.json +++ b/designer/client/cypress/fixtures/docsBasicComponentsChoice#0.json @@ -18,10 +18,10 @@ { "id": "source", "ref": { - "typ": "sample-generator", + "typ": "event-generator", "parameters": [ { - "name": "period", + "name": "schedule", "expression": { "language": "spel", "expression": "T(java.time.Duration).parse('PT1M')" diff --git a/designer/client/cypress/fixtures/docsBasicComponentsFilter#0.json b/designer/client/cypress/fixtures/docsBasicComponentsFilter#0.json index 74cd1120245..b94d9077f6d 100644 --- a/designer/client/cypress/fixtures/docsBasicComponentsFilter#0.json +++ b/designer/client/cypress/fixtures/docsBasicComponentsFilter#0.json @@ -18,10 +18,10 @@ { "id": "source", "ref": { - "typ": "sample-generator", + "typ": "event-generator", "parameters": [ { - "name": "period", + "name": "schedule", "expression": { "language": "spel", "expression": "T(java.time.Duration).parse('PT1M')" diff --git a/designer/client/cypress/fixtures/docsBasicComponentsFilter#1.json b/designer/client/cypress/fixtures/docsBasicComponentsFilter#1.json index e9f39d3f504..829909c6e84 100644 --- a/designer/client/cypress/fixtures/docsBasicComponentsFilter#1.json +++ b/designer/client/cypress/fixtures/docsBasicComponentsFilter#1.json @@ -18,10 +18,10 @@ { "id": "source", "ref": { - "typ": "sample-generator", + "typ": "event-generator", "parameters": [ { - "name": "period", + "name": "schedule", "expression": { "language": "spel", "expression": "T(java.time.Duration).parse('PT1M')" diff --git a/designer/client/cypress/fixtures/docsBasicComponentsForEach#0.json b/designer/client/cypress/fixtures/docsBasicComponentsForEach#0.json index a52abdb5055..887eaeb649d 100644 --- a/designer/client/cypress/fixtures/docsBasicComponentsForEach#0.json +++ b/designer/client/cypress/fixtures/docsBasicComponentsForEach#0.json @@ -18,10 +18,10 @@ { "id": "source", "ref": { - "typ": "sample-generator", + "typ": "event-generator", "parameters": [ { - "name": "period", + "name": "schedule", "expression": { "language": "spel", "expression": "T(java.time.Duration).parse('PT1M')" diff --git a/designer/client/cypress/fixtures/docsBasicComponentsRecordVariable#0.json b/designer/client/cypress/fixtures/docsBasicComponentsRecordVariable#0.json index 293e93ab7cc..16efff4ef1d 100644 --- a/designer/client/cypress/fixtures/docsBasicComponentsRecordVariable#0.json +++ b/designer/client/cypress/fixtures/docsBasicComponentsRecordVariable#0.json @@ -16,12 +16,12 @@ }, "nodes": [ { - "id": "sample-generator", + "id": "event-generator", "ref": { - "typ": "sample-generator", + "typ": "event-generator", "parameters": [ { - "name": "period", + "name": "schedule", "expression": { "language": "spel", "expression": "T(java.time.Duration).parse('PT1M')" diff --git a/designer/client/cypress/fixtures/docsBasicComponentsSplit#0.json b/designer/client/cypress/fixtures/docsBasicComponentsSplit#0.json index 38202615030..cbafcee61dd 100644 --- a/designer/client/cypress/fixtures/docsBasicComponentsSplit#0.json +++ b/designer/client/cypress/fixtures/docsBasicComponentsSplit#0.json @@ -16,12 +16,12 @@ }, "nodes": [ { - "id": "sample-generator", + "id": "event-generator", "ref": { - "typ": "sample-generator", + "typ": "event-generator", "parameters": [ { - "name": "period", + "name": "schedule", "expression": { "language": "spel", "expression": "T(java.time.Duration).parse('PT1M')" diff --git a/designer/client/cypress/fixtures/docsBasicComponentsUnion#0.json b/designer/client/cypress/fixtures/docsBasicComponentsUnion#0.json index 4fb2cf81803..50ce8f28b57 100644 --- a/designer/client/cypress/fixtures/docsBasicComponentsUnion#0.json +++ b/designer/client/cypress/fixtures/docsBasicComponentsUnion#0.json @@ -18,10 +18,10 @@ { "id": "source", "ref": { - "typ": "sample-generator", + "typ": "event-generator", "parameters": [ { - "name": "period", + "name": "schedule", "expression": { "language": "spel", "expression": "T(java.time.Duration).parse('PT1M')" diff --git a/designer/client/cypress/fixtures/docsBasicComponentsVariable#0.json b/designer/client/cypress/fixtures/docsBasicComponentsVariable#0.json index 0d0ded0ad8a..8e7eab6ca51 100644 --- a/designer/client/cypress/fixtures/docsBasicComponentsVariable#0.json +++ b/designer/client/cypress/fixtures/docsBasicComponentsVariable#0.json @@ -18,10 +18,10 @@ { "id": "input Kafka topic with json content", "ref": { - "typ": "sample-generator", + "typ": "event-generator", "parameters": [ { - "name": "period", + "name": "schedule", "expression": { "language": "spel", "expression": "T(java.time.Duration).parse('PT1M')" diff --git a/designer/client/cypress/fixtures/docsBasicComponentsVariable#1.json b/designer/client/cypress/fixtures/docsBasicComponentsVariable#1.json index 8cc7a5d7ed6..3298f8ce1e7 100644 --- a/designer/client/cypress/fixtures/docsBasicComponentsVariable#1.json +++ b/designer/client/cypress/fixtures/docsBasicComponentsVariable#1.json @@ -16,12 +16,12 @@ }, "nodes": [ { - "id": "sample-generator", + "id": "event-generator", "ref": { - "typ": "sample-generator", + "typ": "event-generator", "parameters": [ { - "name": "period", + "name": "schedule", "expression": { "language": "spel", "expression": "T(java.time.Duration).parse('PT1M')" diff --git a/designer/client/cypress/fixtures/table.json b/designer/client/cypress/fixtures/table.json index dcf17fcbfb1..dbc9b4097d5 100644 --- a/designer/client/cypress/fixtures/table.json +++ b/designer/client/cypress/fixtures/table.json @@ -17,12 +17,12 @@ }, "nodes": [ { - "id": "sample-generator", + "id": "event-generator", "ref": { - "typ": "sample-generator", + "typ": "event-generator", "parameters": [ { - "name": "period", + "name": "schedule", "expression": { "language": "spel", "expression": "T(java.time.Duration).parse('P1D')" diff --git a/docs/Changelog.md b/docs/Changelog.md index c5d9df1d8ad..a2ff9915454 100644 --- a/docs/Changelog.md +++ b/docs/Changelog.md @@ -37,9 +37,10 @@ * [#7356](https://github.com/TouK/nussknacker/pull/7356) Integers converted to BigDecimals have scale 18, this fixes issue with unexpected low scale when performing division on BigDecimals which were created in such conversion. * [#7379](https://github.com/TouK/nussknacker/pull/7379) Removed CustomAction mechanism. -* Changes to `periodic` component (renamed to `sample-generator`): - * [#7368](https://github.com/TouK/nussknacker/pull/7368) Component rename: `periodic` to `sample-generator` - * [#7373](https://github.com/TouK/nussknacker/pull/7373) Improvements to `period` editor +* Changes to `periodic` component (renamed to `event-generator`): + * [#TODO](https://github.com/TouK/nussknacker/pull/TODO) Component rename: `periodic` to `event-generator` + * [#TODO](https://github.com/TouK/nussknacker/pull/TODO) Parameter rename: `period` to `schedule` + * [#7373](https://github.com/TouK/nussknacker/pull/7373) Improvements to `schedule` editor * [#7376](https://github.com/TouK/nussknacker/pull/7376) Previously, when count was > 1, the value was evaluated once and emitted times count. For example: if the value was evaluated to be a random UUID and count was 5, one UUID was generated and emitted 5 times. Now in one count batch each value is evaluated separately. diff --git a/docs/MigrationGuide.md b/docs/MigrationGuide.md index dd50634770e..473118c8811 100644 --- a/docs/MigrationGuide.md +++ b/docs/MigrationGuide.md @@ -79,7 +79,7 @@ To see the biggest differences please consult the [changelog](Changelog.md). `deploymentConfig.scenarioTesting.reuseMiniClusterForScenarioStateVerification` to `false` ### Code API changes -* [#7368](https://github.com/TouK/nussknacker/pull/7368) Renamed `PeriodicSourceFactory` to `SampleGeneratorSourceFactory` +* [#7368](https://github.com/TouK/nussknacker/pull/7368) [#TODO](https://github.com/TouK/nussknacker/pull/TODO) Renamed `PeriodicSourceFactory` to `EventGeneratorSourceFactory` * [#7364](https://github.com/TouK/nussknacker/pull/7364) The DeploymentManager must implement `def schedulingSupport: SchedulingSupport`. If support not added, then `NoSchedulingSupport` should be used. ## In version 1.18.0 diff --git a/docs/developers_guide/FlinkComponents.md b/docs/developers_guide/FlinkComponents.md index b9da22ee908..3437e86c1d3 100644 --- a/docs/developers_guide/FlinkComponents.md +++ b/docs/developers_guide/FlinkComponents.md @@ -41,7 +41,7 @@ Your Nussknacker source component specification should be a [SourceFactory](http returning your source implementation. ### Examples -- [Periodic source](../scenarios_authoring/DataSourcesAndSinks.md#sample-generator) and its [implementation](https://github.com/TouK/nussknacker/blob/staging/engine/flink/components/base/src/main/scala/pl/touk/nussknacker/engine/flink/util/transformer/SampleGeneratorSourceFactory.scala) +- [Periodic source](../scenarios_authoring/DataSourcesAndSinks.md#event-generator) and its [implementation](https://github.com/TouK/nussknacker/blob/staging/engine/flink/components/base/src/main/scala/pl/touk/nussknacker/engine/flink/util/transformer/SampleGeneratorSourceFactory.scala) - [FlinkKafkaSource](https://github.com/TouK/nussknacker/blob/staging/engine/flink/kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/kafka/source/flink/FlinkKafkaSource.scala) and its factory returning the source implementation along with the fixed specification (e.g. based on a Scala case class) [KafkaSourceFactory](https://github.com/TouK/nussknacker/blob/staging/utils/kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/kafka/source/KafkaSourceFactory.scala) or generic one [UniversalKafkaSourceFactory](https://github.com/TouK/nussknacker/blob/staging/utils/schemed-kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/schemedkafka/source/UniversalKafkaSourceFactory.scala) diff --git a/docs/scenarios_authoring/DataSourcesAndSinks.md b/docs/scenarios_authoring/DataSourcesAndSinks.md index d98544cb64d..7d61d09d067 100644 --- a/docs/scenarios_authoring/DataSourcesAndSinks.md +++ b/docs/scenarios_authoring/DataSourcesAndSinks.md @@ -96,8 +96,8 @@ should be used for that. ![Kafka source](img/kafkaSource.png "Kafka source") -  -### Kafka sink +_  +### Kafka sink_ The `kafka` sink configuration form will show a list of fields defined in Schema Registry for the given topic. The result of the expression entered in the `Key` field will be used as a partitioning key when sending the @@ -184,16 +184,16 @@ Holds event in the node until The `key` parameter will be removed in the future release of Nussknacker, for the time being, configure it to `#inputMeta.key`.   -### Sample generator +### Event generator **(Flink engine only)** -![sample_generator_window](img/sample-generator.png) +![event_generator_window](img/event-generator.png) This source provides functionality of sending a number of given events in a periodic way. It's mainly used for testing. This source has the following parameters: -- period - specifies how often events will be sent +- schedule - specifies how often events will be sent - count - specifies number of event that will be sent every `period` - value - specifies data that event will hold diff --git a/docs/scenarios_authoring/img/sample-generator.png b/docs/scenarios_authoring/img/event-generator.png similarity index 100% rename from docs/scenarios_authoring/img/sample-generator.png rename to docs/scenarios_authoring/img/event-generator.png diff --git a/engine/flink/components/base-tests/src/test/scala/pl/touk/nussknacker/engine/flink/util/transformer/SampleGeneratorSourceFactorySpec.scala b/engine/flink/components/base-tests/src/test/scala/pl/touk/nussknacker/engine/flink/util/transformer/EventGeneratorSourceFactorySpec.scala similarity index 87% rename from engine/flink/components/base-tests/src/test/scala/pl/touk/nussknacker/engine/flink/util/transformer/SampleGeneratorSourceFactorySpec.scala rename to engine/flink/components/base-tests/src/test/scala/pl/touk/nussknacker/engine/flink/util/transformer/EventGeneratorSourceFactorySpec.scala index 2189dec4539..29cdc2e817a 100644 --- a/engine/flink/components/base-tests/src/test/scala/pl/touk/nussknacker/engine/flink/util/transformer/SampleGeneratorSourceFactorySpec.scala +++ b/engine/flink/components/base-tests/src/test/scala/pl/touk/nussknacker/engine/flink/util/transformer/EventGeneratorSourceFactorySpec.scala @@ -14,7 +14,7 @@ import pl.touk.nussknacker.engine.testing.LocalModelData import pl.touk.nussknacker.engine.testmode.ResultsCollectingListenerHolder import pl.touk.nussknacker.test.PatientScalaFutures -class SampleGeneratorSourceFactorySpec +class EventGeneratorSourceFactorySpec extends AnyFunSuite with FlinkSpec with PatientScalaFutures @@ -34,11 +34,11 @@ class SampleGeneratorSourceFactorySpec val scenario = ScenarioBuilder .streaming("test") .source( - "sample-generator", - "sample-generator", - "period" -> "T(java.time.Duration).ofSeconds(1)".spel, - "count" -> "1".spel, - "value" -> s"'$input'".spel + "event-generator", + "event-generator", + "schedule" -> "T(java.time.Duration).ofSeconds(1)".spel, + "count" -> "1".spel, + "value" -> s"'$input'".spel ) .emptySink(sinkId, "dead-end") @@ -69,11 +69,11 @@ class SampleGeneratorSourceFactorySpec val scenario = ScenarioBuilder .streaming("test") .source( - "sample-generator", - "sample-generator", - "period" -> "T(java.time.Duration).ofSeconds(1)".spel, - "count" -> "2".spel, - "value" -> s"T(java.util.UUID).randomUUID".spel + "event-generator", + "event-generator", + "schedule" -> "T(java.time.Duration).ofSeconds(1)".spel, + "count" -> "2".spel, + "value" -> s"T(java.util.UUID).randomUUID".spel ) .emptySink(sinkId, "dead-end") diff --git a/engine/flink/components/base-unbounded/src/main/scala/pl/touk/nussknacker/engine/flink/FlinkBaseUnboundedComponentProvider.scala b/engine/flink/components/base-unbounded/src/main/scala/pl/touk/nussknacker/engine/flink/FlinkBaseUnboundedComponentProvider.scala index 3603ef9464b..f38e702fe0f 100644 --- a/engine/flink/components/base-unbounded/src/main/scala/pl/touk/nussknacker/engine/flink/FlinkBaseUnboundedComponentProvider.scala +++ b/engine/flink/components/base-unbounded/src/main/scala/pl/touk/nussknacker/engine/flink/FlinkBaseUnboundedComponentProvider.scala @@ -10,7 +10,7 @@ import pl.touk.nussknacker.engine.flink.util.transformer.aggregate.sampleTransfo TumblingAggregateTransformer } import pl.touk.nussknacker.engine.flink.util.transformer.join.{FullOuterJoinTransformer, SingleSideJoinTransformer} -import pl.touk.nussknacker.engine.flink.util.transformer.{SampleGeneratorSourceFactory, UnionWithMemoTransformer} +import pl.touk.nussknacker.engine.flink.util.transformer.{EventGeneratorSourceFactory, UnionWithMemoTransformer} import pl.touk.nussknacker.engine.util.config.DocsConfig class FlinkBaseUnboundedComponentProvider extends ComponentProvider { @@ -39,8 +39,8 @@ object FlinkBaseUnboundedComponentProvider { // When adding/changing stateful components, corresponding changes should be done in LiteBaseComponentProvider! val statelessComponents = List( - ComponentDefinition("sample-generator", SampleGeneratorSourceFactory).withRelativeDocs( - "DataSourcesAndSinks#sample-generator" + ComponentDefinition("event-generator", EventGeneratorSourceFactory).withRelativeDocs( + "DataSourcesAndSinks#event-generator" ), ) diff --git a/engine/flink/components/base-unbounded/src/main/scala/pl/touk/nussknacker/engine/flink/util/transformer/SampleGeneratorSourceFactory.scala b/engine/flink/components/base-unbounded/src/main/scala/pl/touk/nussknacker/engine/flink/util/transformer/EventGeneratorSourceFactory.scala similarity index 94% rename from engine/flink/components/base-unbounded/src/main/scala/pl/touk/nussknacker/engine/flink/util/transformer/SampleGeneratorSourceFactory.scala rename to engine/flink/components/base-unbounded/src/main/scala/pl/touk/nussknacker/engine/flink/util/transformer/EventGeneratorSourceFactory.scala index 478df94e44e..220dd029c8c 100644 --- a/engine/flink/components/base-unbounded/src/main/scala/pl/touk/nussknacker/engine/flink/util/transformer/SampleGeneratorSourceFactory.scala +++ b/engine/flink/components/base-unbounded/src/main/scala/pl/touk/nussknacker/engine/flink/util/transformer/EventGeneratorSourceFactory.scala @@ -31,8 +31,8 @@ import javax.validation.constraints.Min import scala.jdk.CollectionConverters._ // TODO: add testing capabilities -object SampleGeneratorSourceFactory - extends SampleGeneratorSourceFactory( +object EventGeneratorSourceFactory + extends EventGeneratorSourceFactory( new StandardTimestampWatermarkHandler[AnyRef]( WatermarkStrategy .forMonotonousTimestamps() @@ -42,14 +42,14 @@ object SampleGeneratorSourceFactory ) ) -class SampleGeneratorSourceFactory(customTimestampAssigner: TimestampWatermarkHandler[AnyRef]) +class EventGeneratorSourceFactory(customTimestampAssigner: TimestampWatermarkHandler[AnyRef]) extends SourceFactory with UnboundedStreamComponent { @silent("deprecated") @MethodToInvoke def create( - @ParamName("period") + @ParamName("schedule") @DualEditor( simpleEditor = new SimpleEditor( `type` = SimpleEditorType.DURATION_EDITOR, @@ -57,7 +57,7 @@ class SampleGeneratorSourceFactory(customTimestampAssigner: TimestampWatermarkHa ), defaultMode = DualEditorMode.SIMPLE ) - period: Duration, + schedule: Duration, // TODO: @DefaultValue(1) instead of nullable @ParamName("count") @Nullable @Min(1) nullableCount: Integer, @ParamName("value") value: LazyParameter[AnyRef] @@ -72,7 +72,7 @@ class SampleGeneratorSourceFactory(customTimestampAssigner: TimestampWatermarkHa // Parameter evaluation requires context, so here we create an empty context just to evaluate the `value` param. // Later the evaluated value is extracted from this temporary context and proper context is initialized. env - .addSource(new PeriodicFunction(period)) + .addSource(new PeriodicFunction(schedule)) .flatMap( (_: Unit, out: Collector[Context]) => { val temporaryContextForEvaluation = Context(flinkNodeContext.metaData.name.value)