Skip to content

Commit

Permalink
When writing Kafka buffer events, save additional information about t…
Browse files Browse the repository at this point in the history
…he encryption in the protobuf record. Contributes toward opensearch-project#3655.

Signed-off-by: David Venable <[email protected]>
  • Loading branch information
dlvenable committed Jan 18, 2024
1 parent 41eab73 commit f9f8d1d
Show file tree
Hide file tree
Showing 17 changed files with 293 additions and 43 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,20 +15,20 @@
import org.junit.jupiter.api.Nested;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import org.opensearch.dataprepper.model.CheckpointState;
import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager;
import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSet;
import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager;
import org.opensearch.dataprepper.model.codec.ByteDecoder;
import org.opensearch.dataprepper.model.codec.JsonDecoder;
import org.opensearch.dataprepper.model.configuration.PluginSetting;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.event.JacksonEvent;
import org.opensearch.dataprepper.model.plugin.PluginFactory;
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.plugins.kafka.util.TestConsumer;
import org.opensearch.dataprepper.plugins.kafka.configuration.KafkaProducerProperties;
import static org.opensearch.dataprepper.test.helper.ReflectivelySetField.setField;

import org.opensearch.dataprepper.plugins.kafka.util.TestConsumer;
import org.opensearch.dataprepper.plugins.kafka.util.TestProducer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -59,12 +59,11 @@
import static org.hamcrest.Matchers.hasKey;
import static org.hamcrest.Matchers.notNullValue;
import static org.junit.jupiter.api.Assertions.assertThrows;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import static org.mockito.Mockito.when;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.lenient;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import static org.opensearch.dataprepper.test.helper.ReflectivelySetField.setField;

@ExtendWith(MockitoExtension.class)
public class KafkaBufferIT {
Expand Down Expand Up @@ -243,6 +242,10 @@ void write_puts_correctly_formatted_data_in_protobuf_wrapper() throws TimeoutExc
final KafkaBufferMessage.BufferData bufferData = KafkaBufferMessage.BufferData.parseFrom(consumerRecord.value());

assertThat(bufferData, notNullValue());
assertThat(bufferData.getMessageFormat(), equalTo(KafkaBufferMessage.MessageFormat.MESSAGE_FORMAT_BYTES));
assertThat(bufferData.getEncrypted(), equalTo(false));
assertThat(bufferData.getEncryptedDataKey(), equalTo(ByteString.empty()));

final byte[] innerData = bufferData.getData().toByteArray();

final Map<String, Object> actualEventData = objectMapper.readValue(innerData, Map.class);
Expand Down Expand Up @@ -279,6 +282,10 @@ void writeBytes_puts_correctly_formatted_data_in_protobuf_wrapper() throws Excep
final KafkaBufferMessage.BufferData bufferData = KafkaBufferMessage.BufferData.parseFrom(consumerRecord.value());

assertThat(bufferData, notNullValue());
assertThat(bufferData.getMessageFormat(), equalTo(KafkaBufferMessage.MessageFormat.MESSAGE_FORMAT_BYTES));
assertThat(bufferData.getEncrypted(), equalTo(false));
assertThat(bufferData.getEncryptedDataKey(), equalTo(ByteString.empty()));

final byte[] innerData = bufferData.getData().toByteArray();

assertThat(innerData, equalTo(writtenBytes));
Expand All @@ -288,6 +295,7 @@ void writeBytes_puts_correctly_formatted_data_in_protobuf_wrapper() throws Excep
class Encrypted {
private Cipher decryptCipher;
private Cipher encryptCipher;
private String aesKey;

@BeforeEach
void setUp() throws NoSuchAlgorithmException, InvalidKeyException, NoSuchPaddingException {
Expand All @@ -300,7 +308,7 @@ void setUp() throws NoSuchAlgorithmException, InvalidKeyException, NoSuchPadding
encryptCipher = Cipher.getInstance("AES");
encryptCipher.init(Cipher.ENCRYPT_MODE, secretKey);
final byte[] base64Bytes = Base64.getEncoder().encode(secretKey.getEncoded());
final String aesKey = new String(base64Bytes);
aesKey = new String(base64Bytes);

final Map<String, Object> topicConfigMap = objectMapper.convertValue(topicConfig, Map.class);
topicConfigMap.put("encryption_key", aesKey);
Expand Down Expand Up @@ -360,6 +368,10 @@ void write_puts_correctly_formatted_and_encrypted_data_in_Kafka_topic() throws T
final KafkaBufferMessage.BufferData bufferData = KafkaBufferMessage.BufferData.parseFrom(valueBytes);

assertThat(bufferData, notNullValue());
assertThat(bufferData.getMessageFormat(), equalTo(KafkaBufferMessage.MessageFormat.MESSAGE_FORMAT_BYTES));
assertThat(bufferData.getEncrypted(), equalTo(true));
assertThat(bufferData.getEncryptedDataKey(), equalTo(ByteString.empty()));

byte[] innerData = bufferData.getData().toByteArray();

assertThat(innerData, notNullValue());
Expand Down Expand Up @@ -402,6 +414,10 @@ void writeBytes_puts_correctly_formatted_and_encrypted_data_in_Kafka_topic() thr
final KafkaBufferMessage.BufferData bufferData = KafkaBufferMessage.BufferData.parseFrom(valueBytes);

assertThat(bufferData, notNullValue());
assertThat(bufferData.getMessageFormat(), equalTo(KafkaBufferMessage.MessageFormat.MESSAGE_FORMAT_BYTES));
assertThat(bufferData.getEncryptedDataKey(), notNullValue());
assertThat(bufferData.getEncryptedDataKey(), equalTo(ByteString.empty()));

final byte[] innerData = bufferData.getData().toByteArray();

assertThat(innerData, notNullValue());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@
import com.google.protobuf.ByteString;
import org.apache.kafka.common.serialization.Serializer;
import org.opensearch.dataprepper.plugins.kafka.buffer.KafkaBufferMessage;
import org.opensearch.dataprepper.plugins.kafka.common.KafkaDataConfig;

import java.util.Objects;

/**
* A Kafka {@link Serializer} which serializes data into a KafkaBuffer Protobuf message.
Expand All @@ -16,9 +19,11 @@
*/
class BufferMessageSerializer<T> implements Serializer<T> {
private final Serializer<T> dataSerializer;
private final KafkaDataConfig dataConfig;

public BufferMessageSerializer(final Serializer<T> dataSerializer) {
this.dataSerializer = dataSerializer;
public BufferMessageSerializer(final Serializer<T> dataSerializer, final KafkaDataConfig dataConfig) {
this.dataSerializer = Objects.requireNonNull(dataSerializer);
this.dataConfig = Objects.requireNonNull(dataConfig);
}

@Override
Expand All @@ -28,15 +33,27 @@ public byte[] serialize(final String topic, final T data) {

final byte[] serializedData = dataSerializer.serialize(topic, data);

final KafkaBufferMessage.BufferData bufferedData = KafkaBufferMessage.BufferData.newBuilder()
.setData(ByteString.copyFrom(serializedData))
.setMessageFormat(KafkaBufferMessage.MessageFormat.MESSAGE_FORMAT_BYTES)
.build();
final KafkaBufferMessage.BufferData bufferedData = buildProtobufMessage(serializedData);

return bufferedData.toByteArray();
}

Serializer<T> getDataSerializer() {
return dataSerializer;
}

private KafkaBufferMessage.BufferData buildProtobufMessage(final byte[] serializedData) {
KafkaBufferMessage.BufferData.Builder messageBuilder = KafkaBufferMessage.BufferData.newBuilder()
.setData(ByteString.copyFrom(serializedData))
.setMessageFormat(KafkaBufferMessage.MessageFormat.MESSAGE_FORMAT_BYTES);

if(dataConfig.getEncryptionKeySupplier() != null) {
messageBuilder = messageBuilder.setEncrypted(true);

if(dataConfig.getEncryptedDataKey() != null)
messageBuilder = messageBuilder.setEncryptedDataKey(ByteString.copyFromUtf8(dataConfig.getEncryptedDataKey()));
}

return messageBuilder.build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,6 @@ public Deserializer<?> getDeserializer(final KafkaDataConfig dataConfig) {

@Override
public Serializer<?> getSerializer(final KafkaDataConfig dataConfig) {
return new BufferMessageSerializer<>(innerSerializationFactory.getSerializer(dataConfig));
return new BufferMessageSerializer<>(innerSerializationFactory.getSerializer(dataConfig), dataConfig);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,4 +11,12 @@
public interface KafkaDataConfig {
MessageFormat getSerdeFormat();
Supplier<byte[]> getEncryptionKeySupplier();

/**
* Returns an encrypted data key. If the encryption key is not encrypted,
* then this will return <code>null</code>.
*
* @return The encrypted data key provided in the configuration.
*/
String getEncryptedDataKey();
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ public class KafkaDataConfigAdapter implements KafkaDataConfig {
private final KeyFactory keyFactory;
private final TopicConfig topicConfig;

public KafkaDataConfigAdapter(KeyFactory keyFactory, TopicConfig topicConfig) {
public KafkaDataConfigAdapter(final KeyFactory keyFactory, final TopicConfig topicConfig) {
this.keyFactory = keyFactory;
this.topicConfig = topicConfig;
}
Expand All @@ -35,4 +35,11 @@ public Supplier<byte[]> getEncryptionKeySupplier() {
return null;
return keyFactory.getKeySupplier(topicConfig);
}

@Override
public String getEncryptedDataKey() {
if(topicConfig.getEncryptionKey() == null)
return null;
return keyFactory.getEncryptedDataKey(topicConfig);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
public class PlaintextKafkaDataConfig implements KafkaDataConfig {
private final KafkaDataConfig dataConfig;

private PlaintextKafkaDataConfig(KafkaDataConfig dataConfig) {
private PlaintextKafkaDataConfig(final KafkaDataConfig dataConfig) {
this.dataConfig = dataConfig;
}

Expand All @@ -31,4 +31,9 @@ public MessageFormat getSerdeFormat() {
public Supplier<byte[]> getEncryptionKeySupplier() {
return dataConfig.getEncryptionKeySupplier();
}

@Override
public String getEncryptedDataKey() {
return dataConfig.getEncryptedDataKey();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,4 +6,13 @@

interface InnerKeyProvider extends Function<TopicConfig, byte[]> {
boolean supportsConfiguration(TopicConfig topicConfig);

/**
* Returns true if this {@link InnerKeyProvider} expects an encrypted
* data key. Returns false if the encryption key is in plain-text
* in the pipeline configuration.
*
* @return True if the key is encrypted.
*/
boolean isKeyEncrypted();
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,27 +9,43 @@
public class KeyFactory {
private final List<InnerKeyProvider> orderedKeyProviders;

public KeyFactory(AwsContext awsContext) {
public KeyFactory(final AwsContext awsContext) {
this(List.of(
new KmsKeyProvider(awsContext),
new UnencryptedKeyProvider()
));
}

KeyFactory(List<InnerKeyProvider> orderedKeyProviders) {
KeyFactory(final List<InnerKeyProvider> orderedKeyProviders) {
this.orderedKeyProviders = orderedKeyProviders;
}

public Supplier<byte[]> getKeySupplier(TopicConfig topicConfig) {
public Supplier<byte[]> getKeySupplier(final TopicConfig topicConfig) {
if (topicConfig.getEncryptionKey() == null)
return null;

InnerKeyProvider keyProvider = orderedKeyProviders
final InnerKeyProvider keyProvider = getInnerKeyProvider(topicConfig);

return () -> keyProvider.apply(topicConfig);
}

public String getEncryptedDataKey(final TopicConfig topicConfig) {
if (topicConfig.getEncryptionKey() == null)
return null;

final InnerKeyProvider keyProvider = getInnerKeyProvider(topicConfig);

if(keyProvider.isKeyEncrypted())
return topicConfig.getEncryptionKey();

return null;
}

private InnerKeyProvider getInnerKeyProvider(final TopicConfig topicConfig) {
return orderedKeyProviders
.stream()
.filter(innerKeyProvider -> innerKeyProvider.supportsConfiguration(topicConfig))
.findFirst()
.orElseThrow(() -> new RuntimeException("Unable to find an inner key provider. This is a programming error - UnencryptedKeyProvider should always work."));

return () -> keyProvider.apply(topicConfig);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,29 +13,34 @@
class KmsKeyProvider implements InnerKeyProvider {
private final AwsContext awsContext;

public KmsKeyProvider(AwsContext awsContext) {
public KmsKeyProvider(final AwsContext awsContext) {
this.awsContext = awsContext;
}

@Override
public boolean supportsConfiguration(TopicConfig topicConfig) {
public boolean supportsConfiguration(final TopicConfig topicConfig) {
return topicConfig.getKmsConfig() != null && topicConfig.getKmsConfig().getKeyId() != null;
}

@Override
public byte[] apply(TopicConfig topicConfig) {
KmsConfig kmsConfig = topicConfig.getKmsConfig();
String kmsKeyId = kmsConfig.getKeyId();
public boolean isKeyEncrypted() {
return true;
}

@Override
public byte[] apply(final TopicConfig topicConfig) {
final KmsConfig kmsConfig = topicConfig.getKmsConfig();
final String kmsKeyId = kmsConfig.getKeyId();

AwsCredentialsProvider awsCredentialsProvider = awsContext.getOrDefault(kmsConfig);
final AwsCredentialsProvider awsCredentialsProvider = awsContext.getOrDefault(kmsConfig);

KmsClient kmsClient = KmsClient.builder()
final KmsClient kmsClient = KmsClient.builder()
.credentialsProvider(awsCredentialsProvider)
.region(awsContext.getRegionOrDefault(kmsConfig))
.build();

byte[] decodedEncryptionKey = Base64.getDecoder().decode(topicConfig.getEncryptionKey());
DecryptResponse decryptResponse = kmsClient.decrypt(builder -> builder
final byte[] decodedEncryptionKey = Base64.getDecoder().decode(topicConfig.getEncryptionKey());
final DecryptResponse decryptResponse = kmsClient.decrypt(builder -> builder
.keyId(kmsKeyId)
.ciphertextBlob(SdkBytes.fromByteArray(decodedEncryptionKey))
.encryptionContext(kmsConfig.getEncryptionContext())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,17 @@

class UnencryptedKeyProvider implements InnerKeyProvider {
@Override
public boolean supportsConfiguration(TopicConfig topicConfig) {
public boolean supportsConfiguration(final TopicConfig topicConfig) {
return true;
}

@Override
public byte[] apply(TopicConfig topicConfig) {
public boolean isKeyEncrypted() {
return false;
}

@Override
public byte[] apply(final TopicConfig topicConfig) {
return Base64.getDecoder().decode(topicConfig.getEncryptionKey());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,4 +16,13 @@ message BufferData {
* is unencrypted data.
*/
bytes data = 2;

/* Indicates if data is encrypted or not.
*/
optional bool encrypted = 3;

/* The data key which encrypted the data field. This will be encrypted.
* The consuming Data Prepper node must have the ability to decrypt this key.
*/
optional bytes encrypted_data_key = 4;
}
Loading

0 comments on commit f9f8d1d

Please sign in to comment.