diff --git a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/util/KafkaSecurityConfigurer.java b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/util/KafkaSecurityConfigurer.java index 7357fca2e1..47daed1e24 100644 --- a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/util/KafkaSecurityConfigurer.java +++ b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/util/KafkaSecurityConfigurer.java @@ -129,7 +129,8 @@ private static void setPlainTextAuthProperties(final Properties properties, fina private static void setSecurityProtocolSSLProperties(final Properties properties, final EncryptionConfig encryptionConfig) { if (Objects.nonNull(encryptionConfig.getCertificateContent())) { setCustomSslProperties(properties, encryptionConfig.getCertificateContent()); - } else { + } else if (Objects.nonNull(encryptionConfig.getTrustStoreFilePath()) && + Objects.nonNull(encryptionConfig.getTrustStorePassword())) { setTruststoreProperties(properties, encryptionConfig); } } diff --git a/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/util/KafkaSecurityConfigurerTest.java b/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/util/KafkaSecurityConfigurerTest.java index 949f920f66..e2506fafd1 100644 --- a/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/util/KafkaSecurityConfigurerTest.java +++ b/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/util/KafkaSecurityConfigurerTest.java @@ -78,6 +78,19 @@ public void testSetAuthPropertiesAuthSslWithTrustStore() throws Exception { assertThat(props.get("ssl.engine.factory.class"), is(nullValue())); } + @Test + public void testSetAuthPropertiesAuthSslWithNoCertContentNoTrustStore() throws Exception { + final Properties props = new Properties(); + final KafkaSourceConfig kafkaSourceConfig = createKafkaSinkConfig("kafka-pipeline-sasl-ssl-no-cert-content-no-truststore.yaml"); + KafkaSecurityConfigurer.setAuthProperties(props, kafkaSourceConfig, LOG); + assertThat(props.getProperty("sasl.mechanism"), is("PLAIN")); + assertThat(props.getProperty("security.protocol"), is("SASL_SSL")); + assertThat(props.getProperty("certificateContent"), is(nullValue())); + assertThat(props.getProperty("ssl.truststore.location"), is(nullValue())); + assertThat(props.getProperty("ssl.truststore.password"), is(nullValue())); + assertThat(props.get("ssl.engine.factory.class"), is(nullValue())); + } + private KafkaSourceConfig createKafkaSinkConfig(final String fileName) throws IOException { final Yaml yaml = new Yaml(); final FileReader fileReader = new FileReader(Objects.requireNonNull(getClass().getClassLoader() diff --git a/data-prepper-plugins/kafka-plugins/src/test/resources/kafka-pipeline-sasl-ssl-no-cert-content-no-truststore.yaml b/data-prepper-plugins/kafka-plugins/src/test/resources/kafka-pipeline-sasl-ssl-no-cert-content-no-truststore.yaml new file mode 100644 index 0000000000..4a16c918a1 --- /dev/null +++ b/data-prepper-plugins/kafka-plugins/src/test/resources/kafka-pipeline-sasl-ssl-no-cert-content-no-truststore.yaml @@ -0,0 +1,17 @@ +log-pipeline : + source: + kafka: + bootstrap_servers: + - "localhost:9092" + encryption: + type: "SSL" + authentication: + sasl: + plaintext: + username: username + password: password + topics: + - name: "quickstart-events" + group_id: "groupdID1" + sink: + stdout: \ No newline at end of file