Fix KeyedRecordFlatMapper performance:
1. Do not use reflection, get typing from AvroSinkRecordValue
nadberezny authored and arkadius committed Sep 14, 2021
1 parent 5b4bdc9 commit 41ff2a7
Showing 5 changed files with 58 additions and 34 deletions.
AvroSinkValue.scala:

@@ -1,6 +1,7 @@
 package pl.touk.nussknacker.engine.avro.sink
 
 import pl.touk.nussknacker.engine.api.LazyParameter
+import pl.touk.nussknacker.engine.api.typed.typing.{TypedObjectTypingResult, TypingResult}
 
 object AvroSinkValue {
   case class InvalidSinkValue(parameterName: String)
@@ -10,12 +11,12 @@ object AvroSinkValue {
     sinkParameter match {
       case AvroSinkSingleValueParameter(param) =>
         val value = parameterValues(param.name)
-        AvroSinkSingleValue(toLazyParameter(value, param.name))
+        AvroSinkSingleValue(toLazyParameter(value, param.name), param.typ)
 
       case AvroSinkRecordParameter(paramFields) =>
         val fields = paramFields.map { case (fieldName, sinkParam) =>
           (fieldName, applyUnsafe(sinkParam, parameterValues))
-        }.toMap
+        }
         AvroSinkRecordValue(fields)
     }
@@ -30,10 +31,19 @@
 /*
   Intermediate object which helps with mapping Avro sink editor structure to Avro message (see AvroSinkValueParameter)
  */
-sealed trait AvroSinkValue
+sealed trait AvroSinkValue {
+
+  def typingResult: TypingResult
+}
 
-case class AvroSinkSingleValue(value: LazyParameter[AnyRef])
+case class AvroSinkSingleValue(value: LazyParameter[AnyRef], typingResult: TypingResult)
   extends AvroSinkValue
 
-case class AvroSinkRecordValue(fields: Map[String, AvroSinkValue])
-  extends AvroSinkValue
+case class AvroSinkRecordValue(fields: List[(String, AvroSinkValue)])
+  extends AvroSinkValue {
+
+  val typingResult: TypedObjectTypingResult = {
+    val fieldsTyping = fields.map { case (fieldName, value) => (fieldName, value.typingResult) }
+    TypedObjectTypingResult(fieldsTyping)
+  }
+}
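
For orientation, a minimal standalone sketch of the idea introduced above: a nested record's TypingResult is assembled bottom-up from its fields, once, at construction time. The SinkValue/SingleValue/RecordValue names are hypothetical stand-ins invented for this sketch (the real hierarchy keeps a LazyParameter in the single-value case); only Typed and TypedObjectTypingResult come from the typing API used in the diff:

import pl.touk.nussknacker.engine.api.typed.typing.{Typed, TypedObjectTypingResult, TypingResult}

// Stripped-down mirror of the AvroSinkValue hierarchy, for illustration only.
sealed trait SinkValue { def typingResult: TypingResult }

case class SingleValue(typingResult: TypingResult) extends SinkValue

case class RecordValue(fields: List[(String, SinkValue)]) extends SinkValue {
  // Assembled eagerly from the fields, like AvroSinkRecordValue.typingResult above.
  val typingResult: TypedObjectTypingResult =
    TypedObjectTypingResult(fields.map { case (name, value) => (name, value.typingResult) })
}

// A record shaped like { "a": String, "b": { "c": String } }:
val record = RecordValue(List(
  "a" -> SingleValue(Typed[String]),
  "b" -> RecordValue(List("c" -> SingleValue(Typed[String])))
))
// record.typingResult now describes the whole nested structure; downstream code can
// pass it along instead of re-deriving types reflectively at runtime.

Switching fields from Map to List[(String, AvroSinkValue)] presumably also keeps field order stable in the resulting TypedObjectTypingResult; where order-insensitive lookup suffices, callers convert back with .toMap, as the updated test at the end of this diff does.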

BaseKafkaAvroSinkFactory.scala:

@@ -29,11 +29,12 @@ abstract class BaseKafkaAvroSinkFactory extends SinkFactory {
                            nodeId: NodeId): FlinkSink = {
     //This is a bit redundant, since we already validate during creation
     val schemaData = schemaDeterminer.determineSchemaUsedInTyping.valueOr(SchemaDeterminerErrorHandler.handleSchemaRegistryErrorAndThrowException)
-    validateValueType(value.returnType, schemaData.schema, validationMode).valueOr(err => throw new CustomNodeValidationException(err.message, err.paramName, null))
+    val returnType = value.returnType
+    validateValueType(returnType, schemaData.schema, validationMode).valueOr(err => throw new CustomNodeValidationException(err.message, err.paramName, null))
     val schemaUsedInRuntime = schemaDeterminer.toRuntimeSchema(schemaData)
 
     val clientId = s"${processMetaData.id}-${preparedTopic.prepared}"
-    new KafkaAvroSink(preparedTopic, version, key, AvroSinkSingleValue(value), kafkaConfig, serializationSchemaFactory,
+    new KafkaAvroSink(preparedTopic, version, key, AvroSinkSingleValue(value, returnType), kafkaConfig, serializationSchemaFactory,
       schemaData.serializableSchema, schemaUsedInRuntime.map(_.serializableSchema), clientId, validationMode)
   }

KafkaAvroSink.scala:

@@ -45,7 +45,7 @@ class KafkaAvroSink(preparedTopic: PreparedKafkaTopic,
 
   private def toValueWithContext(ds: DataStream[Context], flinkNodeContext: FlinkCustomNodeContext): DataStream[ValueWithContext[KeyedValue[AnyRef, AnyRef]]] =
     sinkValue match {
-      case AvroSinkSingleValue(value) =>
+      case AvroSinkSingleValue(value, _) =>
         ds.map(new KeyedValueMapper(flinkNodeContext.lazyParameterHelper, key, value))
       case sinkRecord: AvroSinkRecordValue =>
        ds.flatMap(KeyedRecordFlatMapper(flinkNodeContext, key, sinkRecord))

KeyedRecordFlatMapper.scala:

@@ -4,13 +4,18 @@
 import org.apache.flink.api.common.functions.{RichFlatMapFunction, RuntimeContext}
 import org.apache.flink.configuration.Configuration
 import org.apache.flink.util.Collector
 import pl.touk.nussknacker.engine.api.{Context, LazyParameter, LazyParameterInterpreter, ValueWithContext}
-import pl.touk.nussknacker.engine.api.typed.typing
 import pl.touk.nussknacker.engine.flink.api.exception.FlinkEspExceptionHandler
 import pl.touk.nussknacker.engine.flink.api.process.{FlinkCustomNodeContext, FlinkLazyParameterFunctionHelper}
 import pl.touk.nussknacker.engine.flink.util.keyed
 import pl.touk.nussknacker.engine.flink.util.keyed.KeyedValue
+import KeyedRecordFlatMapper._
 
 
 private[sink] object KeyedRecordFlatMapper {
+
+  type Key = AnyRef
+
+  type RecordMap = Map[String, AnyRef]
+
   def apply(flinkCustomNodeContext: FlinkCustomNodeContext, key: LazyParameter[AnyRef], sinkRecord: AvroSinkRecordValue): KeyedRecordFlatMapper =
@@ -28,14 +33,19 @@ private[sink] class KeyedRecordFlatMapper(nodeId: String,
                                           sinkRecord: AvroSinkRecordValue)
   extends RichFlatMapFunction[Context, ValueWithContext[KeyedValue[AnyRef, AnyRef]]] {
 
+  private val outputType = sinkRecord.typingResult
+
   private var exceptionHandler: FlinkEspExceptionHandler = _
 
   private implicit var lazyParameterInterpreter: LazyParameterInterpreter = _
 
+  private var interpreter: Context => KeyedValue[Key, RecordMap] = _
+
   override def open(parameters: Configuration): Unit = {
     super.open(parameters)
     exceptionHandler = exceptionHandlerPreparer(getRuntimeContext)
     lazyParameterInterpreter = lazyParameterHelper.createInterpreter(getRuntimeContext)
+    interpreter = createInterpreter()
   }
 
   override def close(): Unit = {
@@ -44,31 +54,32 @@ private[sink] class KeyedRecordFlatMapper(nodeId: String,
     Option(lazyParameterInterpreter).foreach(_.close())
   }
 
-  private lazy val emptyRecord: LazyParameter[Map[String, AnyRef]] = lazyParameterInterpreter
-    .pure[Map[String, AnyRef]](Map.empty, typing.Typed[Map[String, AnyRef]])
+  private lazy val emptyRecord: LazyParameter[RecordMap] = lazyParameterInterpreter
+    .pure[RecordMap](Map.empty, outputType)
 
-  private lazy val record: LazyParameter[Map[String, AnyRef]] =
-    merge(emptyRecord, sinkRecord)
+  override def flatMap(value: Context, out: Collector[ValueWithContext[KeyedValue[Key, AnyRef]]]): Unit =
+    exceptionHandler.handling(Some(nodeId), value) {
+      out.collect(ValueWithContext(interpreter(value), value))
+    }
 
-  // TODO: May affect performance: tests needed
-  private def merge(agg: LazyParameter[Map[String, AnyRef]], sinkRecord: AvroSinkRecordValue): LazyParameter[Map[String, AnyRef]] =
+  private def createInterpreter(): Context => KeyedValue[Key, RecordMap] = {
+    val record = merge(emptyRecord, sinkRecord)
+    val keyedRecord = key.product(record).map(
+      fun = tuple => KeyedValue(tuple._1, tuple._2),
+      outputTypingResult = outputType
+    )
+    lazyParameterInterpreter.syncInterpretationFunction(keyedRecord)
+  }
+
+  private def merge(agg: LazyParameter[RecordMap], sinkRecord: AvroSinkRecordValue): LazyParameter[RecordMap] =
     sinkRecord.fields.foldLeft(agg) { case (lazyRecord, (fieldName, fieldSinkValue)) =>
       val lazyParam = fieldSinkValue match {
         case single: AvroSinkSingleValue => single.value
         case sinkRec: AvroSinkRecordValue => merge(emptyRecord, sinkRec)
       }
-      lazyRecord.product(lazyParam).map { case (rec, value) =>
-        rec + (fieldName -> value)
-      }
+      lazyRecord.product(lazyParam).map(
+        fun = { case (rec, value) => rec + (fieldName -> value) },
+        outputTypingResult = outputType
+      )
     }
-
-  override def flatMap(value: Context, out: Collector[ValueWithContext[KeyedValue[AnyRef, AnyRef]]]): Unit =
-    exceptionHandler.handling(Some(nodeId), value) {
-      out.collect(ValueWithContext(interpret(value), value))
-    }
-
-  private def interpret(ctx: Context): keyed.KeyedValue[AnyRef, AnyRef] =
-    lazyParameterInterpreter.syncInterpretationFunction(
-      key.product(record).map(tuple => KeyedValue(tuple._1, tuple._2))
-    )(ctx)
 }
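
The performance fix in this file is twofold. First, the output TypingResult is taken from AvroSinkRecordValue and passed explicitly (the outputTypingResult arguments to map, and outputType passed to pure) instead of being derived during LazyParameter composition — per the commit message, that derivation used reflection. Second, the composed Context => KeyedValue function is now built once per task in open() via createInterpreter(), whereas the old interpret rebuilt key.product(record).map(...) and a fresh sync interpreter on every flatMap call. A hedged sketch of that second, general Flink pattern follows; the class and helper names are invented for illustration, not taken from this codebase:

import org.apache.flink.api.common.functions.RichFlatMapFunction
import org.apache.flink.configuration.Configuration
import org.apache.flink.util.Collector

// Hoist expensive one-time assembly out of the per-record hot path.
class PrecompiledFlatMapper extends RichFlatMapFunction[String, Int] {

  // Built once per task instance in open(), analogous to
  // KeyedRecordFlatMapper.createInterpreter() above.
  private var compiled: String => Int = _

  override def open(parameters: Configuration): Unit = {
    super.open(parameters)
    compiled = buildFunction() // runs once, not once per record
  }

  override def flatMap(value: String, out: Collector[Int]): Unit =
    out.collect(compiled(value)) // hot path: plain function application

  // Stand-in for assembling the LazyParameter chain and its sync interpreter.
  private def buildFunction(): String => Int = _.length
}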

AvroSinkValueTest.scala:

@@ -10,7 +10,7 @@ import pl.touk.nussknacker.engine.avro.KafkaAvroBaseTransformer.SinkValueParamName
 
 
 class AvroSinkValueTest extends FunSuite with Matchers {
-  private implicit val nodeId = NodeId("")
+  private implicit val nodeId: NodeId = NodeId("")
 
   test("sink params to AvroSinkRecordValue") {
     val recordSchema = SchemaBuilder
@@ -34,12 +34,14 @@ class AvroSinkValueTest extends FunSuite with Matchers {
 
     val sinkParam = AvroSinkValueParameter(recordSchema).valueOr(e => fail(e.toString))
 
-    val fields = AvroSinkValue.applyUnsafe(sinkParam, parameterValues)
+    val fields: Map[String, AvroSinkValue] = AvroSinkValue.applyUnsafe(sinkParam, parameterValues)
       .asInstanceOf[AvroSinkRecordValue]
-      .fields
+      .fields.toMap
 
     fields("a").asInstanceOf[AvroSinkSingleValue].value shouldBe value
-    fields("b").asInstanceOf[AvroSinkRecordValue].fields("c").asInstanceOf[AvroSinkSingleValue].value shouldBe value
+
+    val b: Map[String, AvroSinkValue] = fields("b").asInstanceOf[AvroSinkRecordValue].fields.toMap
+    b("c").asInstanceOf[AvroSinkSingleValue].value shouldBe value
   }
 
   test("sink params to AvroSinkSingleValue") {
