Skip to content

Commit

Permalink
Correct timestamps in test from file for Kafka sources - fixes for sc…
Browse files Browse the repository at this point in the history
…ala 2.11
  • Loading branch information
arkadius committed Sep 1, 2021
1 parent 2deab65 commit af2c7b4
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 9 deletions.
Original file line number Diff line number Diff line change
@@ -1,12 +1,11 @@
package pl.touk.nussknacker.engine.flink.api.timestampwatermark

import java.time.Duration
import com.github.ghik.silencer.silent
import org.apache.flink.api.common.eventtime
import org.apache.flink.api.common.eventtime.{SerializableTimestampAssigner, TimestampAssignerSupplier, Watermark, WatermarkGenerator, WatermarkGeneratorSupplier, WatermarkOutput, WatermarkStrategy}
import org.apache.flink.api.common.eventtime.{SerializableTimestampAssigner, Watermark, WatermarkGenerator, WatermarkGeneratorSupplier, WatermarkOutput, WatermarkStrategy}
import org.apache.flink.streaming.api.functions.{AssignerWithPeriodicWatermarks, AssignerWithPunctuatedWatermarks, TimestampAssigner}
import org.apache.flink.streaming.api.scala.DataStream

import java.time.Duration
import scala.annotation.nowarn

trait TimestampWatermarkHandler[T] extends Serializable {
Expand Down Expand Up @@ -44,11 +43,13 @@ object StandardTimestampWatermarkHandler {
}

def afterEachEvent[T](extract: T => Long): TimestampWatermarkHandler[T] = {
new StandardTimestampWatermarkHandler[T](WatermarkStrategy.forGenerator((_: WatermarkGeneratorSupplier.Context) => new WatermarkGenerator[T] {
override def onEvent(event: T, eventTimestamp: Long, output: WatermarkOutput): Unit = {
output.emitWatermark(new Watermark(eventTimestamp))
new StandardTimestampWatermarkHandler[T](WatermarkStrategy.forGenerator(new WatermarkGeneratorSupplier[T] {
override def createWatermarkGenerator(context: WatermarkGeneratorSupplier.Context): WatermarkGenerator[T] = new WatermarkGenerator[T] {
override def onEvent(event: T, eventTimestamp: Long, output: WatermarkOutput): Unit = {
output.emitWatermark(new Watermark(eventTimestamp))
}
override def onPeriodicEmit(output: WatermarkOutput): Unit = {}
}
override def onPeriodicEmit(output: WatermarkOutput): Unit = {}
}).withTimestampAssigner(timestampAssigner(extract)))
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,8 @@ class TestFromFileSpec extends FunSuite with Matchers with LazyLogging {

val process = EspProcessBuilder.id("test").exceptionHandler()
.source(
"start", "kafka-GenericJsonSourceFactory", TopicParamName -> s"'$topic'",
).customNode("transform", "extractedTimestamp", "extractAndTransformTimestamp", "timestampToSet" -> "0L")
"start", "kafka-GenericJsonSourceFactory", TopicParamName -> s"'$topic'")
.customNode("transform", "extractedTimestamp", "extractAndTransformTimestamp", "timestampToSet" -> "0L")
.emptySink("end", "sinkForInputMeta")

val consumerRecord = new InputMetaToJson()
Expand Down

0 comments on commit af2c7b4

Please sign in to comment.