Skip to content

Commit

Permalink
fix java spotless check
Browse files Browse the repository at this point in the history
  • Loading branch information
Neelab Chaudhuri committed Nov 5, 2024
1 parent 27b72af commit c5dde57
Show file tree
Hide file tree
Showing 4 changed files with 16 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -83,9 +83,9 @@ private Optional<Emitter> getEmitter() {
try {
emitter =
Optional.of(
new KafkaEmitter(
datahubKafkaEmitterConfig.getKafkaEmitterConfig(),
datahubKafkaEmitterConfig.getMcpTopic()));
new KafkaEmitter(
datahubKafkaEmitterConfig.getKafkaEmitterConfig(),
datahubKafkaEmitterConfig.getMcpTopic()));
} catch (IOException e) {
throw new RuntimeException(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -191,8 +191,7 @@ public Optional<DatahubEmitterConfig> initializeEmitter(Config sparkConf) {
if (sparkConf.hasPath(SparkConfigParser.KAFKA_MCP_TOPIC)) {
String mcpTopic = sparkConf.getString(SparkConfigParser.KAFKA_MCP_TOPIC);
return Optional.of(new KafkaDatahubEmitterConfig(kafkaEmitterConfig.build(), mcpTopic));
}
else {
} else {
return Optional.of(new KafkaDatahubEmitterConfig(kafkaEmitterConfig.build()));
}
case "file":
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,7 @@ public KafkaDatahubEmitterConfig(KafkaEmitterConfig kafkaEmitterConfig) {
this.mcpTopic = KafkaEmitter.DEFAULT_MCP_KAFKA_TOPIC;
}

public KafkaDatahubEmitterConfig(KafkaEmitterConfig kafkaEmitterConfig,
String mcpTopic) {
public KafkaDatahubEmitterConfig(KafkaEmitterConfig kafkaEmitterConfig, String mcpTopic) {
this.kafkaEmitterConfig = kafkaEmitterConfig;
this.mcpTopic = mcpTopic;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,31 +36,29 @@ public class KafkaEmitter implements Emitter {
/**
* The default constructor
*
* @param config
* @throws IOException
* @param config KafkaEmitterConfig
* @throws IOException when Avro Serialization fails
*/
public KafkaEmitter(KafkaEmitterConfig config) throws IOException {
this(config, DEFAULT_MCP_KAFKA_TOPIC);
}

/**
* Constructor that takes in KafkaEmitterConfig
* and mcp Kafka Topic Name
* Constructor that takes in KafkaEmitterConfig and mcp Kafka Topic Name
*
* @param config
* @throws IOException
* @param config KafkaEmitterConfig
* @throws IOException when Avro Serialization fails
*/
public KafkaEmitter(KafkaEmitterConfig config,
String mcpKafkaTopic) throws IOException {
public KafkaEmitter(KafkaEmitterConfig config, String mcpKafkaTopic) throws IOException {
this.config = config;
kafkaConfigProperties = new Properties();
kafkaConfigProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, this.config.getBootstrap());
kafkaConfigProperties.put(
ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
org.apache.kafka.common.serialization.StringSerializer.class);
ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
org.apache.kafka.common.serialization.StringSerializer.class);
kafkaConfigProperties.put(
ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
io.confluent.kafka.serializers.KafkaAvroSerializer.class);
ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
io.confluent.kafka.serializers.KafkaAvroSerializer.class);
kafkaConfigProperties.put("schema.registry.url", this.config.getSchemaRegistryUrl());
kafkaConfigProperties.putAll(config.getSchemaRegistryConfig());
kafkaConfigProperties.putAll(config.getProducerConfig());
Expand All @@ -87,8 +85,7 @@ public Future<MetadataWriteResponse> emit(MetadataChangeProposal mcp, Callback d
throws IOException {
GenericRecord genricRecord = _avroSerializer.serialize(mcp);
ProducerRecord<Object, Object> record =
new ProducerRecord<>(
this.mcpKafkaTopic, mcp.getEntityUrn().toString(), genricRecord);
new ProducerRecord<>(this.mcpKafkaTopic, mcp.getEntityUrn().toString(), genricRecord);
org.apache.kafka.clients.producer.Callback callback =
new org.apache.kafka.clients.producer.Callback() {

Expand Down

0 comments on commit c5dde57

Please sign in to comment.