Correct timestamps in test from file for Kafka sources
mproch authored and arkadius committed Sep 1, 2021
1 parent a67aae3 commit 2deab65
Showing 20 changed files with 259 additions and 69 deletions.
2 changes: 1 addition & 1 deletion build.sbt
@@ -764,7 +764,7 @@ lazy val flinkTestUtil = (project in engine("flink/test-util")).
"org.apache.flink" % "flink-metrics-dropwizard" % flinkV
)
}
).dependsOn(testUtil, queryableState, flinkUtil)
).dependsOn(testUtil, queryableState, flinkUtil, interpreter)

lazy val standaloneUtil = (project in engine("standalone/util")).
settings(commonSettings).
@@ -1,9 +1,9 @@
package pl.touk.nussknacker.engine.flink.api.timestampwatermark

import java.time.Duration

import com.github.ghik.silencer.silent
import org.apache.flink.api.common.eventtime.{SerializableTimestampAssigner, WatermarkStrategy}
import org.apache.flink.api.common.eventtime
import org.apache.flink.api.common.eventtime.{SerializableTimestampAssigner, TimestampAssignerSupplier, Watermark, WatermarkGenerator, WatermarkGeneratorSupplier, WatermarkOutput, WatermarkStrategy}
import org.apache.flink.streaming.api.functions.{AssignerWithPeriodicWatermarks, AssignerWithPunctuatedWatermarks, TimestampAssigner}
import org.apache.flink.streaming.api.scala.DataStream

@@ -29,14 +29,29 @@ class StandardTimestampWatermarkHandler[T](strategy: WatermarkStrategy[T]) exten

object StandardTimestampWatermarkHandler {

//cannot use a Scala function as the lambda here, as it is not serializable...
def timestampAssigner[T](extract: T => Long): SerializableTimestampAssigner[T] = new SerializableTimestampAssigner[T] {
override def extractTimestamp(element: T, recordTimestamp: Long): Long = extract(element)
}

//cannot use a Scala function as the lambda here, as it is not serializable...
def timestampAssignerWithTimestamp[T](extract: (T, Long) => Long): SerializableTimestampAssigner[T] = new SerializableTimestampAssigner[T] {
override def extractTimestamp(element: T, recordTimestamp: Long): Long = extract(element, recordTimestamp)
}

def boundedOutOfOrderness[T](extract: T => Long, maxOutOfOrderness: Duration): TimestampWatermarkHandler[T] = {
new StandardTimestampWatermarkHandler(WatermarkStrategy.forBoundedOutOfOrderness(maxOutOfOrderness).withTimestampAssigner(timestampAssigner(extract)))
}

def afterEachEvent[T](extract: T => Long): TimestampWatermarkHandler[T] = {
new StandardTimestampWatermarkHandler[T](WatermarkStrategy.forGenerator((_: WatermarkGeneratorSupplier.Context) => new WatermarkGenerator[T] {
override def onEvent(event: T, eventTimestamp: Long, output: WatermarkOutput): Unit = {
output.emitWatermark(new Watermark(eventTimestamp))
}
override def onPeriodicEmit(output: WatermarkOutput): Unit = {}
}).withTimestampAssigner(timestampAssigner(extract)))
}

}

@silent("deprecated")
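
The helpers above exist because, as the comments note, a Scala function cannot simply be handed to Flink as the lambda: the assigner has to be serializable, so each function is wrapped in an explicit SerializableTimestampAssigner. A sketch of how the two-argument variant might be wired into a watermark strategy; the Transaction type and the fallback rule are illustrative assumptions, not repository code.

import java.time.Duration

import org.apache.flink.api.common.eventtime.WatermarkStrategy
import pl.touk.nussknacker.engine.flink.api.timestampwatermark.StandardTimestampWatermarkHandler.timestampAssignerWithTimestamp
import pl.touk.nussknacker.engine.flink.api.timestampwatermark.{StandardTimestampWatermarkHandler, TimestampWatermarkHandler}

// Illustrative event type, not part of the repository.
final case class Transaction(id: String, eventTimeMillis: Long)

object TransactionTimestamps {
  // Prefer the event's own field, falling back to the Kafka record timestamp when it is unset.
  val handler: TimestampWatermarkHandler[Transaction] =
    new StandardTimestampWatermarkHandler[Transaction](
      WatermarkStrategy
        .forBoundedOutOfOrderness[Transaction](Duration.ofSeconds(60))
        .withTimestampAssigner(timestampAssignerWithTimestamp[Transaction] { (txn, recordTimestamp) =>
          if (txn.eventTimeMillis > 0) txn.eventTimeMillis else recordTimestamp
        })
    )
}
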
@@ -18,7 +18,7 @@ import pl.touk.nussknacker.engine.avro.typed.AvroSchemaTypeDefinitionExtractor
import pl.touk.nussknacker.engine.avro.{AvroSchemaDeterminer, KafkaAvroBaseTransformer, RuntimeSchemaData}
import pl.touk.nussknacker.engine.flink.api.process.{FlinkContextInitializer, FlinkSource, FlinkSourceFactory}
import pl.touk.nussknacker.engine.flink.api.timestampwatermark.TimestampWatermarkHandler
import pl.touk.nussknacker.engine.kafka.source.{KafkaContextInitializer, KafkaSource}
import pl.touk.nussknacker.engine.kafka.source.{ConsumerRecordBasedKafkaSource, KafkaContextInitializer, KafkaSource}
import pl.touk.nussknacker.engine.kafka.{KafkaConfig, PreparedKafkaTopic, RecordFormatter}

import scala.reflect.ClassTag
@@ -136,11 +136,8 @@ class KafkaAvroSourceFactory[K:ClassTag, V:ClassTag](val schemaRegistryProvider:
deserializationSchema: KafkaDeserializationSchema[ConsumerRecord[K, V]],
timestampAssigner: Option[TimestampWatermarkHandler[ConsumerRecord[K, V]]],
formatter: RecordFormatter,
flinkContextInitializer: FlinkContextInitializer[ConsumerRecord[K, V]]): KafkaSource[ConsumerRecord[K, V]] = {
new KafkaSource[ConsumerRecord[K, V]](preparedTopics, kafkaConfig, deserializationSchema, timestampAssigner, formatter) {
override val contextInitializer: FlinkContextInitializer[ConsumerRecord[K, V]] = flinkContextInitializer
}
}
flinkContextInitializer: FlinkContextInitializer[ConsumerRecord[K, V]]): KafkaSource[ConsumerRecord[K, V]] =
new ConsumerRecordBasedKafkaSource[K, V](preparedTopics, kafkaConfig, deserializationSchema, timestampAssigner, formatter, flinkContextInitializer)

override def nodeDependencies: List[NodeDependency] = List(TypedNodeDependency(classOf[MetaData]),
TypedNodeDependency(classOf[NodeId]), OutputVariableNameDependency)
@@ -263,7 +263,7 @@ class KafkaAvroIntegrationSpec extends KafkaAvroSpecMixin with BeforeAndAfter {
.id("avro-test-timestamp-flink-kafka").parallelism(1).exceptionHandler()
.source(
"start", "kafka-avro", TopicParamName -> s"'${topicConfig.input}'", SchemaVersionParamName -> s"'${SchemaVersionOption.LatestOptionName}'"
).customNode("transform", "extractedTimestamp", "extractAndTransformTimestmp",
).customNode("transform", "extractedTimestamp", "extractAndTransformTimestamp",
"timestampToSet" -> (timeToSetInProcess.toString + "L"))
.emptySink(
"end",
@@ -292,7 +292,7 @@ class KafkaAvroIntegrationSpec extends KafkaAvroSpecMixin with BeforeAndAfter {
.id("avro-test-timestamp-kafka-flink").parallelism(1).exceptionHandler()
.source(
"start", "kafka-avro", TopicParamName -> s"'${topicConfig.input}'", SchemaVersionParamName -> s"'${SchemaVersionOption.LatestOptionName}'"
).customNode("transform", "extractedTimestamp", "extractAndTransformTimestmp",
).customNode("transform", "extractedTimestamp", "extractAndTransformTimestamp",
"timestampToSet" -> "10000")
.emptySink(
"end",
@@ -17,6 +17,7 @@ import pl.touk.nussknacker.engine.flink.api.process.FlinkCustomStreamTransformat
import pl.touk.nussknacker.engine.flink.test.RecordingExceptionHandler
import pl.touk.nussknacker.engine.kafka.KafkaConfig
import pl.touk.nussknacker.engine.kafka.source.InputMeta
import pl.touk.nussknacker.engine.process.helpers.SampleNodes.ExtractAndTransformTimestamp
import pl.touk.nussknacker.engine.process.helpers.SinkForType
import pl.touk.nussknacker.engine.util.process.EmptyProcessConfigCreator

@@ -53,7 +54,7 @@ class KafkaAvroTestProcessConfigCreator extends EmptyProcessConfigCreator {
}

override def customStreamTransformers(processObjectDependencies: ProcessObjectDependencies): Map[String, WithCategories[CustomStreamTransformer]] = {
Map("extractAndTransformTimestmp" -> defaultCategory(ExtractAndTransformTimestamp))
Map("extractAndTransformTimestamp" -> defaultCategory(ExtractAndTransformTimestamp))
}

override def sinkFactories(processObjectDependencies: ProcessObjectDependencies): Map[String, WithCategories[SinkFactory]] = {
@@ -73,17 +74,4 @@

}

object ExtractAndTransformTimestamp extends CustomStreamTransformer {

@MethodToInvoke(returnType = classOf[Long])
def methodToInvoke(@ParamName("timestampToSet") timestampToSet: Long): FlinkCustomStreamTransformation
= FlinkCustomStreamTransformation(_.transform("collectTimestamp",
new AbstractStreamOperator[ValueWithContext[AnyRef]] with OneInputStreamOperator[Context, ValueWithContext[AnyRef]] {
override def processElement(element: StreamRecord[Context]): Unit = {
output.collect(new StreamRecord[ValueWithContext[AnyRef]](ValueWithContext(element.getTimestamp.underlying(), element.getValue), timestampToSet))
}
}))

}

case object SinkForInputMeta extends SinkForType[InputMeta[Any]]
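
The ExtractAndTransformTimestamp helper removed here now lives in SampleNodes and is registered under the corrected name, so scenarios reference extractAndTransformTimestamp. A scenario-side sketch assembled from the tests in this commit; the process id and topic name are placeholders.

import pl.touk.nussknacker.engine.avro.KafkaAvroBaseTransformer.{SchemaVersionParamName, TopicParamName}
import pl.touk.nussknacker.engine.avro.schemaregistry.SchemaVersionOption
import pl.touk.nussknacker.engine.build.EspProcessBuilder
import pl.touk.nussknacker.engine.spel.Implicits._

object TimestampScenarioSketch {
  // Captures each record's incoming timestamp into #extractedTimestamp and re-stamps the record with 0.
  val process = EspProcessBuilder.id("timestamp-check").exceptionHandler()
    .source("start", "kafka-avro",
      TopicParamName -> "'some-topic'",
      SchemaVersionParamName -> s"'${SchemaVersionOption.LatestOptionName}'")
    .customNode("transform", "extractedTimestamp", "extractAndTransformTimestamp",
      "timestampToSet" -> "0L")
    .emptySink("end", "sinkForInputMeta")
}
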
@@ -0,0 +1,81 @@
package pl.touk.nussknacker.engine.avro.source

import com.typesafe.config.ConfigFactory
import com.typesafe.config.ConfigValueFactory.fromAnyRef
import com.typesafe.scalalogging.LazyLogging
import io.circe.Json._
import org.apache.kafka.common.record.TimestampType
import org.scalatest.{EitherValues, FunSuite, Matchers}
import pl.touk.nussknacker.engine.api.deployment.TestProcess._
import pl.touk.nussknacker.engine.avro.KafkaAvroBaseTransformer.{SchemaVersionParamName, TopicParamName}
import pl.touk.nussknacker.engine.avro.KafkaAvroIntegrationMockSchemaRegistry.schemaRegistryMockClient
import pl.touk.nussknacker.engine.avro.KafkaAvroTestProcessConfigCreator
import pl.touk.nussknacker.engine.avro.schema.Address
import pl.touk.nussknacker.engine.avro.schemaregistry.confluent.client.MockConfluentSchemaRegistryClientFactory
import pl.touk.nussknacker.engine.avro.schemaregistry.confluent.{ConfluentSchemaRegistryProvider, ConfluentUtils}
import pl.touk.nussknacker.engine.avro.schemaregistry.{SchemaRegistryProvider, SchemaVersionOption}
import pl.touk.nussknacker.engine.build.EspProcessBuilder
import pl.touk.nussknacker.engine.flink.test.FlinkTestConfiguration
import pl.touk.nussknacker.engine.graph.EspProcess
import pl.touk.nussknacker.engine.kafka.source.{InputMeta, InputMetaToJson}
import pl.touk.nussknacker.engine.process.ProcessToString.marshall
import pl.touk.nussknacker.engine.process.runner.FlinkTestMain
import pl.touk.nussknacker.engine.spel.Implicits._
import pl.touk.nussknacker.engine.testing.LocalModelData
import pl.touk.nussknacker.engine.util.ThreadUtils
import pl.touk.nussknacker.engine.util.json.BestEffortJsonEncoder

import java.util.Collections

class TestFromFileSpec extends FunSuite with Matchers with LazyLogging {

private lazy val creator: KafkaAvroTestProcessConfigCreator = new KafkaAvroTestProcessConfigCreator {
override protected def createSchemaRegistryProvider: SchemaRegistryProvider =
ConfluentSchemaRegistryProvider(new MockConfluentSchemaRegistryClientFactory(schemaRegistryMockClient))
}

private lazy val config = ConfigFactory.empty()
.withValue("kafka.kafkaAddress", fromAnyRef("notused:1111"))
.withValue("kafka.kafkaProperties.\"schema.registry.url\"", fromAnyRef("notused:2222"))

test("Should pass correct timestamp from test data") {

val topic = "simple"
val expectedTimestamp = System.currentTimeMillis()
val inputMeta = InputMeta(null, topic, 0, 1, expectedTimestamp, TimestampType.CREATE_TIME, Collections.emptyMap(), 0)
val id: Int = registerSchema(topic)

val process = EspProcessBuilder.id("test").exceptionHandler()
.source(
"start", "kafka-avro", TopicParamName -> s"'$topic'", SchemaVersionParamName -> s"'${SchemaVersionOption.LatestOptionName}'"
).customNode("transform", "extractedTimestamp", "extractAndTransformTimestamp", "timestampToSet" -> "0L")
.emptySink("end", "sinkForInputMeta")

val consumerRecord = new InputMetaToJson()
.encoder(BestEffortJsonEncoder.defaultForTests).apply(inputMeta)
.mapObject(_.add("key", Null)
.add("value", obj("city" -> fromString("Lublin"), "street" -> fromString("Lipowa"))))

val results = run(process, TestData(s"""{"keySchemaId":null,"valueSchemaId":$id,"consumerRecord":${consumerRecord.noSpaces} }"""))

val testResultVars = results.nodeResults("end").head.context.variables
testResultVars.get("extractedTimestamp") shouldBe Some(expectedTimestamp)
testResultVars.get("inputMeta") shouldBe Some(inputMeta)
}

private def registerSchema(topic: String) = {
val subject = ConfluentUtils.topicSubject(topic, isKey = false)
val parsedSchema = ConfluentUtils.convertToAvroSchema(Address.schema)
schemaRegistryMockClient.register(subject, parsedSchema)
}

private def run(process: EspProcess, testData: TestData): TestResults[Any] = {

ThreadUtils.withThisAsContextClassLoader(getClass.getClassLoader) {
FlinkTestMain.run(LocalModelData(config, creator),
marshall(process), testData, FlinkTestConfiguration.configuration(), identity)
}

}

}
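
For readability, roughly what the single test-data line fed to FlinkTestMain looks like once rendered; the timestamp inside the record is what the assertions expect back in #inputMeta and extractedTimestamp. Only keySchemaId, valueSchemaId, consumerRecord and the key/value fields are confirmed by the test above; the remaining field names are an assumption based on InputMeta's constructor parameters, not copied from the formatter.

import io.circe.Json
import io.circe.Json.{fromInt, fromLong, fromString, obj}

object TestDataLineSketch {
  // Hypothetical rendering; real output comes from InputMetaToJson + BestEffortJsonEncoder.
  val testLine: Json = obj(
    "keySchemaId"    -> Json.Null,
    "valueSchemaId"  -> fromInt(1),                  // id returned by registerSchema
    "consumerRecord" -> obj(
      "key"           -> Json.Null,
      "value"         -> obj("city" -> fromString("Lublin"), "street" -> fromString("Lipowa")),
      "topic"         -> fromString("simple"),
      "partition"     -> fromInt(0),
      "offset"        -> fromLong(1L),
      "timestamp"     -> fromLong(1630483200000L),   // the value the assertions expect back
      "timestampType" -> fromString("CreateTime"),
      "headers"       -> obj(),
      "leaderEpoch"   -> fromInt(0)
    )
  )
  // testLine.noSpaces is the shape passed to TestData in the spec above.
}
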
@@ -3,10 +3,10 @@ package pl.touk.nussknacker.engine.kafka.generic
import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema
import org.apache.kafka.clients.consumer.ConsumerRecord
import pl.touk.nussknacker.engine.flink.api.process.FlinkContextInitializer
import pl.touk.nussknacker.engine.flink.api.timestampwatermark.{LegacyTimestampWatermarkHandler, TimestampWatermarkHandler}
import pl.touk.nussknacker.engine.flink.api.timestampwatermark.{LegacyTimestampWatermarkHandler, StandardTimestampWatermarkHandler, TimestampWatermarkHandler}
import pl.touk.nussknacker.engine.flink.util.timestamp.BoundedOutOfOrderPreviousElementAssigner
import pl.touk.nussknacker.engine.kafka.{KafkaConfig, PreparedKafkaTopic, RecordFormatter}
import pl.touk.nussknacker.engine.kafka.source.KafkaSource
import pl.touk.nussknacker.engine.kafka.source.{ConsumerRecordBasedKafkaSource, KafkaSource}
import pl.touk.nussknacker.engine.kafka.source.KafkaSource.defaultMaxOutOfOrdernessMillis

/**
@@ -27,8 +27,7 @@ trait BaseKafkaDelayedSourceFactory {
formatter: RecordFormatter,
flinkContextInitializer: FlinkContextInitializer[ConsumerRecord[K, V]],
delay: Long): KafkaSource[ConsumerRecord[K, V]] = {
new KafkaSource[ConsumerRecord[K, V]](preparedTopics, kafkaConfig, deserializationSchema, timestampAssigner, formatter) {
override val contextInitializer: FlinkContextInitializer[ConsumerRecord[K, V]] = flinkContextInitializer
new ConsumerRecordBasedKafkaSource[K, V](preparedTopics, kafkaConfig, deserializationSchema, timestampAssigner, formatter, flinkContextInitializer) {
override protected def createFlinkSource(consumerGroupId: String) =
new DelayedFlinkKafkaConsumer(preparedTopics, deserializationSchema, kafkaConfig, consumerGroupId, delay, timestampAssigner)
}
@@ -18,7 +18,7 @@ import pl.touk.nussknacker.engine.api.typed._
import pl.touk.nussknacker.engine.flink.api.process.{FlinkContextInitializer, FlinkSource}
import pl.touk.nussknacker.engine.flink.api.timestampwatermark.TimestampWatermarkHandler
import pl.touk.nussknacker.engine.flink.util.source.EspDeserializationSchema
import pl.touk.nussknacker.engine.kafka.consumerrecord.FixedValueDeserializationSchemaFactory
import pl.touk.nussknacker.engine.kafka.consumerrecord.{ConsumerRecordToJsonFormatterFactory, FixedValueDeserializationSchemaFactory}
import pl.touk.nussknacker.engine.kafka.generic.KafkaDelayedSourceFactory._
import pl.touk.nussknacker.engine.kafka.generic.KafkaTypedSourceFactory._
import pl.touk.nussknacker.engine.kafka.source.KafkaSourceFactory
@@ -36,11 +36,13 @@ object sources {

import collection.JavaConverters._

private def jsonFormatterFactory = new ConsumerRecordToJsonFormatterFactory[Json, Json]()

class GenericJsonSourceFactory(processObjectDependencies: ProcessObjectDependencies) extends KafkaSourceFactory[String, java.util.Map[_, _]](
new FixedValueDeserializationSchemaFactory(JsonMapDeserialization), None, FixedRecordFormatterFactoryWrapper(JsonRecordFormatter), processObjectDependencies)
new FixedValueDeserializationSchemaFactory(JsonMapDeserialization), None, jsonFormatterFactory, processObjectDependencies)

class GenericTypedJsonSourceFactory(processObjectDependencies: ProcessObjectDependencies) extends KafkaSourceFactory[String, TypedMap](
new FixedValueDeserializationSchemaFactory(JsonTypedMapDeserialization), None, FixedRecordFormatterFactoryWrapper(JsonRecordFormatter), processObjectDependencies) {
new FixedValueDeserializationSchemaFactory(JsonTypedMapDeserialization), None, jsonFormatterFactory, processObjectDependencies) {

override protected def prepareInitialParameters: List[Parameter] = super.prepareInitialParameters ++ List(
TypeParameter
@@ -41,7 +41,9 @@ class KafkaContextInitializer[K, V, DefinedParameter <: BaseDefinedParameter](ke
new BasicContextInitializingFunction[ConsumerRecord[K, V]](processId, taskName) {
override def map(input: ConsumerRecord[K, V]): Context = {
val headers: java.util.Map[String, String] = ConsumerRecordUtils.toMap(input.headers).asJava
val inputMeta = InputMeta(input.key, input.topic, input.partition, input.offset, input.timestamp, input.timestampType(), headers, input.leaderEpoch().orElse(null))
//null won't be serialized properly
val safeLeaderEpoch = input.leaderEpoch().orElse(-1)
val inputMeta = InputMeta(input.key, input.topic, input.partition, input.offset, input.timestamp, input.timestampType(), headers, safeLeaderEpoch)
newContext
.withVariable(VariableConstants.InputVariableName, input.value)
.withVariable(VariableConstants.InputMetaVariableName, inputMeta)
@@ -9,9 +9,9 @@ import pl.touk.nussknacker.engine.api.Context
import pl.touk.nussknacker.engine.api.process.TestDataGenerator
import pl.touk.nussknacker.engine.api.test.TestDataParser
import pl.touk.nussknacker.engine.flink.api.compat.ExplicitUidInOperatorsSupport
import pl.touk.nussknacker.engine.flink.api.process.{FlinkCustomNodeContext, FlinkIntermediateRawSource, FlinkSource, FlinkSourceTestSupport}
import pl.touk.nussknacker.engine.flink.api.timestampwatermark.{LegacyTimestampWatermarkHandler, TimestampWatermarkHandler}
import pl.touk.nussknacker.engine.flink.util.timestamp.BoundedOutOfOrderPreviousElementAssigner
import pl.touk.nussknacker.engine.flink.api.process.{FlinkContextInitializer, FlinkCustomNodeContext, FlinkIntermediateRawSource, FlinkSource, FlinkSourceTestSupport}
import pl.touk.nussknacker.engine.flink.api.timestampwatermark.{LegacyTimestampWatermarkHandler, StandardTimestampWatermarkHandler, TimestampWatermarkHandler}
import pl.touk.nussknacker.engine.flink.util.timestamp.{BoundedOutOfOrderPreviousElementAssigner, BoundedOutOfOrdernessPunctuatedExtractor}
import pl.touk.nussknacker.engine.kafka._
import pl.touk.nussknacker.engine.kafka.source.KafkaSource.defaultMaxOutOfOrdernessMillis

@@ -82,6 +82,18 @@ class KafkaSource[T](preparedTopics: List[PreparedKafkaTopic],

}

class ConsumerRecordBasedKafkaSource[K, V](preparedTopics: List[PreparedKafkaTopic],
kafkaConfig: KafkaConfig,
deserializationSchema: KafkaDeserializationSchema[ConsumerRecord[K, V]],
timestampAssigner: Option[TimestampWatermarkHandler[ConsumerRecord[K, V]]],
formatter: RecordFormatter,
override val contextInitializer: FlinkContextInitializer[ConsumerRecord[K, V]]) extends KafkaSource[ConsumerRecord[K, V]](preparedTopics, kafkaConfig, deserializationSchema, timestampAssigner, formatter) {

override def timestampAssignerForTest: Option[TimestampWatermarkHandler[ConsumerRecord[K, V]]] = timestampAssigner.orElse(Some(
StandardTimestampWatermarkHandler.afterEachEvent[ConsumerRecord[K, V]](_.timestamp())
))
}

object KafkaSource {
val defaultMaxOutOfOrdernessMillis = 60000
}
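
The timestampAssignerForTest override above is the essence of the change: when a scenario configures no handler of its own, test runs fall back to a per-event handler that reads the Kafka record's timestamp, so timestamps stored in recorded test data are restored on replay. A sketch contrasting that fallback with a production-style bound built from the constant above; the wiring is illustrative, not code from this commit.

import java.time.Duration

import org.apache.kafka.clients.consumer.ConsumerRecord
import pl.touk.nussknacker.engine.flink.api.timestampwatermark.{StandardTimestampWatermarkHandler, TimestampWatermarkHandler}
import pl.touk.nussknacker.engine.kafka.source.KafkaSource

object ConsumerRecordTimestampHandlers {
  // Production-style: tolerate up to 60 s of out-of-orderness before advancing watermarks.
  def bounded[K, V]: TimestampWatermarkHandler[ConsumerRecord[K, V]] =
    StandardTimestampWatermarkHandler.boundedOutOfOrderness[ConsumerRecord[K, V]](
      _.timestamp(), Duration.ofMillis(KafkaSource.defaultMaxOutOfOrdernessMillis.toLong))

  // Test fallback: every replayed record advances event time to its own Kafka timestamp.
  def perEvent[K, V]: TimestampWatermarkHandler[ConsumerRecord[K, V]] =
    StandardTimestampWatermarkHandler.afterEachEvent[ConsumerRecord[K, V]](_.timestamp())
}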