From c5dde576aa45c5817caef7b57ea8c3fdcf6d3fb9 Mon Sep 17 00:00:00 2001 From: Neelab Chaudhuri Date: Mon, 4 Nov 2024 17:57:52 -0700 Subject: [PATCH] fix java spotless check --- .../datahub/spark/DatahubEventEmitter.java | 6 ++--- .../datahub/spark/DatahubSparkListener.java | 3 +-- .../spark/conf/KafkaDatahubEmitterConfig.java | 3 +-- .../datahub/client/kafka/KafkaEmitter.java | 25 ++++++++----------- 4 files changed, 16 insertions(+), 21 deletions(-) diff --git a/metadata-integration/java/acryl-spark-lineage/src/main/java/datahub/spark/DatahubEventEmitter.java b/metadata-integration/java/acryl-spark-lineage/src/main/java/datahub/spark/DatahubEventEmitter.java index 155d14c660c47..0bcc7db9e8740 100644 --- a/metadata-integration/java/acryl-spark-lineage/src/main/java/datahub/spark/DatahubEventEmitter.java +++ b/metadata-integration/java/acryl-spark-lineage/src/main/java/datahub/spark/DatahubEventEmitter.java @@ -83,9 +83,9 @@ private Optional getEmitter() { try { emitter = Optional.of( - new KafkaEmitter( - datahubKafkaEmitterConfig.getKafkaEmitterConfig(), - datahubKafkaEmitterConfig.getMcpTopic())); + new KafkaEmitter( + datahubKafkaEmitterConfig.getKafkaEmitterConfig(), + datahubKafkaEmitterConfig.getMcpTopic())); } catch (IOException e) { throw new RuntimeException(e); } diff --git a/metadata-integration/java/acryl-spark-lineage/src/main/java/datahub/spark/DatahubSparkListener.java b/metadata-integration/java/acryl-spark-lineage/src/main/java/datahub/spark/DatahubSparkListener.java index 799ff40b17215..ee0938edb5045 100644 --- a/metadata-integration/java/acryl-spark-lineage/src/main/java/datahub/spark/DatahubSparkListener.java +++ b/metadata-integration/java/acryl-spark-lineage/src/main/java/datahub/spark/DatahubSparkListener.java @@ -191,8 +191,7 @@ public Optional initializeEmitter(Config sparkConf) { if (sparkConf.hasPath(SparkConfigParser.KAFKA_MCP_TOPIC)) { String mcpTopic = sparkConf.getString(SparkConfigParser.KAFKA_MCP_TOPIC); return Optional.of(new KafkaDatahubEmitterConfig(kafkaEmitterConfig.build(), mcpTopic)); - } - else { + } else { return Optional.of(new KafkaDatahubEmitterConfig(kafkaEmitterConfig.build())); } case "file": diff --git a/metadata-integration/java/acryl-spark-lineage/src/main/java/datahub/spark/conf/KafkaDatahubEmitterConfig.java b/metadata-integration/java/acryl-spark-lineage/src/main/java/datahub/spark/conf/KafkaDatahubEmitterConfig.java index 478a704da02dd..a5f9b59f70846 100644 --- a/metadata-integration/java/acryl-spark-lineage/src/main/java/datahub/spark/conf/KafkaDatahubEmitterConfig.java +++ b/metadata-integration/java/acryl-spark-lineage/src/main/java/datahub/spark/conf/KafkaDatahubEmitterConfig.java @@ -19,8 +19,7 @@ public KafkaDatahubEmitterConfig(KafkaEmitterConfig kafkaEmitterConfig) { this.mcpTopic = KafkaEmitter.DEFAULT_MCP_KAFKA_TOPIC; } - public KafkaDatahubEmitterConfig(KafkaEmitterConfig kafkaEmitterConfig, - String mcpTopic) { + public KafkaDatahubEmitterConfig(KafkaEmitterConfig kafkaEmitterConfig, String mcpTopic) { this.kafkaEmitterConfig = kafkaEmitterConfig; this.mcpTopic = mcpTopic; } diff --git a/metadata-integration/java/datahub-client/src/main/java/datahub/client/kafka/KafkaEmitter.java b/metadata-integration/java/datahub-client/src/main/java/datahub/client/kafka/KafkaEmitter.java index 71bb9713c27fd..777d2d5f301d7 100644 --- a/metadata-integration/java/datahub-client/src/main/java/datahub/client/kafka/KafkaEmitter.java +++ b/metadata-integration/java/datahub-client/src/main/java/datahub/client/kafka/KafkaEmitter.java @@ -36,31 +36,29 @@ public class KafkaEmitter implements Emitter { /** * The default constructor * - * @param config - * @throws IOException + * @param config KafkaEmitterConfig + * @throws IOException when Avro Serialization fails */ public KafkaEmitter(KafkaEmitterConfig config) throws IOException { this(config, DEFAULT_MCP_KAFKA_TOPIC); } /** - * Constructor that takes in KafkaEmitterConfig - * and mcp Kafka Topic Name + * Constructor that takes in KafkaEmitterConfig and mcp Kafka Topic Name * - * @param config - * @throws IOException + * @param config KafkaEmitterConfig + * @throws IOException when Avro Serialization fails */ - public KafkaEmitter(KafkaEmitterConfig config, - String mcpKafkaTopic) throws IOException { + public KafkaEmitter(KafkaEmitterConfig config, String mcpKafkaTopic) throws IOException { this.config = config; kafkaConfigProperties = new Properties(); kafkaConfigProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, this.config.getBootstrap()); kafkaConfigProperties.put( - ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, - org.apache.kafka.common.serialization.StringSerializer.class); + ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, + org.apache.kafka.common.serialization.StringSerializer.class); kafkaConfigProperties.put( - ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, - io.confluent.kafka.serializers.KafkaAvroSerializer.class); + ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, + io.confluent.kafka.serializers.KafkaAvroSerializer.class); kafkaConfigProperties.put("schema.registry.url", this.config.getSchemaRegistryUrl()); kafkaConfigProperties.putAll(config.getSchemaRegistryConfig()); kafkaConfigProperties.putAll(config.getProducerConfig()); @@ -87,8 +85,7 @@ public Future emit(MetadataChangeProposal mcp, Callback d throws IOException { GenericRecord genricRecord = _avroSerializer.serialize(mcp); ProducerRecord record = - new ProducerRecord<>( - this.mcpKafkaTopic, mcp.getEntityUrn().toString(), genricRecord); + new ProducerRecord<>(this.mcpKafkaTopic, mcp.getEntityUrn().toString(), genricRecord); org.apache.kafka.clients.producer.Callback callback = new org.apache.kafka.clients.producer.Callback() {