Skip to content

Commit

Permalink
Merge pull request #66 from sajinieKavindya/master
Browse files Browse the repository at this point in the history
Add missing kafka configuration properties
  • Loading branch information
sajinieKavindya authored Nov 2, 2023
2 parents e881574 + 6396972 commit 16f0ca1
Show file tree
Hide file tree
Showing 3 changed files with 22 additions and 7 deletions.
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ Copyright (c) 2017, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
<modelVersion>4.0.0</modelVersion>
<groupId>org.apache.synapse.inbound.kafka</groupId>
<artifactId>org.apache.synapse.kafka.poll</artifactId>
<version>1.1.2-SNAPSHOT</version>
<version>1.2.1</version>
<name>Kafka Polling Consumer</name>
<url>http://wso2.org</url>
<packaging>bundle</packaging>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,8 @@ public class KafkaConstants {
public static final String RECEIVER_BUFFER_BYTES = "receive.buffer.bytes";
public static final String REQUEST_TIMEOUT_MS = "request.timeout.ms";
public static final String SASL_JAAS_CONFIG = "sasl.jaas.config";
public static final String SASL_CLIENT_CALLBACK_HANDLER_CLASS = "sasl.client.callback.handler.class";
public static final String SASL_LOGIN_CLASS = "sasl.login.class";
public static final String SASL_KERBEROS_SERVICE_NAME = "sasl.kerberos.service.name";
public static final String SASL_MECANISM = "sasl.mechanism";
public static final String SECURITY_PROTOCOL = "security.protocol";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -617,16 +617,20 @@ private void createKafkaProperties(Properties properties) {
.getProperty(KafkaConstants.MAX_PARTITION_FETCH_BYTES,
KafkaConstants.MAX_PARTITION_FETCH_BYTES_DEFAULT));

kafkaProperties.put(KafkaConstants.KEY_DELEGATE_DESERIALIZER, properties
.getProperty(KafkaConstants.KEY_DELEGATE_DESERIALIZER));
if (properties.getProperty(KafkaConstants.KEY_DELEGATE_DESERIALIZER) != null) {
kafkaProperties.put(KafkaConstants.KEY_DELEGATE_DESERIALIZER, properties
.getProperty(KafkaConstants.KEY_DELEGATE_DESERIALIZER));
}

kafkaProperties.put(KafkaConstants.VALUE_DELEGATE_DESERIALIZER, properties
.getProperty(KafkaConstants.VALUE_DELEGATE_DESERIALIZER));
if (properties.getProperty(KafkaConstants.VALUE_DELEGATE_DESERIALIZER) != null) {
kafkaProperties.put(KafkaConstants.VALUE_DELEGATE_DESERIALIZER, properties
.getProperty(KafkaConstants.VALUE_DELEGATE_DESERIALIZER));
}

if (properties.getProperty(KafkaConstants.VALUE_DESERIALIZER)
.equalsIgnoreCase(KafkaConstants.KAFKA_AVRO_DESERIALIZER)
|| properties.getProperty(KafkaConstants.VALUE_DELEGATE_DESERIALIZER)
.equalsIgnoreCase(KafkaConstants.KAFKA_AVRO_DESERIALIZER)){
|| KafkaConstants.KAFKA_AVRO_DESERIALIZER
.equalsIgnoreCase(properties.getProperty(KafkaConstants.VALUE_DELEGATE_DESERIALIZER))){
kafkaProperties.put(KafkaConstants.SCHEMA_REGISTRY_URL, properties.
getProperty(KafkaConstants.SCHEMA_REGISTRY_URL, KafkaConstants.DEFAULT_SCHEMA_REGISTRY_URL));

Expand Down Expand Up @@ -701,6 +705,15 @@ private void createKafkaProperties(Properties properties) {
.put(KafkaConstants.SASL_JAAS_CONFIG, properties.getProperty(KafkaConstants.SASL_JAAS_CONFIG));
}

if (properties.getProperty(KafkaConstants.SASL_CLIENT_CALLBACK_HANDLER_CLASS) != null) {
kafkaProperties.put(KafkaConstants.SASL_CLIENT_CALLBACK_HANDLER_CLASS,
properties.getProperty(KafkaConstants.SASL_CLIENT_CALLBACK_HANDLER_CLASS));
}

if (properties.getProperty(KafkaConstants.SASL_LOGIN_CLASS) != null) {
kafkaProperties.put(KafkaConstants.SASL_LOGIN_CLASS, properties.getProperty(KafkaConstants.SASL_LOGIN_CLASS));
}

if (properties.getProperty(KafkaConstants.SASL_KERBEROS_SERVICE_NAME) != null) {
kafkaProperties.put(KafkaConstants.SASL_KERBEROS_SERVICE_NAME,
properties.getProperty(KafkaConstants.SASL_KERBEROS_SERVICE_NAME));
Expand Down

0 comments on commit 16f0ca1

Please sign in to comment.