
Commit

[NU-1934] Ability to create json kafka source without schema registry (#7483)


Co-authored-by: Pawel Czajka <[email protected]>
paw787878 and Pawel Czajka authored Jan 23, 2025
1 parent 4862d71 commit 14079a8
Showing 9 changed files with 127 additions and 35 deletions.
1 change: 1 addition & 0 deletions docs/Changelog.md
@@ -62,6 +62,7 @@
* [#7324](https://github.com/TouK/nussknacker/pull/7324) Fix: Passing Flink Job Global Params
* [#7335](https://github.com/TouK/nussknacker/pull/7335) introduced `managersDirs` config to configure deployment managers directory paths (you can use `MANAGERS_DIR` env in case of docker-based deployments). The default is `./managers`.
* [#7481](https://github.com/TouK/nussknacker/pull/7481) Ignore jobs in CANCELLING status when checking for duplicate jobs on Flink
* [#7483](https://github.com/TouK/nussknacker/pull/7483) It's possible to configure a Kafka source to work without a schema registry. To do that, do not set the `schema.registry.url` property in the `kafkaProperties` config.

## 1.18

3 changes: 3 additions & 0 deletions docs/integration/KafkaIntegration.md
@@ -94,6 +94,9 @@ Currently, Nussknacker supports two implementations of Schema Registries: based
To configure the connection to the Schema Registry, you need to configure at least `schema.registry.url`. It should contain a comma-separated list of URLs to the Schema Registry.
For a single-node installation, it will be just one URL. Be aware that, contrary to Kafka brokers, Schema Registry URLs should start with `https://` or `http://`.

It's possible to use Kafka without a schema registry; in that case, do not provide the `schema.registry.url` property. Without
a schema registry you can use only JSON Kafka topics, and values read from them will be typed as `Unknown`.
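A minimal configuration sketch, assuming the Typesafe Config (HOCON) layout used by the Kafka components (the `config` component name and broker address are illustrative, not taken from this commit):

```scala
import com.typesafe.config.ConfigFactory

// Hypothetical kafka component configuration: "schema.registry.url" is intentionally
// absent from kafkaProperties, so the source works without a schema registry
// (JSON topics only, values typed as Unknown).
val kafkaConfigWithoutRegistry = ConfigFactory.parseString(
  """
    |config {
    |  kafkaProperties {
    |    "bootstrap.servers": "localhost:9092"
    |  }
    |}
    |""".stripMargin
)

// Sanity check: the registry URL property is not set.
val kafkaProps = kafkaConfigWithoutRegistry.getConfig("config.kafkaProperties")
assert(!kafkaProps.hasPath("\"schema.registry.url\""))
```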

Nussknacker determines which registry implementation (Confluent or Azure) is used from the `schema.registry.url` property.
If the URL ends with `.servicebus.windows.net`, Nussknacker assumes that the Azure schema registry is used; otherwise, the Confluent schema registry is assumed.

@@ -22,7 +22,7 @@ import pl.touk.nussknacker.engine.canonicalgraph.CanonicalProcess
import pl.touk.nussknacker.engine.deployment.DeploymentData
import pl.touk.nussknacker.engine.flink.FlinkBaseUnboundedComponentProvider
import pl.touk.nussknacker.engine.flink.test.FlinkSpec
import pl.touk.nussknacker.engine.flink.util.transformer.FlinkBaseComponentProvider
import pl.touk.nussknacker.engine.flink.util.transformer.{FlinkBaseComponentProvider, FlinkKafkaComponentProvider}
import pl.touk.nussknacker.engine.kafka.UnspecializedTopicName.ToUnspecializedTopicName
import pl.touk.nussknacker.engine.kafka.{KafkaConfig, KafkaSpec}
import pl.touk.nussknacker.engine.process.ExecutionConfigPreparer.{
@@ -79,7 +79,7 @@ abstract class FlinkWithKafkaSuite
valueSerializer = new KafkaAvroSerializer(schemaRegistryMockClient)
valueDeserializer = new KafkaAvroDeserializer(schemaRegistryMockClient)
val components =
new MockFlinkKafkaComponentProvider(() => schemaRegistryClientProvider.schemaRegistryClientFactory)
createFinkKafkaComponentProvider(schemaRegistryClientProvider)
.create(kafkaComponentsConfig, ProcessObjectDependencies.withConfig(config)) :::
FlinkBaseComponentProvider.Components ::: FlinkBaseUnboundedComponentProvider.Components :::
additionalComponents
@@ -92,6 +92,12 @@
)
}

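// Overridden in the schema-registry-less specs (e.g. KafkaJsonSchemalessNoSchemaRegistryItSpec below)
// to use the real FlinkKafkaComponentProvider instead of the mock.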
protected def createFinkKafkaComponentProvider(
schemaRegistryClientProvider: MockSchemaRegistryClientProvider
): FlinkKafkaComponentProvider = {
new MockFlinkKafkaComponentProvider(() => schemaRegistryClientProvider.schemaRegistryClientFactory)
}

private def executionConfigPreparerChain(
modelData: LocalModelData,
schemaRegistryClientProvider: MockSchemaRegistryClientProvider
@@ -115,27 +121,31 @@

protected def avroAsJsonSerialization = false

override def kafkaComponentsConfig: Config = ConfigFactory
.empty()
.withValue(
KafkaConfigProperties.bootstrapServersProperty("config"),
fromAnyRef(kafkaServerWithDependencies.kafkaAddress)
)
.withValue(
KafkaConfigProperties.property("config", "schema.registry.url"),
fromAnyRef("not_used")
)
.withValue(
KafkaConfigProperties.property("config", "auto.offset.reset"),
fromAnyRef("earliest")
)
.withValue("config.avroAsJsonSerialization", fromAnyRef(avroAsJsonSerialization))
.withValue("config.topicsExistenceValidationConfig.enabled", fromAnyRef(false))
// we turn off auto registration to do it on our own passing mocked schema registry client
.withValue(
s"config.kafkaEspProperties.${AvroSerializersRegistrar.autoRegisterRecordSchemaIdSerializationProperty}",
fromAnyRef(false)
)
override def kafkaComponentsConfig: Config = {
val config = ConfigFactory
.empty()
.withValue(
KafkaConfigProperties.bootstrapServersProperty("config"),
fromAnyRef(kafkaServerWithDependencies.kafkaAddress)
)
.withValue(
KafkaConfigProperties.property("config", "auto.offset.reset"),
fromAnyRef("earliest")
)
.withValue("config.avroAsJsonSerialization", fromAnyRef(avroAsJsonSerialization))
.withValue("config.topicsExistenceValidationConfig.enabled", fromAnyRef(false))
// we turn off auto registration to do it on our own passing mocked schema registry client
.withValue(
s"config.kafkaEspProperties.${AvroSerializersRegistrar.autoRegisterRecordSchemaIdSerializationProperty}",
fromAnyRef(false)
)
maybeAddSchemaRegistryUrl(config)
}

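// Schema-registry-less specs override this to return the config unchanged, leaving "schema.registry.url" unset.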
protected def maybeAddSchemaRegistryUrl(config: Config): Config = config.withValue(
KafkaConfigProperties.property("config", "schema.registry.url"),
fromAnyRef("not_used")
)

lazy val kafkaConfig: KafkaConfig = KafkaConfig.parseConfig(config, "config")
protected val avroEncoder: ToAvroSchemaBasedEncoder = ToAvroSchemaBasedEncoder(ValidationMode.strict)
@@ -1,6 +1,7 @@
package pl.touk.nussknacker.defaultmodel
package pl.touk.nussknacker.defaultmodel.kafkaschemaless

import io.circe.{Json, parser}
import pl.touk.nussknacker.defaultmodel.FlinkWithKafkaSuite
import pl.touk.nussknacker.engine.api.process.TopicName.ForSource
import pl.touk.nussknacker.engine.api.validation.ValidationMode
import pl.touk.nussknacker.engine.build.ScenarioBuilder
Expand All @@ -13,15 +14,15 @@ import pl.touk.nussknacker.engine.spel.SpelExtension.SpelExpresion
import java.nio.charset.StandardCharsets
import java.time.Instant

class KafkaJsonItSpec extends FlinkWithKafkaSuite {
abstract class BaseKafkaJsonSchemalessItSpec extends FlinkWithKafkaSuite {

private val jsonRecord = Json.obj(
"first" -> Json.fromString("Jan"),
"middle" -> Json.fromString("Tomek"),
"last" -> Json.fromString("Kowalski")
)

test("should round-trip json message without provided schema") {
def shouldRoundTripJsonMessageWithoutProvidedSchema(): Unit = {

val inputTopic = "input-topic-without-schema-json"
val outputTopic = "output-topic-without-schema-json"
@@ -61,7 +62,7 @@ class KafkaJsonItSpec extends FlinkWithKafkaSuite {
}
}

ignore("should round-trip plain message without provided schema") {
def shouldRoundTripPlainMessageWithoutProvidedSchema(): Unit = {
val inputTopic = "input-topic-without-schema-plain"
val outputTopic = "output-topic-without-schema-plain"

@@ -0,0 +1,22 @@
package pl.touk.nussknacker.defaultmodel.kafkaschemaless

import com.typesafe.config.Config
import pl.touk.nussknacker.defaultmodel.MockSchemaRegistryClientHolder.MockSchemaRegistryClientProvider
import pl.touk.nussknacker.engine.flink.util.transformer.FlinkKafkaComponentProvider

class KafkaJsonSchemalessNoSchemaRegistryItSpec extends BaseKafkaJsonSchemalessItSpec {

override def createFinkKafkaComponentProvider(schemaRegistryClientProvider: MockSchemaRegistryClientProvider) =
new FlinkKafkaComponentProvider()

override protected def maybeAddSchemaRegistryUrl(config: Config): Config = config

test("should round-trip json message without schema registry") {
shouldRoundTripJsonMessageWithoutProvidedSchema()
}

ignore("should round-trip plain message without schema registry") {
shouldRoundTripPlainMessageWithoutProvidedSchema()
}

}
@@ -0,0 +1,13 @@
package pl.touk.nussknacker.defaultmodel.kafkaschemaless

class KafkaJsonSchemalessTopicNotInSchemaRegistryItSpec extends BaseKafkaJsonSchemalessItSpec {

test("should round-trip json message when topic is not in schema registry") {
shouldRoundTripJsonMessageWithoutProvidedSchema()
}

ignore("should round-trip plain message when topic is not in schema registry") {
shouldRoundTripPlainMessageWithoutProvidedSchema()
}

}
@@ -51,6 +51,33 @@ trait SchemaRegistryClient extends Serializable {

}

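/**
 * No-op SchemaRegistryClient used when no "schema.registry.url" is configured.
 * All schema lookups fail (getSchemaById throws, the other lookups return a SchemaError)
 * and getAllTopics returns an empty list, so schemaless JSON Kafka sources can work
 * without a registry.
 */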
object EmptySchemaRegistry extends SchemaRegistryClient {

private val errorMessage = "There is no schema in empty schema registry";
private val error = SchemaError(errorMessage)

override def getSchemaById(id: SchemaId): SchemaWithMetadata = throw new IllegalStateException(errorMessage)

override protected def getByTopicAndVersion(
topic: UnspecializedTopicName,
version: Int,
isKey: Boolean
): Validated[SchemaRegistryError, SchemaWithMetadata] = Validated.Invalid(error)

override protected def getLatestFreshSchema(
topic: UnspecializedTopicName,
isKey: Boolean
): Validated[SchemaRegistryError, SchemaWithMetadata] = Validated.Invalid(error)

override def getAllTopics: Validated[SchemaRegistryError, List[UnspecializedTopicName]] = Validated.Valid(List())

override def getAllVersions(
topic: UnspecializedTopicName,
isKey: Boolean
): Validated[SchemaRegistryError, List[Integer]] = Validated.Invalid(error)

}

// This trait is mainly for testing mechanism purpose - in production implementation we assume that all schemas
// are registered before usage of client. We don't want to merge both traits because it can be hard to
// manage caching when both writing and reading operation will be available
@@ -5,7 +5,7 @@ import io.confluent.kafka.schemaregistry.ParsedSchema
import io.confluent.kafka.schemaregistry.avro.AvroSchema
import pl.touk.nussknacker.engine.api.process.TopicName
import pl.touk.nussknacker.engine.kafka.KafkaConfig
import pl.touk.nussknacker.engine.schemedkafka.schemaregistry.SchemaRegistryClient
import pl.touk.nussknacker.engine.schemedkafka.schemaregistry.{EmptySchemaRegistry, SchemaRegistryClient}
import pl.touk.nussknacker.engine.schemedkafka.schemaregistry.formatter.{AvroMessageFormatter, AvroMessageReader}
import pl.touk.nussknacker.engine.util.Implicits._
import pl.touk.nussknacker.engine.util.json.ToJsonEncoder
@@ -14,9 +14,16 @@ import java.nio.charset.StandardCharsets

class RecordFormatterSupportDispatcher(kafkaConfig: KafkaConfig, schemaRegistryClient: SchemaRegistryClient) {

private val supportBySchemaType =
UniversalSchemaSupportDispatcher(kafkaConfig).supportBySchemaType
private val supportBySchemaType = {
val supportBySchemaType = UniversalSchemaSupportDispatcher(kafkaConfig).supportBySchemaType
(
// To format avro messages you need schema registry, so for EmptySchemaRegistry there is no need to construct avro formatter
if (schemaRegistryClient == EmptySchemaRegistry)
supportBySchemaType.filterKeysNow(e => e != AvroSchema.TYPE)
else supportBySchemaType
)
.mapValuesNow(_.recordFormatterSupport(schemaRegistryClient))
}

def forSchemaType(schemaType: String): RecordFormatterSupport =
supportBySchemaType.getOrElse(schemaType, throw new UnsupportedSchemaType(schemaType))
@@ -3,7 +3,11 @@ package pl.touk.nussknacker.engine.schemedkafka.schemaregistry.universal
import pl.touk.nussknacker.engine.kafka.{KafkaUtils, SchemaRegistryClientKafkaConfig}
import pl.touk.nussknacker.engine.schemedkafka.schemaregistry.azure.AzureSchemaRegistryClientFactory
import pl.touk.nussknacker.engine.schemedkafka.schemaregistry.confluent.client.CachedConfluentSchemaRegistryClientFactory
import pl.touk.nussknacker.engine.schemedkafka.schemaregistry.{SchemaRegistryClient, SchemaRegistryClientFactory}
import pl.touk.nussknacker.engine.schemedkafka.schemaregistry.{
EmptySchemaRegistry,
SchemaRegistryClient,
SchemaRegistryClientFactory
}

object UniversalSchemaRegistryClientFactory extends UniversalSchemaRegistryClientFactory

@@ -12,10 +16,14 @@ class UniversalSchemaRegistryClientFactory extends SchemaRegistryClientFactory {
override type SchemaRegistryClientT = SchemaRegistryClient

override def create(config: SchemaRegistryClientKafkaConfig): SchemaRegistryClientT = {
if (config.kafkaProperties.get("schema.registry.url").exists(_.endsWith(KafkaUtils.azureEventHubsUrl))) {
AzureSchemaRegistryClientFactory.create(config)
} else {
CachedConfluentSchemaRegistryClientFactory.create(config)
config.kafkaProperties.get("schema.registry.url") match {
case None => EmptySchemaRegistry
case Some(url) =>
if (url.endsWith(KafkaUtils.azureEventHubsUrl)) {
AzureSchemaRegistryClientFactory.create(config)
} else {
CachedConfluentSchemaRegistryClientFactory.create(config)
}
}
}

