Skip to content

Commit

Permalink
Adding serde for config change event key and value (#104)
Browse files Browse the repository at this point in the history
  • Loading branch information
saxenakshitiz authored Mar 10, 2022
1 parent 9b32001 commit 0191004
Show file tree
Hide file tree
Showing 10 changed files with 104 additions and 5 deletions.
1 change: 1 addition & 0 deletions config-service-change-event-api/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -23,4 +23,5 @@ sourceSets {

dependencies {
api(libs.protobuf.java)
api(libs.kafka.clients)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package org.hypertrace.config.change.event.v1;

import com.google.protobuf.InvalidProtocolBufferException;
import org.apache.kafka.common.serialization.Deserializer;

public class ConfigChangeEventKeyDeserializer implements Deserializer<ConfigChangeEventKey> {
public ConfigChangeEventKey deserialize(String topic, byte[] data) {
try {
return ConfigChangeEventKey.parseFrom(data);
} catch (InvalidProtocolBufferException e) {
throw new RuntimeException(e);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package org.hypertrace.config.change.event.v1;

import java.util.Map;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serializer;

public class ConfigChangeEventKeySerde implements Serde<ConfigChangeEventKey> {
@Override
public void configure(Map<String, ?> configs, boolean isKey) {}

@Override
public void close() {}

@Override
public Serializer<ConfigChangeEventKey> serializer() {
return new ConfigChangeEventKeySerializer();
}

@Override
public Deserializer<ConfigChangeEventKey> deserializer() {
return new ConfigChangeEventKeyDeserializer();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package org.hypertrace.config.change.event.v1;

import org.apache.kafka.common.serialization.Serializer;

public class ConfigChangeEventKeySerializer implements Serializer<ConfigChangeEventKey> {
@Override
public byte[] serialize(String topic, ConfigChangeEventKey data) {
return data.toByteArray();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package org.hypertrace.config.change.event.v1;

import com.google.protobuf.InvalidProtocolBufferException;
import org.apache.kafka.common.serialization.Deserializer;

public class ConfigChangeEventValueDeserializer implements Deserializer<ConfigChangeEventValue> {

@Override
public ConfigChangeEventValue deserialize(String topic, byte[] data) {
try {
return ConfigChangeEventValue.parseFrom(data);
} catch (InvalidProtocolBufferException e) {
throw new RuntimeException(e);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package org.hypertrace.config.change.event.v1;

import java.util.Map;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serializer;

public class ConfigChangeEventValueSerde implements Serde<ConfigChangeEventValue> {
@Override
public void configure(Map<String, ?> configs, boolean isKey) {}

@Override
public void close() {}

@Override
public Serializer<ConfigChangeEventValue> serializer() {
return new ConfigChangeEventValueSerializer();
}

@Override
public Deserializer<ConfigChangeEventValue> deserializer() {
return new ConfigChangeEventValueDeserializer();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package org.hypertrace.config.change.event.v1;

import org.apache.kafka.common.serialization.Serializer;

public class ConfigChangeEventValueSerializer implements Serializer<ConfigChangeEventValue> {

@Override
public byte[] serialize(String topic, ConfigChangeEventValue data) {
return data.toByteArray();
}
}
1 change: 0 additions & 1 deletion config-service-change-event-generator/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ dependencies {
annotationProcessor(libs.lombok)
compileOnly(libs.lombok)

runtimeOnly(libs.kafka.protobuf.serializer)
constraints {
implementation(libs.jersey.common) {
because("https://snyk.io/vuln/SNYK-JAVA-ORGGLASSFISHJERSEYCORE-1255637")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@ event.store {
config.change.events.producer {
topic.name = config-change-events
bootstrap.servers = "localhost:9092"
key.serializer = io.confluent.kafka.serializers.protobuf.KafkaProtobufSerializer
value.serializer = io.confluent.kafka.serializers.protobuf.KafkaProtobufSerializer
key.serializer = org.hypertrace.config.change.event.v1.ConfigChangeEventKeySerializer
value.serializer = org.hypertrace.config.change.event.v1.ConfigChangeEventValueSerializer
schema.registry.url = "http://localhost:8081"
}
}
4 changes: 2 additions & 2 deletions gradle/libs.versions.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ hypertrace-grpcutils = "0.6.2"
hypertrace-framework = "0.1.29"
lombok = "1.18.20"
jackson = "2.12.4"
confluent = "6.0.5"
kafka = "6.0.1-ccs"
netty = "4.1.71.Final"
kotlin = "1.4.32"

Expand Down Expand Up @@ -48,7 +48,7 @@ gson = { module = "com.google.code.gson:gson", version.ref = "gson" }
netty-codecHttp2 = { module = "io.netty:netty-codec-http2", version.ref = "netty" }
netty-handlerProxy = { module = "io.netty:netty-handler-proxy", version.ref = "netty" }

kafka-protobuf-serializer = { module = "io.confluent:kafka-protobuf-serializer", version.ref = "confluent" }
kafka-clients = { module = "org.apache.kafka:kafka-clients", version.ref = "kafka" }
kotlin-stdlib = { module = "org.jetbrains.kotlin:kotlin-stdlib", version.ref = "kotlin" }
kotlin-stdlibJdk7 = { module = "org.jetbrains.kotlin:kotlin-stdlib-jdk7", version.ref = "kotlin" }
kotlin-stdlibJdk8 = { module = "org.jetbrains.kotlin:kotlin-stdlib-jdk8", version.ref = "kotlin" }
Expand Down

0 comments on commit 0191004

Please sign in to comment.