From 14079a889af0ca2ef7feb7cedb4fde4c07177606 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Pawe=C5=82=20Czajka?=
Date: Thu, 23 Jan 2025 15:52:04 +0100
Subject: [PATCH 1/3] [NU-1934] Ability to create json kafka source without
 schema registry (#7483)

Co-authored-by: Pawel Czajka
---
 docs/Changelog.md                             |  1 +
 docs/integration/KafkaIntegration.md          |  3 +
 .../defaultmodel/FlinkWithKafkaSuite.scala    | 56 +++++++++++--------
 .../BaseKafkaJsonSchemalessItSpec.scala}      |  9 +--
 ...JsonSchemalessNoSchemaRegistryItSpec.scala | 22 ++++++++
 ...malessTopicNotInSchemaRegistryItSpec.scala | 13 +++++
 .../schemaregistry/SchemaRegistryClient.scala | 27 +++++++++
 .../universal/RecordFormatterSupport.scala    | 13 ++++-
 ...UniversalSchemaRegistryClientFactory.scala | 18 ++++--
 9 files changed, 127 insertions(+), 35 deletions(-)
 rename engine/flink/tests/src/test/scala/pl/touk/nussknacker/defaultmodel/{KafkaJsonItSpec.scala => kafkaschemaless/BaseKafkaJsonSchemalessItSpec.scala} (92%)
 create mode 100644 engine/flink/tests/src/test/scala/pl/touk/nussknacker/defaultmodel/kafkaschemaless/KafkaJsonSchemalessNoSchemaRegistryItSpec.scala
 create mode 100644 engine/flink/tests/src/test/scala/pl/touk/nussknacker/defaultmodel/kafkaschemaless/KafkaJsonSchemalessTopicNotInSchemaRegistryItSpec.scala

diff --git a/docs/Changelog.md b/docs/Changelog.md
index be27b7cb6af..1803b14aeda 100644
--- a/docs/Changelog.md
+++ b/docs/Changelog.md
@@ -62,6 +62,7 @@
 * [#7324](https://github.com/TouK/nussknacker/pull/7324) Fix: Passing Flink Job Global Params
 * [#7335](https://github.com/TouK/nussknacker/pull/7335) introduced `managersDirs` config to configure deployment managers directory paths (you can use `MANAGERS_DIR` env in case of docker-based deployments). The default is `./managers`.
 * [#7481](https://github.com/TouK/nussknacker/pull/7481) Ignore jobs in CANCELLING status when checking for duplicate jobs on Flink
+* [#7483](https://github.com/TouK/nussknacker/pull/7483) It's now possible to configure a Kafka source to work without a schema registry. To do that, do not provide the "schema.registry.url" property in the kafkaProperties config.

 ## 1.18

diff --git a/docs/integration/KafkaIntegration.md b/docs/integration/KafkaIntegration.md
index fc6494a7ba4..fdfad274b03 100644
--- a/docs/integration/KafkaIntegration.md
+++ b/docs/integration/KafkaIntegration.md
@@ -94,6 +94,9 @@ Currently, Nussknacker supports two implementations of Schema Registries: based
 To configure connection Schema Registry, you need to configure at least `schema.registry.url`. It should contain comma separated list of urls to Schema Registry. For the single node installation, it will be just an url. Be aware that contrary to Kafka brokers, Schema Registry urls should start with `https://` or `http://`.

+It's possible to use Kafka without a schema registry; in this case, do not provide the `schema.registry.url` property. Without
+a schema registry you can use only JSON Kafka topics, and values read from them will be typed as `Unknown`.
+
 Nussknacker determines which registry implementation (Confluent or Azure) is used from the `schema.registry.url` property. If the URL ends with `.servicebus.windows.net`, Nussknacker assumes that Azure schema registry is used; if not Confluent schema registry is assumed.
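For illustration (not part of the patch itself), a minimal sketch of such a schema-registry-less configuration, in the style of the `FlinkWithKafkaSuite` changes below: the `KafkaConfigProperties` helper calls are taken from that test suite, while its import path and the broker address here are assumptions.

import com.typesafe.config.ConfigFactory
import com.typesafe.config.ConfigValueFactory.fromAnyRef
import pl.touk.nussknacker.engine.kafka.KafkaConfigProperties // package assumed; see the test suite imports

object SchemalessKafkaConfigSketch {

  // Kafka component config with no "schema.registry.url" entry: the source then works
  // without a schema registry, so only JSON topics can be used and values are typed as Unknown.
  val kafkaComponentsConfig = ConfigFactory
    .empty()
    .withValue(KafkaConfigProperties.bootstrapServersProperty("config"), fromAnyRef("localhost:9092"))
    .withValue(KafkaConfigProperties.property("config", "auto.offset.reset"), fromAnyRef("earliest"))
  // deliberately no .withValue(KafkaConfigProperties.property("config", "schema.registry.url"), ...)

}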
diff --git a/engine/flink/tests/src/test/scala/pl/touk/nussknacker/defaultmodel/FlinkWithKafkaSuite.scala b/engine/flink/tests/src/test/scala/pl/touk/nussknacker/defaultmodel/FlinkWithKafkaSuite.scala index 248947348fe..36f865bb16b 100644 --- a/engine/flink/tests/src/test/scala/pl/touk/nussknacker/defaultmodel/FlinkWithKafkaSuite.scala +++ b/engine/flink/tests/src/test/scala/pl/touk/nussknacker/defaultmodel/FlinkWithKafkaSuite.scala @@ -22,7 +22,7 @@ import pl.touk.nussknacker.engine.canonicalgraph.CanonicalProcess import pl.touk.nussknacker.engine.deployment.DeploymentData import pl.touk.nussknacker.engine.flink.FlinkBaseUnboundedComponentProvider import pl.touk.nussknacker.engine.flink.test.FlinkSpec -import pl.touk.nussknacker.engine.flink.util.transformer.FlinkBaseComponentProvider +import pl.touk.nussknacker.engine.flink.util.transformer.{FlinkBaseComponentProvider, FlinkKafkaComponentProvider} import pl.touk.nussknacker.engine.kafka.UnspecializedTopicName.ToUnspecializedTopicName import pl.touk.nussknacker.engine.kafka.{KafkaConfig, KafkaSpec} import pl.touk.nussknacker.engine.process.ExecutionConfigPreparer.{ @@ -79,7 +79,7 @@ abstract class FlinkWithKafkaSuite valueSerializer = new KafkaAvroSerializer(schemaRegistryMockClient) valueDeserializer = new KafkaAvroDeserializer(schemaRegistryMockClient) val components = - new MockFlinkKafkaComponentProvider(() => schemaRegistryClientProvider.schemaRegistryClientFactory) + createFinkKafkaComponentProvider(schemaRegistryClientProvider) .create(kafkaComponentsConfig, ProcessObjectDependencies.withConfig(config)) ::: FlinkBaseComponentProvider.Components ::: FlinkBaseUnboundedComponentProvider.Components ::: additionalComponents @@ -92,6 +92,12 @@ abstract class FlinkWithKafkaSuite ) } + protected def createFinkKafkaComponentProvider( + schemaRegistryClientProvider: MockSchemaRegistryClientProvider + ): FlinkKafkaComponentProvider = { + new MockFlinkKafkaComponentProvider(() => schemaRegistryClientProvider.schemaRegistryClientFactory) + } + private def executionConfigPreparerChain( modelData: LocalModelData, schemaRegistryClientProvider: MockSchemaRegistryClientProvider @@ -115,27 +121,31 @@ abstract class FlinkWithKafkaSuite protected def avroAsJsonSerialization = false - override def kafkaComponentsConfig: Config = ConfigFactory - .empty() - .withValue( - KafkaConfigProperties.bootstrapServersProperty("config"), - fromAnyRef(kafkaServerWithDependencies.kafkaAddress) - ) - .withValue( - KafkaConfigProperties.property("config", "schema.registry.url"), - fromAnyRef("not_used") - ) - .withValue( - KafkaConfigProperties.property("config", "auto.offset.reset"), - fromAnyRef("earliest") - ) - .withValue("config.avroAsJsonSerialization", fromAnyRef(avroAsJsonSerialization)) - .withValue("config.topicsExistenceValidationConfig.enabled", fromAnyRef(false)) - // we turn off auto registration to do it on our own passing mocked schema registry client - .withValue( - s"config.kafkaEspProperties.${AvroSerializersRegistrar.autoRegisterRecordSchemaIdSerializationProperty}", - fromAnyRef(false) - ) + override def kafkaComponentsConfig: Config = { + val config = ConfigFactory + .empty() + .withValue( + KafkaConfigProperties.bootstrapServersProperty("config"), + fromAnyRef(kafkaServerWithDependencies.kafkaAddress) + ) + .withValue( + KafkaConfigProperties.property("config", "auto.offset.reset"), + fromAnyRef("earliest") + ) + .withValue("config.avroAsJsonSerialization", fromAnyRef(avroAsJsonSerialization)) + 
.withValue("config.topicsExistenceValidationConfig.enabled", fromAnyRef(false)) + // we turn off auto registration to do it on our own passing mocked schema registry client + .withValue( + s"config.kafkaEspProperties.${AvroSerializersRegistrar.autoRegisterRecordSchemaIdSerializationProperty}", + fromAnyRef(false) + ) + maybeAddSchemaRegistryUrl(config) + } + + protected def maybeAddSchemaRegistryUrl(config: Config): Config = config.withValue( + KafkaConfigProperties.property("config", "schema.registry.url"), + fromAnyRef("not_used") + ) lazy val kafkaConfig: KafkaConfig = KafkaConfig.parseConfig(config, "config") protected val avroEncoder: ToAvroSchemaBasedEncoder = ToAvroSchemaBasedEncoder(ValidationMode.strict) diff --git a/engine/flink/tests/src/test/scala/pl/touk/nussknacker/defaultmodel/KafkaJsonItSpec.scala b/engine/flink/tests/src/test/scala/pl/touk/nussknacker/defaultmodel/kafkaschemaless/BaseKafkaJsonSchemalessItSpec.scala similarity index 92% rename from engine/flink/tests/src/test/scala/pl/touk/nussknacker/defaultmodel/KafkaJsonItSpec.scala rename to engine/flink/tests/src/test/scala/pl/touk/nussknacker/defaultmodel/kafkaschemaless/BaseKafkaJsonSchemalessItSpec.scala index 49e6cd7acae..a66cf68391f 100644 --- a/engine/flink/tests/src/test/scala/pl/touk/nussknacker/defaultmodel/KafkaJsonItSpec.scala +++ b/engine/flink/tests/src/test/scala/pl/touk/nussknacker/defaultmodel/kafkaschemaless/BaseKafkaJsonSchemalessItSpec.scala @@ -1,6 +1,7 @@ -package pl.touk.nussknacker.defaultmodel +package pl.touk.nussknacker.defaultmodel.kafkaschemaless import io.circe.{Json, parser} +import pl.touk.nussknacker.defaultmodel.FlinkWithKafkaSuite import pl.touk.nussknacker.engine.api.process.TopicName.ForSource import pl.touk.nussknacker.engine.api.validation.ValidationMode import pl.touk.nussknacker.engine.build.ScenarioBuilder @@ -13,7 +14,7 @@ import pl.touk.nussknacker.engine.spel.SpelExtension.SpelExpresion import java.nio.charset.StandardCharsets import java.time.Instant -class KafkaJsonItSpec extends FlinkWithKafkaSuite { +abstract class BaseKafkaJsonSchemalessItSpec extends FlinkWithKafkaSuite { private val jsonRecord = Json.obj( "first" -> Json.fromString("Jan"), @@ -21,7 +22,7 @@ class KafkaJsonItSpec extends FlinkWithKafkaSuite { "last" -> Json.fromString("Kowalski") ) - test("should round-trip json message without provided schema") { + def shouldRoundTripJsonMessageWithoutProvidedSchema(): Unit = { val inputTopic = "input-topic-without-schema-json" val outputTopic = "output-topic-without-schema-json" @@ -61,7 +62,7 @@ class KafkaJsonItSpec extends FlinkWithKafkaSuite { } } - ignore("should round-trip plain message without provided schema") { + def shouldRoundTripPlainMessageWithoutProvidedSchema(): Unit = { val inputTopic = "input-topic-without-schema-plain" val outputTopic = "output-topic-without-schema-plain" diff --git a/engine/flink/tests/src/test/scala/pl/touk/nussknacker/defaultmodel/kafkaschemaless/KafkaJsonSchemalessNoSchemaRegistryItSpec.scala b/engine/flink/tests/src/test/scala/pl/touk/nussknacker/defaultmodel/kafkaschemaless/KafkaJsonSchemalessNoSchemaRegistryItSpec.scala new file mode 100644 index 00000000000..357f0710ae4 --- /dev/null +++ b/engine/flink/tests/src/test/scala/pl/touk/nussknacker/defaultmodel/kafkaschemaless/KafkaJsonSchemalessNoSchemaRegistryItSpec.scala @@ -0,0 +1,22 @@ +package pl.touk.nussknacker.defaultmodel.kafkaschemaless + +import com.typesafe.config.Config +import 
pl.touk.nussknacker.defaultmodel.MockSchemaRegistryClientHolder.MockSchemaRegistryClientProvider +import pl.touk.nussknacker.engine.flink.util.transformer.FlinkKafkaComponentProvider + +class KafkaJsonSchemalessNoSchemaRegistryItSpec extends BaseKafkaJsonSchemalessItSpec { + + override def createFinkKafkaComponentProvider(schemaRegistryClientProvider: MockSchemaRegistryClientProvider) = + new FlinkKafkaComponentProvider() + + override protected def maybeAddSchemaRegistryUrl(config: Config): Config = config + + test("should round-trip json message without schema registry") { + shouldRoundTripJsonMessageWithoutProvidedSchema() + } + + ignore("should round-trip plain message without schema registry") { + shouldRoundTripPlainMessageWithoutProvidedSchema() + } + +} diff --git a/engine/flink/tests/src/test/scala/pl/touk/nussknacker/defaultmodel/kafkaschemaless/KafkaJsonSchemalessTopicNotInSchemaRegistryItSpec.scala b/engine/flink/tests/src/test/scala/pl/touk/nussknacker/defaultmodel/kafkaschemaless/KafkaJsonSchemalessTopicNotInSchemaRegistryItSpec.scala new file mode 100644 index 00000000000..f5e75c36617 --- /dev/null +++ b/engine/flink/tests/src/test/scala/pl/touk/nussknacker/defaultmodel/kafkaschemaless/KafkaJsonSchemalessTopicNotInSchemaRegistryItSpec.scala @@ -0,0 +1,13 @@ +package pl.touk.nussknacker.defaultmodel.kafkaschemaless + +class KafkaJsonSchemalessTopicNotInSchemaRegistryItSpec extends BaseKafkaJsonSchemalessItSpec { + + test("should round-trip json message when topic is not in schema registry") { + shouldRoundTripJsonMessageWithoutProvidedSchema() + } + + ignore("should round-trip plain message when topic is not in schema registry") { + shouldRoundTripPlainMessageWithoutProvidedSchema() + } + +} diff --git a/utils/schemed-kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/schemedkafka/schemaregistry/SchemaRegistryClient.scala b/utils/schemed-kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/schemedkafka/schemaregistry/SchemaRegistryClient.scala index 8319b0c021f..be072aeb43f 100644 --- a/utils/schemed-kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/schemedkafka/schemaregistry/SchemaRegistryClient.scala +++ b/utils/schemed-kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/schemedkafka/schemaregistry/SchemaRegistryClient.scala @@ -51,6 +51,33 @@ trait SchemaRegistryClient extends Serializable { } +object EmptySchemaRegistry extends SchemaRegistryClient { + + private val errorMessage = "There is no schema in empty schema registry"; + private val error = SchemaError(errorMessage) + + override def getSchemaById(id: SchemaId): SchemaWithMetadata = throw new IllegalStateException(errorMessage) + + override protected def getByTopicAndVersion( + topic: UnspecializedTopicName, + version: Int, + isKey: Boolean + ): Validated[SchemaRegistryError, SchemaWithMetadata] = Validated.Invalid(error) + + override protected def getLatestFreshSchema( + topic: UnspecializedTopicName, + isKey: Boolean + ): Validated[SchemaRegistryError, SchemaWithMetadata] = Validated.Invalid(error) + + override def getAllTopics: Validated[SchemaRegistryError, List[UnspecializedTopicName]] = Validated.Valid(List()) + + override def getAllVersions( + topic: UnspecializedTopicName, + isKey: Boolean + ): Validated[SchemaRegistryError, List[Integer]] = Validated.Invalid(error) + +} + // This trait is mainly for testing mechanism purpose - in production implementation we assume that all schemas // are registered before usage of client. 
We don't want to merge both traits because it can be hard to // manage caching when both writing and reading operation will be available diff --git a/utils/schemed-kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/schemedkafka/schemaregistry/universal/RecordFormatterSupport.scala b/utils/schemed-kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/schemedkafka/schemaregistry/universal/RecordFormatterSupport.scala index 8589b63648b..0867e683760 100644 --- a/utils/schemed-kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/schemedkafka/schemaregistry/universal/RecordFormatterSupport.scala +++ b/utils/schemed-kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/schemedkafka/schemaregistry/universal/RecordFormatterSupport.scala @@ -5,7 +5,7 @@ import io.confluent.kafka.schemaregistry.ParsedSchema import io.confluent.kafka.schemaregistry.avro.AvroSchema import pl.touk.nussknacker.engine.api.process.TopicName import pl.touk.nussknacker.engine.kafka.KafkaConfig -import pl.touk.nussknacker.engine.schemedkafka.schemaregistry.SchemaRegistryClient +import pl.touk.nussknacker.engine.schemedkafka.schemaregistry.{EmptySchemaRegistry, SchemaRegistryClient} import pl.touk.nussknacker.engine.schemedkafka.schemaregistry.formatter.{AvroMessageFormatter, AvroMessageReader} import pl.touk.nussknacker.engine.util.Implicits._ import pl.touk.nussknacker.engine.util.json.ToJsonEncoder @@ -14,9 +14,16 @@ import java.nio.charset.StandardCharsets class RecordFormatterSupportDispatcher(kafkaConfig: KafkaConfig, schemaRegistryClient: SchemaRegistryClient) { - private val supportBySchemaType = - UniversalSchemaSupportDispatcher(kafkaConfig).supportBySchemaType + private val supportBySchemaType = { + val supportBySchemaType = UniversalSchemaSupportDispatcher(kafkaConfig).supportBySchemaType + ( + // To format avro messages you need schema registry, so for EmptySchemaRegistry there is no need to construct avro formatter + if (schemaRegistryClient == EmptySchemaRegistry) + supportBySchemaType.filterKeysNow(e => e != AvroSchema.TYPE) + else supportBySchemaType + ) .mapValuesNow(_.recordFormatterSupport(schemaRegistryClient)) + } def forSchemaType(schemaType: String): RecordFormatterSupport = supportBySchemaType.getOrElse(schemaType, throw new UnsupportedSchemaType(schemaType)) diff --git a/utils/schemed-kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/schemedkafka/schemaregistry/universal/UniversalSchemaRegistryClientFactory.scala b/utils/schemed-kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/schemedkafka/schemaregistry/universal/UniversalSchemaRegistryClientFactory.scala index 72a649b1358..e534296f7d5 100644 --- a/utils/schemed-kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/schemedkafka/schemaregistry/universal/UniversalSchemaRegistryClientFactory.scala +++ b/utils/schemed-kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/schemedkafka/schemaregistry/universal/UniversalSchemaRegistryClientFactory.scala @@ -3,7 +3,11 @@ package pl.touk.nussknacker.engine.schemedkafka.schemaregistry.universal import pl.touk.nussknacker.engine.kafka.{KafkaUtils, SchemaRegistryClientKafkaConfig} import pl.touk.nussknacker.engine.schemedkafka.schemaregistry.azure.AzureSchemaRegistryClientFactory import pl.touk.nussknacker.engine.schemedkafka.schemaregistry.confluent.client.CachedConfluentSchemaRegistryClientFactory -import pl.touk.nussknacker.engine.schemedkafka.schemaregistry.{SchemaRegistryClient, SchemaRegistryClientFactory} 
+import pl.touk.nussknacker.engine.schemedkafka.schemaregistry.{
+  EmptySchemaRegistry,
+  SchemaRegistryClient,
+  SchemaRegistryClientFactory
+}

 object UniversalSchemaRegistryClientFactory extends UniversalSchemaRegistryClientFactory

@@ -12,10 +16,14 @@ class UniversalSchemaRegistryClientFactory extends SchemaRegistryClientFactory {

   override type SchemaRegistryClientT = SchemaRegistryClient

   override def create(config: SchemaRegistryClientKafkaConfig): SchemaRegistryClientT = {
-    if (config.kafkaProperties.get("schema.registry.url").exists(_.endsWith(KafkaUtils.azureEventHubsUrl))) {
-      AzureSchemaRegistryClientFactory.create(config)
-    } else {
-      CachedConfluentSchemaRegistryClientFactory.create(config)
+    config.kafkaProperties.get("schema.registry.url") match {
+      case None => EmptySchemaRegistry
+      case Some(url) =>
+        if (url.endsWith(KafkaUtils.azureEventHubsUrl)) {
+          AzureSchemaRegistryClientFactory.create(config)
+        } else {
+          CachedConfluentSchemaRegistryClientFactory.create(config)
+        }
     }
   }

From 16d71c1c53f28c773279a5d6568fc9f3d5a8c385 Mon Sep 17 00:00:00 2001
From: Filip Michalski
Date: Thu, 23 Jan 2025 15:52:51 +0100
Subject: [PATCH 2/3] Catch errors in Sticky Notes SVG parsing, add closing tag to <br>
(#7494) --- .../components/graph/EspNode/stickyNote.ts | 37 ++++++++++++------- 1 file changed, 24 insertions(+), 13 deletions(-) diff --git a/designer/client/src/components/graph/EspNode/stickyNote.ts b/designer/client/src/components/graph/EspNode/stickyNote.ts index afcfc78f0ac..79be1017419 100644 --- a/designer/client/src/components/graph/EspNode/stickyNote.ts +++ b/designer/client/src/components/graph/EspNode/stickyNote.ts @@ -56,28 +56,39 @@ const renderer = new marked.Renderer(); renderer.link = function (href, title, text) { return `${text}`; }; + +renderer.hr = function () { + return `----`; // SVG doesn't support HTML hr inside foreignObject +}; + renderer.image = function (href, title, text) { - // SVG don't support HTML img inside foreignObject + // SVG doesn't support HTML img inside foreignObject return `${text} (attached img)`; }; -const foreignObject = (stickyNote: StickyNote): MarkupNodeJSON => { - let parsed; - try { - parsed = DOMPurify.sanitize(marked.parse(stickyNote.content, { renderer }), { ADD_ATTR: ["target"] }); - } catch (error) { - console.error("Failed to parse markdown:", error); - parsed = "Error: Could not parse markdown content. See error logs in console"; - } - const singleMarkupNode = util.svg/* xml */ ` +const prepareSvgObject = (content: string) => + util.svg/* xml */ `
-
${parsed}
+
${content}
-    `[0];
-    return singleMarkupNode as MarkupNodeJSON;
+    `[0] as MarkupNodeJSON;
+
+const escapeHtmlContent = (content: string) =>
+    content.replace(/&/g, "&amp;").replace(/</g, "&lt;").replace(/>/g, "&gt;").replace(/"/g, "&quot;").replace(/'/g, "&#39;");
+
+const foreignObject = (stickyNote: StickyNote): MarkupNodeJSON => {
+    try {
+        const contentWithHtmlTagsSanitized = escapeHtmlContent(stickyNote.content);
+        let parsed = DOMPurify.sanitize(marked.parse(contentWithHtmlTagsSanitized, { renderer }), { ADD_ATTR: ["target"] });
+        parsed = parsed.replace(/<br>/g, "<br/>"); // SVG does not allow a tag without closing, and DOMPurify always removes the closing tag.
+        return prepareSvgObject(parsed);
+    } catch (error) {
+        console.error("Error: Could not parse markdown:", error);
+        return prepareSvgObject("[!] Could not parse markdown content [!]\n

" + escapeHtmlContent(stickyNote.content)); + } }; export const stickyNotePath = "M 0 0 L 19 0 L 19 19 L 0 19 L 0 0"; From 2beedae64cf3b805d47a2939ba85d3218ce75130 Mon Sep 17 00:00:00 2001 From: mgoworko <37329559+mgoworko@users.noreply.github.com> Date: Fri, 24 Jan 2025 15:02:19 +0100 Subject: [PATCH 3/3] [Fix] Periodic processes repository - process input config fix #7495 --- .../ui/process/periodic/PeriodicProcessService.scala | 5 +---- .../legacy/db/LegacyPeriodicProcessesRepository.scala | 4 ++-- .../process/repository/PeriodicProcessesRepository.scala | 7 +++---- .../flink/db/InMemPeriodicProcessesRepository.scala | 3 +-- 4 files changed, 7 insertions(+), 12 deletions(-) diff --git a/designer/server/src/main/scala/pl/touk/nussknacker/ui/process/periodic/PeriodicProcessService.scala b/designer/server/src/main/scala/pl/touk/nussknacker/ui/process/periodic/PeriodicProcessService.scala index aa643e72250..669dbb6e832 100644 --- a/designer/server/src/main/scala/pl/touk/nussknacker/ui/process/periodic/PeriodicProcessService.scala +++ b/designer/server/src/main/scala/pl/touk/nussknacker/ui/process/periodic/PeriodicProcessService.scala @@ -505,10 +505,7 @@ class PeriodicProcessService( ) } inputConfigDuringExecutionJsonOpt <- periodicProcessesRepository - .fetchInputConfigDuringExecutionJson( - processName, - versionId, - ) + .fetchInputConfigDuringExecutionJson(deployment.periodicProcess.id) .run inputConfigDuringExecutionJson = inputConfigDuringExecutionJsonOpt.getOrElse { throw new PeriodicProcessException( diff --git a/designer/server/src/main/scala/pl/touk/nussknacker/ui/process/periodic/legacy/db/LegacyPeriodicProcessesRepository.scala b/designer/server/src/main/scala/pl/touk/nussknacker/ui/process/periodic/legacy/db/LegacyPeriodicProcessesRepository.scala index e7e9f8a7443..a74e5e29ce3 100644 --- a/designer/server/src/main/scala/pl/touk/nussknacker/ui/process/periodic/legacy/db/LegacyPeriodicProcessesRepository.scala +++ b/designer/server/src/main/scala/pl/touk/nussknacker/ui/process/periodic/legacy/db/LegacyPeriodicProcessesRepository.scala @@ -418,9 +418,9 @@ class SlickLegacyPeriodicProcessesRepository( ): Future[Option[(CanonicalProcess, ProcessVersion)]] = fetchingProcessRepository.getCanonicalProcessWithVersion(processName, versionId)(NussknackerInternalUser.instance) - def fetchInputConfigDuringExecutionJson(processName: ProcessName, versionId: VersionId): Action[Option[String]] = + def fetchInputConfigDuringExecutionJson(periodicProcessId: PeriodicProcessId): Action[Option[String]] = PeriodicProcessesWithJson - .filter(p => p.processName === processName && p.processVersionId === versionId) + .filter(p => p.id === periodicProcessId) .map(_.inputConfigDuringExecutionJson) .result .headOption diff --git a/designer/server/src/main/scala/pl/touk/nussknacker/ui/process/repository/PeriodicProcessesRepository.scala b/designer/server/src/main/scala/pl/touk/nussknacker/ui/process/repository/PeriodicProcessesRepository.scala index adbc64164ac..bd653a5a53a 100644 --- a/designer/server/src/main/scala/pl/touk/nussknacker/ui/process/repository/PeriodicProcessesRepository.scala +++ b/designer/server/src/main/scala/pl/touk/nussknacker/ui/process/repository/PeriodicProcessesRepository.scala @@ -157,8 +157,7 @@ trait PeriodicProcessesRepository { ): Action[PeriodicProcessDeployment] def fetchInputConfigDuringExecutionJson( - processName: ProcessName, - versionId: VersionId + periodicProcessId: PeriodicProcessId, ): Action[Option[String]] def fetchCanonicalProcessWithVersion( @@ -496,9 +495,9 @@ 
class SlickPeriodicProcessesRepository( update.map(_ => ()) } - def fetchInputConfigDuringExecutionJson(processName: ProcessName, versionId: VersionId): Action[Option[String]] = + def fetchInputConfigDuringExecutionJson(periodicProcessId: PeriodicProcessId): Action[Option[String]] = PeriodicProcessesWithInputConfig - .filter(p => p.processName === processName && p.processVersionId === versionId) + .filter(p => p.id === periodicProcessId) .map(_.inputConfigDuringExecutionJson) .result .headOption diff --git a/designer/server/src/test/scala/pl/touk/nussknacker/ui/process/periodic/flink/db/InMemPeriodicProcessesRepository.scala b/designer/server/src/test/scala/pl/touk/nussknacker/ui/process/periodic/flink/db/InMemPeriodicProcessesRepository.scala index 16a14532615..fff0dba9f50 100644 --- a/designer/server/src/test/scala/pl/touk/nussknacker/ui/process/periodic/flink/db/InMemPeriodicProcessesRepository.scala +++ b/designer/server/src/test/scala/pl/touk/nussknacker/ui/process/periodic/flink/db/InMemPeriodicProcessesRepository.scala @@ -354,8 +354,7 @@ class InMemPeriodicProcessesRepository(processingType: String) extends PeriodicP } override def fetchInputConfigDuringExecutionJson( - processName: ProcessName, - versionId: VersionId + periodicProcessId: PeriodicProcessId, ): Future[Option[String]] = Future.successful(Some("{}"))