From c45ddb15f9fc8803655e4879d3d3494ede937b0d Mon Sep 17 00:00:00 2001 From: David Venable Date: Thu, 8 Feb 2024 13:54:14 -0800 Subject: [PATCH] When writing Kafka buffer events, save additional information about the encryption in the protobuf record. Contributes toward #3655. (#3976) Signed-off-by: David Venable --- .../plugins/kafka/buffer/KafkaBufferIT.java | 34 +++++--- .../BufferMessageSerializer.java | 29 +++++-- .../BufferSerializationFactory.java | 2 +- .../plugins/kafka/common/KafkaDataConfig.java | 8 ++ .../kafka/common/KafkaDataConfigAdapter.java | 9 ++- .../common/PlaintextKafkaDataConfig.java | 7 +- .../kafka/common/key/InnerKeyProvider.java | 9 +++ .../plugins/kafka/common/key/KeyFactory.java | 28 +++++-- .../kafka/common/key/KmsKeyProvider.java | 23 +++--- .../common/key/UnencryptedKeyProvider.java | 9 ++- .../plugins/kafka/KafkaBufferMessage.proto | 9 +++ .../BufferMessageSerializerTest.java | 80 ++++++++++++++++++- .../common/KafkaDataConfigAdapterTest.java | 16 ++++ .../common/PlaintextKafkaDataConfigTest.java | 32 ++++++-- .../kafka/common/key/KeyFactoryTest.java | 31 +++++++ .../kafka/common/key/KmsKeyProviderTest.java | 5 ++ .../key/UnencryptedKeyProviderTest.java | 5 ++ 17 files changed, 293 insertions(+), 43 deletions(-) diff --git a/data-prepper-plugins/kafka-plugins/src/integrationTest/java/org/opensearch/dataprepper/plugins/kafka/buffer/KafkaBufferIT.java b/data-prepper-plugins/kafka-plugins/src/integrationTest/java/org/opensearch/dataprepper/plugins/kafka/buffer/KafkaBufferIT.java index 7989f5dd8d..e250da0eeb 100644 --- a/data-prepper-plugins/kafka-plugins/src/integrationTest/java/org/opensearch/dataprepper/plugins/kafka/buffer/KafkaBufferIT.java +++ b/data-prepper-plugins/kafka-plugins/src/integrationTest/java/org/opensearch/dataprepper/plugins/kafka/buffer/KafkaBufferIT.java @@ -15,9 +15,11 @@ import org.junit.jupiter.api.Nested; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; import org.opensearch.dataprepper.model.CheckpointState; -import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager; import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSet; +import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager; import org.opensearch.dataprepper.model.codec.ByteDecoder; import org.opensearch.dataprepper.model.codec.JsonDecoder; import org.opensearch.dataprepper.model.configuration.PluginSetting; @@ -25,10 +27,8 @@ import org.opensearch.dataprepper.model.event.JacksonEvent; import org.opensearch.dataprepper.model.plugin.PluginFactory; import org.opensearch.dataprepper.model.record.Record; -import org.opensearch.dataprepper.plugins.kafka.util.TestConsumer; import org.opensearch.dataprepper.plugins.kafka.configuration.KafkaProducerProperties; -import static org.opensearch.dataprepper.test.helper.ReflectivelySetField.setField; - +import org.opensearch.dataprepper.plugins.kafka.util.TestConsumer; import org.opensearch.dataprepper.plugins.kafka.util.TestProducer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -59,12 +59,11 @@ import static org.hamcrest.Matchers.hasKey; import static org.hamcrest.Matchers.notNullValue; import static org.junit.jupiter.api.Assertions.assertThrows; -import org.mockito.Mock; -import org.mockito.junit.jupiter.MockitoExtension; -import static org.mockito.Mockito.when; import static org.mockito.ArgumentMatchers.any; -import static org.mockito.Mockito.mock; import static org.mockito.Mockito.lenient; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; +import static org.opensearch.dataprepper.test.helper.ReflectivelySetField.setField; @ExtendWith(MockitoExtension.class) public class KafkaBufferIT { @@ -243,6 +242,10 @@ void write_puts_correctly_formatted_data_in_protobuf_wrapper() throws TimeoutExc final KafkaBufferMessage.BufferData bufferData = KafkaBufferMessage.BufferData.parseFrom(consumerRecord.value()); assertThat(bufferData, notNullValue()); + assertThat(bufferData.getMessageFormat(), equalTo(KafkaBufferMessage.MessageFormat.MESSAGE_FORMAT_BYTES)); + assertThat(bufferData.getEncrypted(), equalTo(false)); + assertThat(bufferData.getEncryptedDataKey(), equalTo(ByteString.empty())); + final byte[] innerData = bufferData.getData().toByteArray(); final Map actualEventData = objectMapper.readValue(innerData, Map.class); @@ -279,6 +282,10 @@ void writeBytes_puts_correctly_formatted_data_in_protobuf_wrapper() throws Excep final KafkaBufferMessage.BufferData bufferData = KafkaBufferMessage.BufferData.parseFrom(consumerRecord.value()); assertThat(bufferData, notNullValue()); + assertThat(bufferData.getMessageFormat(), equalTo(KafkaBufferMessage.MessageFormat.MESSAGE_FORMAT_BYTES)); + assertThat(bufferData.getEncrypted(), equalTo(false)); + assertThat(bufferData.getEncryptedDataKey(), equalTo(ByteString.empty())); + final byte[] innerData = bufferData.getData().toByteArray(); assertThat(innerData, equalTo(writtenBytes)); @@ -288,6 +295,7 @@ void writeBytes_puts_correctly_formatted_data_in_protobuf_wrapper() throws Excep class Encrypted { private Cipher decryptCipher; private Cipher encryptCipher; + private String aesKey; @BeforeEach void setUp() throws NoSuchAlgorithmException, InvalidKeyException, NoSuchPaddingException { @@ -300,7 +308,7 @@ void setUp() throws NoSuchAlgorithmException, InvalidKeyException, NoSuchPadding encryptCipher = Cipher.getInstance("AES"); encryptCipher.init(Cipher.ENCRYPT_MODE, secretKey); final byte[] base64Bytes = Base64.getEncoder().encode(secretKey.getEncoded()); - final String aesKey = new String(base64Bytes); + aesKey = new String(base64Bytes); final Map topicConfigMap = objectMapper.convertValue(topicConfig, Map.class); topicConfigMap.put("encryption_key", aesKey); @@ -360,6 +368,10 @@ void write_puts_correctly_formatted_and_encrypted_data_in_Kafka_topic() throws T final KafkaBufferMessage.BufferData bufferData = KafkaBufferMessage.BufferData.parseFrom(valueBytes); assertThat(bufferData, notNullValue()); + assertThat(bufferData.getMessageFormat(), equalTo(KafkaBufferMessage.MessageFormat.MESSAGE_FORMAT_BYTES)); + assertThat(bufferData.getEncrypted(), equalTo(true)); + assertThat(bufferData.getEncryptedDataKey(), equalTo(ByteString.empty())); + byte[] innerData = bufferData.getData().toByteArray(); assertThat(innerData, notNullValue()); @@ -402,6 +414,10 @@ void writeBytes_puts_correctly_formatted_and_encrypted_data_in_Kafka_topic() thr final KafkaBufferMessage.BufferData bufferData = KafkaBufferMessage.BufferData.parseFrom(valueBytes); assertThat(bufferData, notNullValue()); + assertThat(bufferData.getMessageFormat(), equalTo(KafkaBufferMessage.MessageFormat.MESSAGE_FORMAT_BYTES)); + assertThat(bufferData.getEncryptedDataKey(), notNullValue()); + assertThat(bufferData.getEncryptedDataKey(), equalTo(ByteString.empty())); + final byte[] innerData = bufferData.getData().toByteArray(); assertThat(innerData, notNullValue()); diff --git a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/buffer/serialization/BufferMessageSerializer.java b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/buffer/serialization/BufferMessageSerializer.java index 9760636b1d..c132dce50e 100644 --- a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/buffer/serialization/BufferMessageSerializer.java +++ b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/buffer/serialization/BufferMessageSerializer.java @@ -8,6 +8,9 @@ import com.google.protobuf.ByteString; import org.apache.kafka.common.serialization.Serializer; import org.opensearch.dataprepper.plugins.kafka.buffer.KafkaBufferMessage; +import org.opensearch.dataprepper.plugins.kafka.common.KafkaDataConfig; + +import java.util.Objects; /** * A Kafka {@link Serializer} which serializes data into a KafkaBuffer Protobuf message. @@ -16,9 +19,11 @@ */ class BufferMessageSerializer implements Serializer { private final Serializer dataSerializer; + private final KafkaDataConfig dataConfig; - public BufferMessageSerializer(final Serializer dataSerializer) { - this.dataSerializer = dataSerializer; + public BufferMessageSerializer(final Serializer dataSerializer, final KafkaDataConfig dataConfig) { + this.dataSerializer = Objects.requireNonNull(dataSerializer); + this.dataConfig = Objects.requireNonNull(dataConfig); } @Override @@ -28,10 +33,7 @@ public byte[] serialize(final String topic, final T data) { final byte[] serializedData = dataSerializer.serialize(topic, data); - final KafkaBufferMessage.BufferData bufferedData = KafkaBufferMessage.BufferData.newBuilder() - .setData(ByteString.copyFrom(serializedData)) - .setMessageFormat(KafkaBufferMessage.MessageFormat.MESSAGE_FORMAT_BYTES) - .build(); + final KafkaBufferMessage.BufferData bufferedData = buildProtobufMessage(serializedData); return bufferedData.toByteArray(); } @@ -39,4 +41,19 @@ public byte[] serialize(final String topic, final T data) { Serializer getDataSerializer() { return dataSerializer; } + + private KafkaBufferMessage.BufferData buildProtobufMessage(final byte[] serializedData) { + KafkaBufferMessage.BufferData.Builder messageBuilder = KafkaBufferMessage.BufferData.newBuilder() + .setData(ByteString.copyFrom(serializedData)) + .setMessageFormat(KafkaBufferMessage.MessageFormat.MESSAGE_FORMAT_BYTES); + + if(dataConfig.getEncryptionKeySupplier() != null) { + messageBuilder = messageBuilder.setEncrypted(true); + + if(dataConfig.getEncryptedDataKey() != null) + messageBuilder = messageBuilder.setEncryptedDataKey(ByteString.copyFromUtf8(dataConfig.getEncryptedDataKey())); + } + + return messageBuilder.build(); + } } diff --git a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/buffer/serialization/BufferSerializationFactory.java b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/buffer/serialization/BufferSerializationFactory.java index d0926f466d..d736c62ae8 100644 --- a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/buffer/serialization/BufferSerializationFactory.java +++ b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/buffer/serialization/BufferSerializationFactory.java @@ -34,6 +34,6 @@ public Deserializer getDeserializer(final KafkaDataConfig dataConfig) { @Override public Serializer getSerializer(final KafkaDataConfig dataConfig) { - return new BufferMessageSerializer<>(innerSerializationFactory.getSerializer(dataConfig)); + return new BufferMessageSerializer<>(innerSerializationFactory.getSerializer(dataConfig), dataConfig); } } diff --git a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/common/KafkaDataConfig.java b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/common/KafkaDataConfig.java index 7dd5dd2feb..43bba9ca76 100644 --- a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/common/KafkaDataConfig.java +++ b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/common/KafkaDataConfig.java @@ -11,4 +11,12 @@ public interface KafkaDataConfig { MessageFormat getSerdeFormat(); Supplier getEncryptionKeySupplier(); + + /** + * Returns an encrypted data key. If the encryption key is not encrypted, + * then this will return null. + * + * @return The encrypted data key provided in the configuration. + */ + String getEncryptedDataKey(); } diff --git a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/common/KafkaDataConfigAdapter.java b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/common/KafkaDataConfigAdapter.java index c4be81fe26..46c5263427 100644 --- a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/common/KafkaDataConfigAdapter.java +++ b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/common/KafkaDataConfigAdapter.java @@ -19,7 +19,7 @@ public class KafkaDataConfigAdapter implements KafkaDataConfig { private final KeyFactory keyFactory; private final TopicConfig topicConfig; - public KafkaDataConfigAdapter(KeyFactory keyFactory, TopicConfig topicConfig) { + public KafkaDataConfigAdapter(final KeyFactory keyFactory, final TopicConfig topicConfig) { this.keyFactory = keyFactory; this.topicConfig = topicConfig; } @@ -35,4 +35,11 @@ public Supplier getEncryptionKeySupplier() { return null; return keyFactory.getKeySupplier(topicConfig); } + + @Override + public String getEncryptedDataKey() { + if(topicConfig.getEncryptionKey() == null) + return null; + return keyFactory.getEncryptedDataKey(topicConfig); + } } diff --git a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/common/PlaintextKafkaDataConfig.java b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/common/PlaintextKafkaDataConfig.java index 26a6239823..f86e1cbf2b 100644 --- a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/common/PlaintextKafkaDataConfig.java +++ b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/common/PlaintextKafkaDataConfig.java @@ -7,7 +7,7 @@ public class PlaintextKafkaDataConfig implements KafkaDataConfig { private final KafkaDataConfig dataConfig; - private PlaintextKafkaDataConfig(KafkaDataConfig dataConfig) { + private PlaintextKafkaDataConfig(final KafkaDataConfig dataConfig) { this.dataConfig = dataConfig; } @@ -31,4 +31,9 @@ public MessageFormat getSerdeFormat() { public Supplier getEncryptionKeySupplier() { return dataConfig.getEncryptionKeySupplier(); } + + @Override + public String getEncryptedDataKey() { + return dataConfig.getEncryptedDataKey(); + } } diff --git a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/common/key/InnerKeyProvider.java b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/common/key/InnerKeyProvider.java index 88fd91ec83..b85fc02d31 100644 --- a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/common/key/InnerKeyProvider.java +++ b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/common/key/InnerKeyProvider.java @@ -6,4 +6,13 @@ interface InnerKeyProvider extends Function { boolean supportsConfiguration(TopicConfig topicConfig); + + /** + * Returns true if this {@link InnerKeyProvider} expects an encrypted + * data key. Returns false if the encryption key is in plain-text + * in the pipeline configuration. + * + * @return True if the key is encrypted. + */ + boolean isKeyEncrypted(); } diff --git a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/common/key/KeyFactory.java b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/common/key/KeyFactory.java index 96cfa82c76..2f071721b9 100644 --- a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/common/key/KeyFactory.java +++ b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/common/key/KeyFactory.java @@ -9,27 +9,43 @@ public class KeyFactory { private final List orderedKeyProviders; - public KeyFactory(AwsContext awsContext) { + public KeyFactory(final AwsContext awsContext) { this(List.of( new KmsKeyProvider(awsContext), new UnencryptedKeyProvider() )); } - KeyFactory(List orderedKeyProviders) { + KeyFactory(final List orderedKeyProviders) { this.orderedKeyProviders = orderedKeyProviders; } - public Supplier getKeySupplier(TopicConfig topicConfig) { + public Supplier getKeySupplier(final TopicConfig topicConfig) { if (topicConfig.getEncryptionKey() == null) return null; - InnerKeyProvider keyProvider = orderedKeyProviders + final InnerKeyProvider keyProvider = getInnerKeyProvider(topicConfig); + + return () -> keyProvider.apply(topicConfig); + } + + public String getEncryptedDataKey(final TopicConfig topicConfig) { + if (topicConfig.getEncryptionKey() == null) + return null; + + final InnerKeyProvider keyProvider = getInnerKeyProvider(topicConfig); + + if(keyProvider.isKeyEncrypted()) + return topicConfig.getEncryptionKey(); + + return null; + } + + private InnerKeyProvider getInnerKeyProvider(final TopicConfig topicConfig) { + return orderedKeyProviders .stream() .filter(innerKeyProvider -> innerKeyProvider.supportsConfiguration(topicConfig)) .findFirst() .orElseThrow(() -> new RuntimeException("Unable to find an inner key provider. This is a programming error - UnencryptedKeyProvider should always work.")); - - return () -> keyProvider.apply(topicConfig); } } diff --git a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/common/key/KmsKeyProvider.java b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/common/key/KmsKeyProvider.java index 6ddab0914a..a1dbc71b3e 100644 --- a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/common/key/KmsKeyProvider.java +++ b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/common/key/KmsKeyProvider.java @@ -13,29 +13,34 @@ class KmsKeyProvider implements InnerKeyProvider { private final AwsContext awsContext; - public KmsKeyProvider(AwsContext awsContext) { + public KmsKeyProvider(final AwsContext awsContext) { this.awsContext = awsContext; } @Override - public boolean supportsConfiguration(TopicConfig topicConfig) { + public boolean supportsConfiguration(final TopicConfig topicConfig) { return topicConfig.getKmsConfig() != null && topicConfig.getKmsConfig().getKeyId() != null; } @Override - public byte[] apply(TopicConfig topicConfig) { - KmsConfig kmsConfig = topicConfig.getKmsConfig(); - String kmsKeyId = kmsConfig.getKeyId(); + public boolean isKeyEncrypted() { + return true; + } + + @Override + public byte[] apply(final TopicConfig topicConfig) { + final KmsConfig kmsConfig = topicConfig.getKmsConfig(); + final String kmsKeyId = kmsConfig.getKeyId(); - AwsCredentialsProvider awsCredentialsProvider = awsContext.getOrDefault(kmsConfig); + final AwsCredentialsProvider awsCredentialsProvider = awsContext.getOrDefault(kmsConfig); - KmsClient kmsClient = KmsClient.builder() + final KmsClient kmsClient = KmsClient.builder() .credentialsProvider(awsCredentialsProvider) .region(awsContext.getRegionOrDefault(kmsConfig)) .build(); - byte[] decodedEncryptionKey = Base64.getDecoder().decode(topicConfig.getEncryptionKey()); - DecryptResponse decryptResponse = kmsClient.decrypt(builder -> builder + final byte[] decodedEncryptionKey = Base64.getDecoder().decode(topicConfig.getEncryptionKey()); + final DecryptResponse decryptResponse = kmsClient.decrypt(builder -> builder .keyId(kmsKeyId) .ciphertextBlob(SdkBytes.fromByteArray(decodedEncryptionKey)) .encryptionContext(kmsConfig.getEncryptionContext()) diff --git a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/common/key/UnencryptedKeyProvider.java b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/common/key/UnencryptedKeyProvider.java index 19309697fd..452bade186 100644 --- a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/common/key/UnencryptedKeyProvider.java +++ b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/common/key/UnencryptedKeyProvider.java @@ -6,12 +6,17 @@ class UnencryptedKeyProvider implements InnerKeyProvider { @Override - public boolean supportsConfiguration(TopicConfig topicConfig) { + public boolean supportsConfiguration(final TopicConfig topicConfig) { return true; } @Override - public byte[] apply(TopicConfig topicConfig) { + public boolean isKeyEncrypted() { + return false; + } + + @Override + public byte[] apply(final TopicConfig topicConfig) { return Base64.getDecoder().decode(topicConfig.getEncryptionKey()); } } diff --git a/data-prepper-plugins/kafka-plugins/src/main/proto/org/opensearch/dataprepper/plugins/kafka/KafkaBufferMessage.proto b/data-prepper-plugins/kafka-plugins/src/main/proto/org/opensearch/dataprepper/plugins/kafka/KafkaBufferMessage.proto index 788ae7897d..ed840340f2 100644 --- a/data-prepper-plugins/kafka-plugins/src/main/proto/org/opensearch/dataprepper/plugins/kafka/KafkaBufferMessage.proto +++ b/data-prepper-plugins/kafka-plugins/src/main/proto/org/opensearch/dataprepper/plugins/kafka/KafkaBufferMessage.proto @@ -16,4 +16,13 @@ message BufferData { * is unencrypted data. */ bytes data = 2; + + /* Indicates if data is encrypted or not. + */ + optional bool encrypted = 3; + + /* The data key which encrypted the data field. This will be encrypted. + * The consuming Data Prepper node must have the ability to decrypt this key. + */ + optional bytes encrypted_data_key = 4; } diff --git a/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/buffer/serialization/BufferMessageSerializerTest.java b/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/buffer/serialization/BufferMessageSerializerTest.java index eb196a085b..2d9d3c99d3 100644 --- a/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/buffer/serialization/BufferMessageSerializerTest.java +++ b/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/buffer/serialization/BufferMessageSerializerTest.java @@ -5,6 +5,7 @@ package org.opensearch.dataprepper.plugins.kafka.buffer.serialization; +import com.google.protobuf.ByteString; import com.google.protobuf.InvalidProtocolBufferException; import org.apache.kafka.common.serialization.Serializer; import org.junit.jupiter.api.BeforeEach; @@ -13,14 +14,20 @@ import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; import org.opensearch.dataprepper.plugins.kafka.buffer.KafkaBufferMessage; +import org.opensearch.dataprepper.plugins.kafka.common.KafkaDataConfig; import java.util.Random; import java.util.UUID; +import java.util.function.Supplier; import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.CoreMatchers.notNullValue; import static org.hamcrest.CoreMatchers.nullValue; import static org.hamcrest.MatcherAssert.assertThat; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verifyNoInteractions; import static org.mockito.Mockito.when; @@ -29,6 +36,9 @@ class BufferMessageSerializerTest { @Mock private Serializer innerDataSerializer; + @Mock + private KafkaDataConfig dataConfig; + private String topic; private Object inputData; private Random random; @@ -42,7 +52,19 @@ void setUp() { } private BufferMessageSerializer createObjectUnderTest() { - return new BufferMessageSerializer<>(innerDataSerializer); + return new BufferMessageSerializer<>(innerDataSerializer, dataConfig); + } + + @Test + void constructor_throws_if_innerDataSerializer_is_null() { + innerDataSerializer = null; + assertThrows(NullPointerException.class, this::createObjectUnderTest); + } + + @Test + void constructor_throws_if_dataConfig_is_null() { + innerDataSerializer = null; + assertThrows(NullPointerException.class, this::createObjectUnderTest); } @Test @@ -69,6 +91,62 @@ void serialize_returns_bytes_wrapped_in_KafkaBufferMessage() throws InvalidProto assertThat(actualBufferedData.getData(), notNullValue()); assertThat(actualBufferedData.getData().toByteArray(), equalTo(expectedBytes)); + + assertThat(actualBufferedData.getEncrypted(), equalTo(false)); + assertThat(actualBufferedData.getEncryptedDataKey(), equalTo(ByteString.empty())); + + verify(dataConfig, never()).getEncryptedDataKey(); + } + + @Test + void serialize_returns_bytes_wrapped_in_KafkaBufferMessage_with_encryption_data_when_encrypted() throws InvalidProtocolBufferException { + final byte[] expectedBytes = new byte[32]; + random.nextBytes(expectedBytes); + final String encryptionKey = UUID.randomUUID().toString(); + when(innerDataSerializer.serialize(topic, inputData)).thenReturn(expectedBytes); + when(dataConfig.getEncryptionKeySupplier()).thenReturn(mock(Supplier.class)); + when(dataConfig.getEncryptedDataKey()).thenReturn(encryptionKey); + final byte[] actualBytes = createObjectUnderTest().serialize(topic, inputData); + + assertThat(actualBytes, notNullValue()); + + final KafkaBufferMessage.BufferData actualBufferedData = + KafkaBufferMessage.BufferData.parseFrom(actualBytes); + + assertThat(actualBufferedData.getMessageFormat(), + equalTo(KafkaBufferMessage.MessageFormat.MESSAGE_FORMAT_BYTES)); + + assertThat(actualBufferedData.getData(), notNullValue()); + assertThat(actualBufferedData.getData().toByteArray(), + equalTo(expectedBytes)); + + assertThat(actualBufferedData.getEncrypted(), equalTo(true)); + assertThat(actualBufferedData.getEncryptedDataKey(), equalTo(ByteString.copyFromUtf8(encryptionKey))); + } + + @Test + void serialize_returns_bytes_wrapped_in_KafkaBufferMessage_without_encryption_data_when_encrypted_but_data_key_is_plaintext() throws InvalidProtocolBufferException { + final byte[] expectedBytes = new byte[32]; + random.nextBytes(expectedBytes); + final String encryptionKey = UUID.randomUUID().toString(); + when(innerDataSerializer.serialize(topic, inputData)).thenReturn(expectedBytes); + when(dataConfig.getEncryptionKeySupplier()).thenReturn(mock(Supplier.class)); + final byte[] actualBytes = createObjectUnderTest().serialize(topic, inputData); + + assertThat(actualBytes, notNullValue()); + + final KafkaBufferMessage.BufferData actualBufferedData = + KafkaBufferMessage.BufferData.parseFrom(actualBytes); + + assertThat(actualBufferedData.getMessageFormat(), + equalTo(KafkaBufferMessage.MessageFormat.MESSAGE_FORMAT_BYTES)); + + assertThat(actualBufferedData.getData(), notNullValue()); + assertThat(actualBufferedData.getData().toByteArray(), + equalTo(expectedBytes)); + + assertThat(actualBufferedData.getEncrypted(), equalTo(true)); + assertThat(actualBufferedData.getEncryptedDataKey(), equalTo(ByteString.empty())); } @Test diff --git a/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/common/KafkaDataConfigAdapterTest.java b/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/common/KafkaDataConfigAdapterTest.java index 7e2aa9ef49..f09d725a3e 100644 --- a/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/common/KafkaDataConfigAdapterTest.java +++ b/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/common/KafkaDataConfigAdapterTest.java @@ -54,4 +54,20 @@ void getEncryptionKeySupplier_returns_keyFactory_getKeySupplier_if_encryptionKey assertThat(createObjectUnderTest().getEncryptionKeySupplier(), equalTo(keySupplier)); } + + @Test + void getEncryptionKey_returns_null_when_topic_encryptedDataKey_is_null() { + assertThat(createObjectUnderTest().getEncryptedDataKey(), + nullValue()); + } + + @Test + void getEncryptionKey_returns_value_of_getEncryptedDataKey_when_encryptedDataKey_is_not_null() { + final String encryptionKey = UUID.randomUUID().toString(); + when(topicConfig.getEncryptionKey()).thenReturn(UUID.randomUUID().toString()); + when(keyFactory.getEncryptedDataKey(topicConfig)).thenReturn(encryptionKey); + + assertThat(createObjectUnderTest().getEncryptedDataKey(), + equalTo(encryptionKey)); + } } \ No newline at end of file diff --git a/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/common/PlaintextKafkaDataConfigTest.java b/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/common/PlaintextKafkaDataConfigTest.java index ef8e3bd4c6..ffdf2a55f9 100644 --- a/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/common/PlaintextKafkaDataConfigTest.java +++ b/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/common/PlaintextKafkaDataConfigTest.java @@ -1,41 +1,59 @@ package org.opensearch.dataprepper.plugins.kafka.common; import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.EnumSource; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; import org.opensearch.dataprepper.plugins.kafka.util.MessageFormat; +import java.util.UUID; import java.util.function.Supplier; import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.CoreMatchers.notNullValue; import static org.hamcrest.MatcherAssert.assertThat; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; +@ExtendWith(MockitoExtension.class) class PlaintextKafkaDataConfigTest { + @Mock + private KafkaDataConfig inputDataConfig; + @ParameterizedTest @EnumSource(MessageFormat.class) void plaintextDataConfig_returns_KafkaDataConfig_with_getSerdeFormat_returning_PLAINTEXT(MessageFormat inputFormat) { - KafkaDataConfig inputDataConfig = mock(KafkaDataConfig.class); - when(inputDataConfig.getSerdeFormat()).thenReturn(inputFormat); - - KafkaDataConfig outputDataConfig = PlaintextKafkaDataConfig.plaintextDataConfig(inputDataConfig); + final KafkaDataConfig outputDataConfig = PlaintextKafkaDataConfig.plaintextDataConfig(inputDataConfig); assertThat(outputDataConfig, notNullValue()); assertThat(outputDataConfig.getSerdeFormat(), equalTo(MessageFormat.PLAINTEXT)); + verify(inputDataConfig, never()).getSerdeFormat(); } @Test void plaintextDataConfig_returns_KafkaDataConfig_with_getEncryptionKeySupplier_returning_from_inner_dataConfig() { - KafkaDataConfig inputDataConfig = mock(KafkaDataConfig.class); - Supplier keySupplier = mock(Supplier.class); + final Supplier keySupplier = mock(Supplier.class); when(inputDataConfig.getEncryptionKeySupplier()).thenReturn(keySupplier); - KafkaDataConfig outputDataConfig = PlaintextKafkaDataConfig.plaintextDataConfig(inputDataConfig); + final KafkaDataConfig outputDataConfig = PlaintextKafkaDataConfig.plaintextDataConfig(inputDataConfig); assertThat(outputDataConfig, notNullValue()); assertThat(outputDataConfig.getEncryptionKeySupplier(), equalTo(keySupplier)); } + + @Test + void plaintextDataConfig_returns_KafkaDataConfig_with_getEncryptionKey_returning_from_inner_dataConfig() { + final String encryptionKey = UUID.randomUUID().toString(); + when(inputDataConfig.getEncryptedDataKey()).thenReturn(encryptionKey); + + final KafkaDataConfig outputDataConfig = PlaintextKafkaDataConfig.plaintextDataConfig(inputDataConfig); + + assertThat(outputDataConfig, notNullValue()); + assertThat(outputDataConfig.getEncryptedDataKey(), equalTo(encryptionKey)); + } } \ No newline at end of file diff --git a/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/common/key/KeyFactoryTest.java b/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/common/key/KeyFactoryTest.java index d4d2f506b5..a4e615bab1 100644 --- a/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/common/key/KeyFactoryTest.java +++ b/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/common/key/KeyFactoryTest.java @@ -62,4 +62,35 @@ void getKeySupplier_returns_using_first_InnerKeyFactory_that_supports_the_TopicC InnerKeyProvider lastKeyProvider = innerKeyProviders.get(2); verifyNoInteractions(lastKeyProvider); } + + @Test + void getEncryptedDataKey_returns_null_if_encryptionKey_is_null() { + assertThat(createObjectUnderTest().getEncryptedDataKey(topicConfig), + nullValue()); + } + + @Test + void getEncryptedDataKey_returns_null_if_encryptionKey_is_present_and_innerKeyProvider_indicates_unencrypted_data_key() { + when(topicConfig.getEncryptionKey()).thenReturn(UUID.randomUUID().toString()); + + final InnerKeyProvider middleKeyProvider = innerKeyProviders.get(1); + when(middleKeyProvider.supportsConfiguration(topicConfig)).thenReturn(true); + when(middleKeyProvider.isKeyEncrypted()).thenReturn(false); + + assertThat(createObjectUnderTest().getEncryptedDataKey(topicConfig), + nullValue()); + } + + @Test + void getEncryptedDataKey_returns_null_if_encryptionKey_is_present_and_innerKeyProvider_indicates_encrypted_data_key() { + final String encryptionKey = UUID.randomUUID().toString(); + when(topicConfig.getEncryptionKey()).thenReturn(encryptionKey); + + final InnerKeyProvider middleKeyProvider = innerKeyProviders.get(1); + when(middleKeyProvider.supportsConfiguration(topicConfig)).thenReturn(true); + when(middleKeyProvider.isKeyEncrypted()).thenReturn(true); + + assertThat(createObjectUnderTest().getEncryptedDataKey(topicConfig), + equalTo(encryptionKey)); + } } \ No newline at end of file diff --git a/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/common/key/KmsKeyProviderTest.java b/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/common/key/KmsKeyProviderTest.java index 0e44d2aca8..35e6a5532d 100644 --- a/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/common/key/KmsKeyProviderTest.java +++ b/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/common/key/KmsKeyProviderTest.java @@ -185,4 +185,9 @@ void apply_calls_decrypt_with_correct_values_when_encryption_context_is_present( verify(builder).encryptionContext(encryptionContext); } } + + @Test + void isKeyEncrypted_returns_true() { + assertThat(createObjectUnderTest().isKeyEncrypted(), equalTo(true)); + } } \ No newline at end of file diff --git a/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/common/key/UnencryptedKeyProviderTest.java b/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/common/key/UnencryptedKeyProviderTest.java index 4ad3f7b779..e0dcd765fb 100644 --- a/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/common/key/UnencryptedKeyProviderTest.java +++ b/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/common/key/UnencryptedKeyProviderTest.java @@ -39,4 +39,9 @@ void apply_returns_base64_decoded_encryptionKey() { assertThat(actualBytes, notNullValue()); assertThat(actualBytes, equalTo(unencodedInput.getBytes())); } + + @Test + void isKeyEncrypted_returns_true() { + assertThat(createObjectUnderTest().isKeyEncrypted(), equalTo(false)); + } } \ No newline at end of file