Skip to content

Commit

Permalink
Rename s3 sink object metadata config options (#5041)
Browse files Browse the repository at this point in the history
* Addressed review comments. Introduced a new config, will deprecate the old config

Signed-off-by: Kondaka <[email protected]>

* Addressed review comments. Introduced a new config for metadata

Signed-off-by: Kondaka <[email protected]>

* Addressed review comments. Created a separate class for object metadata

Signed-off-by: Kondaka <[email protected]>

* Addressed review comments.

Signed-off-by: Kondaka <[email protected]>

* Fixed indentation

Signed-off-by: Kondaka <[email protected]>

---------

Signed-off-by: Kondaka <[email protected]>
(cherry picked from commit 8216fdc)
  • Loading branch information
kkondaka authored and github-actions[bot] committed Oct 14, 2024
1 parent d20d445 commit ff1a621
Show file tree
Hide file tree
Showing 7 changed files with 173 additions and 8 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.plugins.sink.s3;

import java.util.HashMap;
import java.util.Map;

public class ObjectMetadata {
private Map<String, String> metadata;
private ObjectMetadataConfig objectMetadataConfig;
private PredefinedObjectMetadata predefinedObjectMetadata;

public ObjectMetadata(final Object objectMetadataConfig) {
if (objectMetadataConfig instanceof ObjectMetadataConfig) {
this.objectMetadataConfig = (ObjectMetadataConfig)objectMetadataConfig;
} else { // instanceof PredefinedObjectMetadata
this.predefinedObjectMetadata = (PredefinedObjectMetadata)objectMetadataConfig;
}
this.metadata = new HashMap<String, String>();
}

public void setEventCount(final int eventCount) {
String numberOfEventsKey = null;
if (objectMetadataConfig != null) {
numberOfEventsKey = objectMetadataConfig.getNumberOfEventsKey();
} else if (predefinedObjectMetadata != null) {
numberOfEventsKey = predefinedObjectMetadata.getNumberOfObjects();
}
if (numberOfEventsKey != null) {
metadata.put(numberOfEventsKey, Integer.toString(eventCount));
}
}

public Map<String, String> get() {
return metadata;
}

}


Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.plugins.sink.s3;

import com.fasterxml.jackson.annotation.JsonProperty;

public class ObjectMetadataConfig {
@JsonProperty("number_of_events_key")
private String numberOfEventsKey;

public String getNumberOfEventsKey() {
return numberOfEventsKey;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,15 @@ public class S3SinkConfig {
@JsonProperty("predefined_object_metadata")
private PredefinedObjectMetadata predefinedObjectMetadata;

@JsonProperty("object_metadata")
private ObjectMetadataConfig objectMetadataConfig;

@AssertTrue(message = "Only one of object_metadata and predefined_object_metadata can be used.")
private boolean isValidMetadataConfig() {
return (objectMetadataConfig != null && predefinedObjectMetadata == null) ||
(objectMetadataConfig == null && predefinedObjectMetadata != null);
}

@AssertTrue(message = "You may not use both bucket and bucket_selector together in one S3 sink.")
private boolean isValidBucketConfig() {
return (bucketName != null && bucketSelector == null) ||
Expand Down Expand Up @@ -142,8 +151,8 @@ public ObjectKeyOptions getObjectKeyOptions() {
return objectKeyOptions;
}

public PredefinedObjectMetadata getPredefinedObjectMetadata() {
return predefinedObjectMetadata;
public ObjectMetadataConfig getObjectMetadataConfig() {
return objectMetadataConfig;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,24 +5,24 @@

package org.opensearch.dataprepper.plugins.sink.s3.grouping;

import org.opensearch.dataprepper.plugins.sink.s3.PredefinedObjectMetadata;
import org.opensearch.dataprepper.plugins.sink.s3.ObjectMetadata;
import java.util.Map;
import java.util.Objects;

class S3GroupIdentifier {
private final Map<String, Object> groupIdentifierHash;
private final String groupIdentifierFullObjectKey;

private final PredefinedObjectMetadata predefinedObjectMetadata;
private final Object objectMetadataConfig;
private final String fullBucketName;

public S3GroupIdentifier(final Map<String, Object> groupIdentifierHash,
final String groupIdentifierFullObjectKey,
final PredefinedObjectMetadata predefineObjectMetadata,
final Object objectMetadataConfig,
final String fullBucketName) {
this.groupIdentifierHash = groupIdentifierHash;
this.groupIdentifierFullObjectKey = groupIdentifierFullObjectKey;
this.predefinedObjectMetadata = predefineObjectMetadata;
this.objectMetadataConfig = objectMetadataConfig;
this.fullBucketName = fullBucketName;
}

Expand All @@ -43,6 +43,13 @@ public int hashCode() {

public Map<String, Object> getGroupIdentifierHash() { return groupIdentifierHash; }

public Map<String, String> getMetadata(int eventCount) { return predefinedObjectMetadata != null ? Map.of(predefinedObjectMetadata.getNumberOfObjects(), Integer.toString(eventCount)) : null; }
public Map<String, String> getMetadata(int eventCount) {
if (objectMetadataConfig == null) {
return null;
}
ObjectMetadata objectMetadata = new ObjectMetadata(objectMetadataConfig);
objectMetadata.setEventCount(eventCount);
return objectMetadata.get();
}
public String getFullBucketName() { return fullBucketName; }
}
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,6 @@ public S3GroupIdentifier getS3GroupIdentifierForEvent(final Event event) {
}


return new S3GroupIdentifier(groupIdentificationHash, fullObjectKey, s3SinkConfig.getPredefinedObjectMetadata(), fullBucketName);
return new S3GroupIdentifier(groupIdentificationHash, fullObjectKey, s3SinkConfig.getObjectMetadataConfig(), fullBucketName);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.plugins.sink.s3;

import org.junit.jupiter.api.Test;

import org.opensearch.dataprepper.test.helper.ReflectivelySetField;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.MatcherAssert.assertThat;

import java.util.UUID;

public class ObjectMetadataConfigTest {
@Test
void test_default() {
assertThat(new ObjectMetadataConfig().getNumberOfEventsKey(), equalTo(null));
}

@Test
void test_number_of_events_key() throws Exception {
final String numberOfEventsKey = UUID.randomUUID().toString();
final ObjectMetadataConfig objectUnderTest = new ObjectMetadataConfig();
ReflectivelySetField.setField(ObjectMetadataConfig.class, objectUnderTest, "numberOfEventsKey", numberOfEventsKey);
assertThat(objectUnderTest.getNumberOfEventsKey(), equalTo(numberOfEventsKey));
}
}

Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.plugins.sink.s3;

import static org.hamcrest.CoreMatchers.equalTo;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import static org.hamcrest.MatcherAssert.assertThat;
import org.mockito.Mock;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import java.util.Random;
import java.util.UUID;

public class ObjectMetadataTest {
private ObjectMetadata objectMetadata;
@Mock
private ObjectMetadataConfig objectMetadataConfig;
@Mock
private PredefinedObjectMetadata predefinedObjectMetadata;
private String numberOfEventsKey;

private ObjectMetadata createObjectUnderTest(Object metadataConfig) {
return new ObjectMetadata(metadataConfig);
}

@BeforeEach
public void setup() {
objectMetadataConfig = mock(ObjectMetadataConfig.class);
predefinedObjectMetadata = mock(PredefinedObjectMetadata.class);
numberOfEventsKey = UUID.randomUUID().toString();
when(objectMetadataConfig.getNumberOfEventsKey()).thenReturn(numberOfEventsKey);
when(predefinedObjectMetadata.getNumberOfObjects()).thenReturn(numberOfEventsKey);
}

@Test
public void test_setEventCount_with_PredefinedObjectMetadata() {
objectMetadata = createObjectUnderTest(predefinedObjectMetadata);
Random random = new Random();
Integer numEvents = Math.abs(random.nextInt());
objectMetadata.setEventCount(numEvents);
assertThat(objectMetadata.get().get(numberOfEventsKey), equalTo(Integer.toString(numEvents)));
}

@Test
void test_setEventCount_with_ObjectMetadata() {
objectMetadata = createObjectUnderTest(objectMetadataConfig);
Random random = new Random();
Integer numEvents = Math.abs(random.nextInt());
objectMetadata.setEventCount(numEvents);
assertThat(objectMetadata.get().get(numberOfEventsKey), equalTo(Integer.toString(numEvents)));
}

}

0 comments on commit ff1a621

Please sign in to comment.