Skip to content

Commit

Permalink
Merge branch 'opensearch-project:main' into secrets-variable-interface
Browse files Browse the repository at this point in the history
  • Loading branch information
san81 authored Jan 17, 2025
2 parents 1d14234 + 41e3227 commit 8b946af
Show file tree
Hide file tree
Showing 34 changed files with 404 additions and 78 deletions.
2 changes: 1 addition & 1 deletion .github/CODEOWNERS
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
# This should match the owning team set up in https://github.com/orgs/opensearch-project/teams
* @sb2k16 @chenqi0805 @engechas @graytaylor0 @dinujoh @kkondaka @KarstenSchnitter @dlvenable @oeyh
* @sb2k16 @chenqi0805 @engechas @san81 @graytaylor0 @dinujoh @kkondaka @KarstenSchnitter @dlvenable @oeyh
4 changes: 2 additions & 2 deletions .github/workflows/gradle.yml
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ jobs:
run: ./gradlew --parallel --max-workers 2 build
- name: Upload Unit Test Results
if: always()
uses: actions/upload-artifact@v3
uses: actions/upload-artifact@v4
with:
name: data-prepper-test-results-java-${{ matrix.java }}
path: '**/test-results/**/*.xml'
Expand All @@ -45,7 +45,7 @@ jobs:

steps:
- name: Download Artifacts
uses: actions/download-artifact@v3
uses: actions/download-artifact@v4
with:
path: test-results

Expand Down
4 changes: 2 additions & 2 deletions .github/workflows/kafka-plugin-integration-tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ jobs:
- name: Upload Unit Test Results
if: always()
uses: actions/upload-artifact@v3
uses: actions/upload-artifact@v4
with:
name: data-prepper-kafka-integration-tests-kafka-${{ matrix.kafka }}-java-${{ matrix.java }}
path: '**/test-results/**/*.xml'
Expand All @@ -83,7 +83,7 @@ jobs:

steps:
- name: Download Artifacts
uses: actions/download-artifact@v3
uses: actions/download-artifact@v4
with:
path: test-results

Expand Down
4 changes: 2 additions & 2 deletions .github/workflows/kinesis-source-integration-tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ jobs:
-Dtests.kinesis.source.aws.region=us-east-1 --tests KinesisSourceIT
- name: Upload Unit Test Results
if: always()
uses: actions/upload-artifact@v3
uses: actions/upload-artifact@v4
with:
name: data-prepper-kinesis-source-integration-tests-java-${{ matrix.java }}
path: '**/test-results/**/*.xml'
Expand All @@ -72,7 +72,7 @@ jobs:

steps:
- name: Download Artifacts
uses: actions/download-artifact@v3
uses: actions/download-artifact@v4
with:
path: test-results

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ jobs:
./gradlew :data-prepper-plugins:opensearch:integrationTest -Dtests.opensearch.host=localhost:9200 -Dtests.opensearch.user=admin -Dtests.opensearch.password=admin -Dtests.opensearch.bundle=true -Dtests.opensearch.version=opendistro:${{ matrix.opendistro }}
- name: Upload Unit Test Results
if: always()
uses: actions/upload-artifact@v3
uses: actions/upload-artifact@v4
with:
name: data-prepper-opensearch-integration-tests-opendistro-${{ matrix.opendistro }}-java-${{ matrix.java }}
path: '**/test-results/**/*.xml'
Expand All @@ -56,7 +56,7 @@ jobs:

steps:
- name: Download Artifacts
uses: actions/download-artifact@v3
uses: actions/download-artifact@v4
with:
path: test-results

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ jobs:
./gradlew :data-prepper-plugins:opensearch:integrationTest -Dtests.opensearch.host=localhost:9200 -Dtests.opensearch.user=admin -Dtests.opensearch.password=admin -Dtests.opensearch.bundle=true -Dtests.opensearch.version=opensearch:${{ matrix.opensearch }}
- name: Upload Unit Test Results
if: always()
uses: actions/upload-artifact@v3
uses: actions/upload-artifact@v4
with:
name: data-prepper-opensearch-integration-tests-opensearch-${{ matrix.opensearch }}-java-${{ matrix.java }}
path: '**/test-results/**/*.xml'
Expand All @@ -56,7 +56,7 @@ jobs:

steps:
- name: Download Artifacts
uses: actions/download-artifact@v3
uses: actions/download-artifact@v4
with:
path: test-results

Expand Down
2 changes: 1 addition & 1 deletion CONTRIBUTING.md
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ If you are modifying existing files with license headers, or including new files
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*
*/
*/
```

### Shell, Python
Expand Down
1 change: 1 addition & 0 deletions MAINTAINERS.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ This document contains a list of maintainers in this repo. See [opensearch-proje
| Souvik Bose | [sb2k16](https://github.com/sb2k16) | Amazon |
| Qi Chen | [chenqi0805](https://github.com/chenqi0805) | Amazon |
| Chase Engelbrecht | [engechas](https://github.com/engechas) | Amazon |
| Santhosh Gandhe | [san81](https://github.com/san81) | Amazon |
| Taylor Gray | [graytaylor0](https://github.com/graytaylor0) | Amazon |
| Dinu John | [dinujoh](https://github.com/dinujoh) | Amazon |
| Krishna Kondaka | [kkondaka](https://github.com/kkondaka) | Amazon |
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,9 @@ private List<Message> getMessagesFromSqs() {
final ReceiveMessageRequest receiveMessageRequest = createReceiveMessageRequest();
final List<Message> messages = sqsClient.receiveMessage(receiveMessageRequest).messages();
failedAttemptCount = 0;
if (messages.isEmpty()) {
sqsMessageDelayTimer.record(Duration.ZERO);
}
return messages;
} catch (final SqsException | StsException e) {
LOG.error("Error reading from SQS: {}. Retrying with exponential backoff.", e.getMessage());
Expand Down Expand Up @@ -228,6 +231,10 @@ && isEventBridgeEventTypeCreated(parsedMessage)) {
LOG.info("Received {} messages from SQS. Processing {} messages.", s3EventNotificationRecords.size(), parsedMessagesToRead.size());

for (ParsedMessage parsedMessage : parsedMessagesToRead) {
sqsMessageDelayTimer.record(Duration.between(
Instant.ofEpochMilli(parsedMessage.getEventTime().toInstant().getMillis()),
Instant.now()
));
List<DeleteMessageBatchRequestEntry> waitingForAcknowledgements = new ArrayList<>();
AcknowledgementSet acknowledgementSet = null;
final int visibilityTimeout = (int)sqsOptions.getVisibilityTimeout().getSeconds();
Expand Down Expand Up @@ -318,10 +325,6 @@ private Optional<DeleteMessageBatchRequestEntry> processS3Object(
// SQS messages won't be deleted if we are unable to process S3Objects because of an exception
try {
s3Service.addS3Object(s3ObjectReference, acknowledgementSet);
sqsMessageDelayTimer.record(Duration.between(
Instant.ofEpochMilli(parsedMessage.getEventTime().toInstant().getMillis()),
Instant.now()
));
return Optional.of(buildDeleteMessageBatchRequestEntry(parsedMessage.getMessage()));
} catch (final Exception e) {
LOG.error("Error processing from S3: {}. Retrying with exponential backoff.", e.getMessage());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.junit.jupiter.params.provider.ArgumentsSource;
import org.junit.jupiter.params.provider.ValueSource;
import org.mockito.ArgumentCaptor;
import org.mockito.InOrder;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import org.opensearch.dataprepper.metrics.PluginMetrics;
Expand Down Expand Up @@ -67,6 +68,7 @@
import static org.junit.jupiter.params.provider.Arguments.arguments;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.Mockito.inOrder;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.reset;
Expand Down Expand Up @@ -700,7 +702,10 @@ void processSqsMessages_should_stop_updating_visibility_timeout_after_stop() thr
objectUnderTest.stop();

assertThat(messagesProcessed, equalTo(1));
verify(s3Service).addS3Object(any(S3ObjectReference.class), any());

final InOrder inOrder = inOrder(s3Service, sqsMessageDelayTimer);
inOrder.verify(sqsMessageDelayTimer).record(any(Duration.class));
inOrder.verify(s3Service).addS3Object(any(S3ObjectReference.class), any());
verify(acknowledgementSetManager).create(any(), any(Duration.class));

ArgumentCaptor<Consumer<ProgressCheck>> progressConsumerArgumentCaptor = ArgumentCaptor.forClass(Consumer.class);
Expand All @@ -711,7 +716,17 @@ void processSqsMessages_should_stop_updating_visibility_timeout_after_stop() thr

verify(sqsClient, never()).changeMessageVisibility(any(ChangeMessageVisibilityRequest.class));
verify(sqsMessagesReceivedCounter).increment(1);
verify(sqsMessageDelayTimer).record(any(Duration.class));
}

@Test
void processSqsMessages_should_record_zero_message_delay_when_no_messages_are_found_on_poll() {
final ReceiveMessageResponse receiveMessageResponse = mock(ReceiveMessageResponse.class);
when(receiveMessageResponse.messages()).thenReturn(Collections.emptyList());

when(sqsClient.receiveMessage(any(ReceiveMessageRequest.class))).thenReturn(receiveMessageResponse);
final int messagesProcessed = createObjectUnderTest().processSqsMessages();
assertThat(messagesProcessed, equalTo(0));
verify(sqsMessageDelayTimer).record(Duration.ZERO);
}

private static String createPutNotification(final Instant startTime) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*
*/

package org.opensearch.dataprepper.plugins.source.sqs;

import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.event.EventMetadata;
import software.amazon.awssdk.services.sqs.model.Message;
import software.amazon.awssdk.services.sqs.model.MessageAttributeValue;
import software.amazon.awssdk.services.sqs.model.MessageSystemAttributeName;
import java.util.HashMap;
import java.util.Map;

public class AttributeHandler {
public static Map<String, String> collectMetadataAttributes(final Message message, final String queueUrl) {
final Map<String, String> metadataMap = new HashMap<>();
metadataMap.put("queueUrl", queueUrl);

for (Map.Entry<MessageSystemAttributeName, String> entry : message.attributes().entrySet()) {
String originalKey = entry.getKey().toString();
String key = originalKey.substring(0, 1).toLowerCase() + originalKey.substring(1);
metadataMap.put(key, entry.getValue());
}

for (Map.Entry<String, MessageAttributeValue> entry : message.messageAttributes().entrySet()) {
String originalKey = entry.getKey().toString();
String key = originalKey.substring(0, 1).toLowerCase() + originalKey.substring(1);
metadataMap.put(key, entry.getValue().stringValue());
}
return metadataMap;
}

public static void applyMetadataAttributes(final Event event, final Map<String, String> attributes) {
final EventMetadata metadata = event.getMetadata();
attributes.forEach(metadata::setAttribute);
}
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,11 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*
*/

package org.opensearch.dataprepper.plugins.source.sqs;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,11 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*
*/

package org.opensearch.dataprepper.plugins.source.sqs;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*
*/

package org.opensearch.dataprepper.plugins.source.sqs;

import org.opensearch.dataprepper.model.codec.InputCodec;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.record.Record;
import java.io.ByteArrayInputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

public class CodecBulkMessageFieldStrategy implements MessageFieldStrategy {

private final InputCodec codec;

public CodecBulkMessageFieldStrategy(final InputCodec codec) {
this.codec = codec;
}

@Override
public List<Event> parseEvents(final String messageBody) {
final List<Event> events = new ArrayList<>();
final ByteArrayInputStream inputStream = new ByteArrayInputStream(messageBody.getBytes(StandardCharsets.UTF_8));
try {
codec.parse(inputStream, (Consumer<Record<Event>>) record -> events.add(record.getData()));
} catch (Exception e) {
throw new RuntimeException("Failed to parse events from SQS body.", e);
}
return events;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*
*/

package org.opensearch.dataprepper.plugins.source.sqs;

import org.opensearch.dataprepper.model.event.Event;
import java.util.List;

public interface MessageFieldStrategy {
/**
* Converts the SQS message body into one or more events.
*/
List<Event> parseEvents(String messageBody);
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,11 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*
*/

package org.opensearch.dataprepper.plugins.source.sqs;
Expand All @@ -15,6 +20,7 @@

import org.hibernate.validator.constraints.time.DurationMax;
import org.hibernate.validator.constraints.time.DurationMin;
import org.opensearch.dataprepper.model.configuration.PluginModel;

public class QueueConfig {

Expand Down Expand Up @@ -62,6 +68,9 @@ public class QueueConfig {
@DurationMax(seconds = 20)
private Duration waitTime = DEFAULT_WAIT_TIME_SECONDS;

@JsonProperty("codec")
private PluginModel codec = null;

public String getUrl() {
return url;
}
Expand Down Expand Up @@ -93,5 +102,10 @@ public Duration getWaitTime() {
public Duration getPollDelay() {
return pollDelay;
}

public PluginModel getCodec() {
return codec;
}

}

Loading

0 comments on commit 8b946af

Please sign in to comment.