Skip to content

Commit

Permalink
Added RecordEncoder
Browse files Browse the repository at this point in the history
  • Loading branch information
sksamuel committed Apr 14, 2024
1 parent fc64292 commit 597ff34
Show file tree
Hide file tree
Showing 4 changed files with 51 additions and 6 deletions.
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package com.sksamuel.centurion.avro.encoders

import org.apache.avro.Schema
import java.math.BigDecimal
import kotlin.reflect.KType

/**
* An [Encoder] typeclass encodes a JVM value of type T into a value suitable
Expand All @@ -10,8 +12,8 @@ import org.apache.avro.Schema
* or it could encode it as an instance of [GenericFixed].
*
* Some encoders use the schema to determine the encoding function to return. For example, strings
* can be encoded as [UTF8]s, [GenericFixed]]s, [ByteBuffers] or [java.lang.String]s.
* Therefore, the []Encoder<String>] typeclass instances uses the schema to select which of these
* can be encoded as [UTF8]s, [GenericFixed]s, [ByteBuffers] or [java.lang.String]s.
* Therefore, the [Encoder<String>] typeclass instances uses the schema to select which of these
* implementations to use.
*
* Other types may not require the schema at all. For example, the default [Encoder<Int>] always
Expand All @@ -20,6 +22,7 @@ import org.apache.avro.Schema
fun interface Encoder<T> {

companion object {

/**
* Returns an [Encoder] that encodes using the supplied function.
*/
Expand All @@ -29,6 +32,20 @@ fun interface Encoder<T> {
* Returns an [Encoder] that encodes by simply returning the input value.
*/
fun <T : Any> identity(): Encoder<T> = Encoder { _, value -> value }

fun encoderFor(type: KType): Encoder<*> {
val encoder: Encoder<*> = when (type.classifier) {
String::class -> StringEncoder
Boolean::class -> BooleanEncoder
Float::class -> FloatEncoder
Double::class -> DoubleEncoder
Int::class -> IntEncoder
Long::class -> LongEncoder
BigDecimal::class -> BigDecimalStringEncoder
else -> error("Unsupported type $type")
}
return if (type.isMarkedNullable) NullEncoder(encoder) else encoder
}
}

fun encode(schema: Schema, value: T): Any?
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import java.math.BigDecimal
*/
object BigDecimalBytesEncoder : Encoder<BigDecimal> {

override fun encode(schema: Schema, value: BigDecimal): Any {
override fun encode(schema: Schema, value: BigDecimal): Any? {
require(schema.type == Schema.Type.BYTES)

val logical = schema.logicalType as LogicalTypes.Decimal
Expand All @@ -26,7 +26,7 @@ object BigDecimalBytesEncoder : Encoder<BigDecimal> {
*/
object BigDecimalStringEncoder : Encoder<BigDecimal> {

override fun encode(schema: Schema, value: BigDecimal): Any {
override fun encode(schema: Schema, value: BigDecimal): Any? {
require(schema.type == Schema.Type.STRING)
val encoder = StringEncoder.contraMap<BigDecimal> { it.toString() }
return encoder.encode(schema, value)
Expand All @@ -38,7 +38,7 @@ object BigDecimalStringEncoder : Encoder<BigDecimal> {
*/
object BigDecimalFixedEncoder : Encoder<BigDecimal> {

override fun encode(schema: Schema, value: BigDecimal): Any {
override fun encode(schema: Schema, value: BigDecimal): Any? {
require(schema.type == Schema.Type.FIXED)

val logical = schema.logicalType as LogicalTypes.Decimal
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package com.sksamuel.centurion.avro.encoders
import org.apache.avro.Schema

/**
* An [Encoder] that supports nullable types by wrapping in an Avro union.
* An [Encoder] that supports nullable types
*/
class NullEncoder<T>(private val encoder: Encoder<T>) : Encoder<T?> {
override fun encode(schema: Schema, value: T?): Any? {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package com.sksamuel.centurion.avro.encoders

import org.apache.avro.Schema
import org.apache.avro.generic.GenericData
import kotlin.reflect.KProperty1
import kotlin.reflect.full.declaredMemberProperties

class RecordEncoder : Encoder<Any> {

override fun encode(schema: Schema, value: Any): Any {
require(schema.type == Schema.Type.RECORD)

val kclass = value::class
require(kclass.isData) { "Can only encode data classes: $kclass" }

val record = GenericData.Record(schema)

value::class.declaredMemberProperties.map { member: KProperty1<out Any, *> ->
val field = schema.getField(member.name)
val encoder = Encoder.encoderFor(member.returnType) as Encoder<Any?>
val v = member.getter.call(value)
val encoded = encoder.encode(field.schema(), v)
record.put(member.name, encoded)
}

return record
}
}

0 comments on commit 597ff34

Please sign in to comment.