Skip to content

Commit

Permalink
Added ReflectionRecordDecoder
Browse files Browse the repository at this point in the history
  • Loading branch information
sksamuel committed Apr 14, 2024
1 parent 68591b6 commit 77027d7
Show file tree
Hide file tree
Showing 4 changed files with 70 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,10 @@ object Records {
fun fromRecord(record: IndexedRecord): Struct {
val schema = Schemas.fromAvro(record.schema) as Schema.Struct
val values = schema.fields.withIndex().map { (index, field) ->
val value = record.get(index)
val decoder = Decoders.decoderFor(field.schema)
decoder.decode(value)
TODO()
// val value = record.get(index)
// val decoder = Decoders.decoderFor(field.schema)
// decoder.decode(value)
}
return Struct(schema, values)
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package com.sksamuel.centurion.avro.decoders

import org.apache.avro.Schema
import kotlin.reflect.KType

/**
* A [Decoder] typeclass is used to convert an Avro value, such as a [GenericRecord],
Expand All @@ -19,6 +20,22 @@ fun interface Decoder<T> {
val self = this
return Decoder { schema, value -> fn(self.decode(schema, value)) }
}

companion object {
fun decoderFor(type: KType): Decoder<*> {
val decoder: Decoder<*> = when (type.classifier) {
String::class -> StringDecoder
Boolean::class -> BooleanDecoder
Float::class -> FloatDecoder
Double::class -> DoubleDecoder
Int::class -> IntDecoder
Long::class -> LongDecoder
else -> error("Unsupported type $type")
}
return decoder
// return if (type.isMarkedNullable) NullEncoder(encoder) else encoder
}
}
}

//object Decoders {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package com.sksamuel.centurion.avro.decoders

import org.apache.avro.Schema
import org.apache.avro.generic.GenericRecord
import kotlin.reflect.KClass
import kotlin.reflect.full.primaryConstructor

class ReflectionRecordDecoder<T : Any>(private val kclass: KClass<T>) : Decoder<T> {

companion object {
inline operator fun <reified T : Any> invoke() = ReflectionRecordDecoder(T::class)
}

override fun decode(schema: Schema, value: Any?): T {
require(schema.type == Schema.Type.RECORD)
require(kclass.isData) { "Decoders only support data class: was $kclass" }
require(value is GenericRecord) { "ReflectionRecordDecoder only supports GenericRecords: was $value" }
val args = kclass.primaryConstructor!!.parameters.map { param ->
val field = schema.getField(param.name)
val arg = value.get(param.name)
val decoder = Decoder.decoderFor(param.type)
decoder.decode(field.schema(), arg)
}
require(args.size == kclass.primaryConstructor!!.parameters.size)
return kclass.primaryConstructor!!.call(*args.toTypedArray())
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package com.sksamuel.centurion.avro.decoders

import io.kotest.core.spec.style.FunSpec
import io.kotest.matchers.shouldBe
import org.apache.avro.SchemaBuilder
import org.apache.avro.generic.GenericData
import org.apache.avro.util.Utf8

class RecordDecoderTest : FunSpec({

test("basic record decoder") {
data class Foo(val a: String, val b: Boolean)
val schema = SchemaBuilder.record("Foo").fields().requiredString("a").requiredBoolean("b").endRecord()

val record = GenericData.Record(schema)
record.put("a", Utf8("hello"))
record.put("b", true)

ReflectionRecordDecoder<Foo>().decode(schema, record) shouldBe Foo("hello", true)
}

})

0 comments on commit 77027d7

Please sign in to comment.