Skip to content

Commit

Permalink
Fixed build errors
Browse files Browse the repository at this point in the history
Signed-off-by: Krishna Kondaka <[email protected]>
  • Loading branch information
Krishna Kondaka committed Nov 1, 2023
1 parent 53b1787 commit df44704
Showing 1 changed file with 23 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import org.opensearch.dataprepper.model.configuration.PluginSetting;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.event.EventHandle;
import org.opensearch.dataprepper.model.event.DefaultEventHandle;
import org.opensearch.dataprepper.model.event.JacksonEvent;
import org.opensearch.dataprepper.model.plugin.PluginFactory;
import org.opensearch.dataprepper.model.record.Record;
Expand Down Expand Up @@ -137,6 +138,10 @@ void setUp() {
lenient().when(pluginMetrics.summary(S3SinkService.S3_OBJECTS_SIZE)).thenReturn(s3ObjectSizeSummary);
}

private DefaultEventHandle castToDefaultHandle(EventHandle eventHandle) {
return (DefaultEventHandle)eventHandle;
}

private S3SinkService createObjectUnderTest() {
return new S3SinkService(s3SinkConfig, bufferFactory, codec, codecContext, s3Client, keyGenerator, Duration.ofMillis(100), pluginMetrics);
}
Expand Down Expand Up @@ -380,8 +385,8 @@ void output_will_release_all_handles_since_a_flush() throws IOException {
doNothing().when(codec).writeEvent(event, outputStream);
final S3SinkService s3SinkService = createObjectUnderTest();
final Collection<Record<Event>> records = generateRandomStringEventRecord();
final List<EventHandle> eventHandles = records.stream().map(Record::getData).map(Event::getEventHandle).collect(Collectors.toList());
for (EventHandle eventHandle : eventHandles) {
final List<DefaultEventHandle> eventHandles = records.stream().map(Record::getData).map(Event::getEventHandle).map(this::castToDefaultHandle).collect(Collectors.toList());
for (DefaultEventHandle eventHandle : eventHandles) {
eventHandle.setAcknowledgementSet(acknowledgementSet);
}
s3SinkService.output(records);
Expand All @@ -406,8 +411,8 @@ void output_will_skip_releasing_events_without_EventHandle_objects() throws IOEx
doNothing().when(codec).writeEvent(event1, outputStream);
final S3SinkService s3SinkService = createObjectUnderTest();
final Collection<Record<Event>> records = generateRandomStringEventRecord();
final List<EventHandle> eventHandles = records.stream().map(Record::getData).map(Event::getEventHandle).collect(Collectors.toList());
for (EventHandle eventHandle : eventHandles) {
final List<DefaultEventHandle> eventHandles = records.stream().map(Record::getData).map(Event::getEventHandle).map(this::castToDefaultHandle).collect(Collectors.toList());
for (DefaultEventHandle eventHandle : eventHandles) {
eventHandle.setAcknowledgementSet(acknowledgementSet);
}

Expand All @@ -417,9 +422,9 @@ void output_will_skip_releasing_events_without_EventHandle_objects() throws IOEx
}

final Collection<Record<Event>> records2 = generateRandomStringEventRecord();
final List<EventHandle> eventHandles2 = records2.stream().map(Record::getData).map(Event::getEventHandle).collect(Collectors.toList());
final List<DefaultEventHandle> eventHandles2 = records2.stream().map(Record::getData).map(Event::getEventHandle).map(this::castToDefaultHandle).collect(Collectors.toList());

for (EventHandle eventHandle : eventHandles2) {
for (DefaultEventHandle eventHandle : eventHandles2) {
eventHandle.setAcknowledgementSet(acknowledgementSet);
}

Expand Down Expand Up @@ -447,9 +452,9 @@ void output_will_release_all_handles_since_a_flush_when_S3_fails() throws IOExce
doNothing().when(codec).writeEvent(event, outputStream);
final S3SinkService s3SinkService = createObjectUnderTest();
final List<Record<Event>> records = generateEventRecords(1);
final List<EventHandle> eventHandles = records.stream().map(Record::getData).map(Event::getEventHandle).collect(Collectors.toList());
final List<DefaultEventHandle> eventHandles = records.stream().map(Record::getData).map(Event::getEventHandle).map(this::castToDefaultHandle).collect(Collectors.toList());

for (EventHandle eventHandle : eventHandles) {
for (DefaultEventHandle eventHandle : eventHandles) {
eventHandle.setAcknowledgementSet(acknowledgementSet);
}
s3SinkService.output(records);
Expand All @@ -473,17 +478,17 @@ void output_will_release_only_new_handles_since_a_flush() throws IOException {
doNothing().when(codec).writeEvent(event, outputStream);
final S3SinkService s3SinkService = createObjectUnderTest();
final Collection<Record<Event>> records = generateRandomStringEventRecord();
final List<EventHandle> eventHandles = records.stream().map(Record::getData).map(Event::getEventHandle).collect(Collectors.toList());
for (EventHandle eventHandle : eventHandles) {
final List<DefaultEventHandle> eventHandles = records.stream().map(Record::getData).map(Event::getEventHandle).map(this::castToDefaultHandle).collect(Collectors.toList());
for (DefaultEventHandle eventHandle : eventHandles) {
eventHandle.setAcknowledgementSet(acknowledgementSet);
}
s3SinkService.output(records);
for (EventHandle eventHandle : eventHandles) {
verify(acknowledgementSet).release(eventHandle, true);
}
final Collection<Record<Event>> records2 = generateRandomStringEventRecord();
final List<EventHandle> eventHandles2 = records2.stream().map(Record::getData).map(Event::getEventHandle).collect(Collectors.toList());
for (EventHandle eventHandle : eventHandles2) {
final List<DefaultEventHandle> eventHandles2 = records2.stream().map(Record::getData).map(Event::getEventHandle).map(this::castToDefaultHandle).collect(Collectors.toList());
for (DefaultEventHandle eventHandle : eventHandles2) {
eventHandle.setAcknowledgementSet(acknowledgementSet);
}
s3SinkService.output(records2);
Expand All @@ -509,8 +514,8 @@ void output_will_skip_and_drop_failed_records() throws IOException {
List<Record<Event>> records = generateEventRecords(2);
Event event1 = records.get(0).getData();
Event event2 = records.get(1).getData();
EventHandle eventHandle1 = event1.getEventHandle();
EventHandle eventHandle2 = event2.getEventHandle();
DefaultEventHandle eventHandle1 = (DefaultEventHandle)event1.getEventHandle();
DefaultEventHandle eventHandle2 = (DefaultEventHandle)event2.getEventHandle();
eventHandle1.setAcknowledgementSet(acknowledgementSet);
eventHandle2.setAcknowledgementSet(acknowledgementSet);

Expand Down Expand Up @@ -545,8 +550,8 @@ void output_will_release_only_new_handles_since_a_flush_when_S3_fails() throws I
doNothing().when(codec).writeEvent(event, outputStream);
final S3SinkService s3SinkService = createObjectUnderTest();
final List<Record<Event>> records = generateEventRecords(1);
final List<EventHandle> eventHandles = records.stream().map(Record::getData).map(Event::getEventHandle).collect(Collectors.toList());
for (EventHandle eventHandle : eventHandles) {
final List<DefaultEventHandle> eventHandles = records.stream().map(Record::getData).map(Event::getEventHandle).map(this::castToDefaultHandle).collect(Collectors.toList());
for (DefaultEventHandle eventHandle : eventHandles) {
eventHandle.setAcknowledgementSet(acknowledgementSet);
}
s3SinkService.output(records);
Expand All @@ -555,9 +560,9 @@ void output_will_release_only_new_handles_since_a_flush_when_S3_fails() throws I
}

final List<Record<Event>> records2 = generateEventRecords(1);
final List<EventHandle> eventHandles2 = records2.stream().map(Record::getData).map(Event::getEventHandle).collect(Collectors.toList());
final List<DefaultEventHandle> eventHandles2 = records2.stream().map(Record::getData).map(Event::getEventHandle).map(this::castToDefaultHandle).collect(Collectors.toList());

for (EventHandle eventHandle : eventHandles2) {
for (DefaultEventHandle eventHandle : eventHandles2) {
eventHandle.setAcknowledgementSet(acknowledgementSet);
}
s3SinkService.output(records2);
Expand Down

0 comments on commit df44704

Please sign in to comment.