From abfe31983af0f006c670f1cf558cf6d0e29acee6 Mon Sep 17 00:00:00 2001 From: Qi Chen Date: Mon, 13 Nov 2023 16:54:23 -0600 Subject: [PATCH] MAINT: add bytes metrics into opensearch source (#3646) * MAINT: add bytes metrics Signed-off-by: George Chen --- .../source/opensearch/OpenSearchService.java | 8 +- .../OpenSearchSourcePluginMetrics.java | 16 ++++ .../worker/NoSearchContextWorker.java | 21 +++-- .../source/opensearch/worker/PitWorker.java | 9 +- .../opensearch/worker/ScrollWorker.java | 9 +- .../worker/NoSearchContextWorkerTest.java | 59 +++++++++++-- .../opensearch/worker/PitWorkerTest.java | 81 +++++++++++++++-- .../opensearch/worker/ScrollWorkerTest.java | 87 +++++++++++++++++-- 8 files changed, 259 insertions(+), 31 deletions(-) diff --git a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/OpenSearchService.java b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/OpenSearchService.java index 6b512324f1..df208bc090 100644 --- a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/OpenSearchService.java +++ b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/OpenSearchService.java @@ -4,6 +4,7 @@ */ package org.opensearch.dataprepper.plugins.source.opensearch; +import com.fasterxml.jackson.databind.ObjectMapper; import org.opensearch.dataprepper.buffer.common.BufferAccumulator; import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager; import org.opensearch.dataprepper.model.buffer.Buffer; @@ -32,6 +33,7 @@ public class OpenSearchService { private static final Logger LOG = LoggerFactory.getLogger(OpenSearchService.class); + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); static final Duration EXECUTOR_SERVICE_SHUTDOWN_TIMEOUT = Duration.ofSeconds(30); static final Duration BUFFER_TIMEOUT = Duration.ofSeconds(30); @@ -83,13 +85,13 @@ private OpenSearchService(final SearchAccessor searchAccessor, public void start() { switch(searchAccessor.getSearchContextType()) { case POINT_IN_TIME: - searchWorker = new PitWorker(searchAccessor, openSearchSourceConfiguration, sourceCoordinator, bufferAccumulator, openSearchIndexPartitionCreationSupplier, acknowledgementSetManager, openSearchSourcePluginMetrics); + searchWorker = new PitWorker(OBJECT_MAPPER, searchAccessor, openSearchSourceConfiguration, sourceCoordinator, bufferAccumulator, openSearchIndexPartitionCreationSupplier, acknowledgementSetManager, openSearchSourcePluginMetrics); break; case SCROLL: - searchWorker = new ScrollWorker(searchAccessor, openSearchSourceConfiguration, sourceCoordinator, bufferAccumulator, openSearchIndexPartitionCreationSupplier, acknowledgementSetManager, openSearchSourcePluginMetrics); + searchWorker = new ScrollWorker(OBJECT_MAPPER, searchAccessor, openSearchSourceConfiguration, sourceCoordinator, bufferAccumulator, openSearchIndexPartitionCreationSupplier, acknowledgementSetManager, openSearchSourcePluginMetrics); break; case NONE: - searchWorker = new NoSearchContextWorker(searchAccessor, openSearchSourceConfiguration, sourceCoordinator, bufferAccumulator, openSearchIndexPartitionCreationSupplier, acknowledgementSetManager, openSearchSourcePluginMetrics); + searchWorker = new NoSearchContextWorker(OBJECT_MAPPER, searchAccessor, openSearchSourceConfiguration, sourceCoordinator, bufferAccumulator, openSearchIndexPartitionCreationSupplier, acknowledgementSetManager, openSearchSourcePluginMetrics); break; default: throw new IllegalArgumentException( diff --git a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/metrics/OpenSearchSourcePluginMetrics.java b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/metrics/OpenSearchSourcePluginMetrics.java index 80bc2d22a8..2064ddbae6 100644 --- a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/metrics/OpenSearchSourcePluginMetrics.java +++ b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/metrics/OpenSearchSourcePluginMetrics.java @@ -6,6 +6,7 @@ package org.opensearch.dataprepper.plugins.source.opensearch.metrics; import io.micrometer.core.instrument.Counter; +import io.micrometer.core.instrument.DistributionSummary; import io.micrometer.core.instrument.Timer; import org.opensearch.dataprepper.metrics.PluginMetrics; @@ -15,12 +16,17 @@ public class OpenSearchSourcePluginMetrics { static final String INDICES_PROCESSED = "indicesProcessed"; static final String INDEX_PROCESSING_TIME_ELAPSED = "indexProcessingTime"; static final String PROCESSING_ERRORS = "processingErrors"; + static final String BYTES_RECEIVED = "bytesReceived"; + static final String BYTES_PROCESSED = "bytesProcessed"; private final Counter documentsProcessedCounter; private final Counter indicesProcessedCounter; private final Counter processingErrorsCounter; private final Timer indexProcessingTimeTimer; + private final DistributionSummary bytesReceivedSummary; + private final DistributionSummary bytesProcessedSummary; + public static OpenSearchSourcePluginMetrics create(final PluginMetrics pluginMetrics) { return new OpenSearchSourcePluginMetrics(pluginMetrics); } @@ -30,6 +36,8 @@ private OpenSearchSourcePluginMetrics(final PluginMetrics pluginMetrics) { indicesProcessedCounter = pluginMetrics.counter(INDICES_PROCESSED); processingErrorsCounter = pluginMetrics.counter(PROCESSING_ERRORS); indexProcessingTimeTimer = pluginMetrics.timer(INDEX_PROCESSING_TIME_ELAPSED); + bytesReceivedSummary = pluginMetrics.summary(BYTES_RECEIVED); + bytesProcessedSummary = pluginMetrics.summary(BYTES_PROCESSED); } public Counter getDocumentsProcessedCounter() { @@ -47,4 +55,12 @@ public Counter getProcessingErrorsCounter() { public Timer getIndexProcessingTimeTimer() { return indexProcessingTimeTimer; } + + public DistributionSummary getBytesReceivedSummary() { + return bytesReceivedSummary; + } + + public DistributionSummary getBytesProcessedSummary() { + return bytesProcessedSummary; + } } diff --git a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/NoSearchContextWorker.java b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/NoSearchContextWorker.java index b46f50ab3e..52b0aa91d0 100644 --- a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/NoSearchContextWorker.java +++ b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/NoSearchContextWorker.java @@ -5,6 +5,7 @@ package org.opensearch.dataprepper.plugins.source.opensearch.worker; +import com.fasterxml.jackson.databind.ObjectMapper; import org.opensearch.dataprepper.buffer.common.BufferAccumulator; import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSet; import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager; @@ -42,6 +43,7 @@ public class NoSearchContextWorker implements SearchWorker, Runnable { private static final Logger LOG = LoggerFactory.getLogger(NoSearchContextWorker.class); + private final ObjectMapper objectMapper; private final SearchAccessor searchAccessor; private final OpenSearchSourceConfiguration openSearchSourceConfiguration; private final SourceCoordinator sourceCoordinator; @@ -52,13 +54,15 @@ public class NoSearchContextWorker implements SearchWorker, Runnable { private int noAvailableIndicesCount = 0; - public NoSearchContextWorker(final SearchAccessor searchAccessor, - final OpenSearchSourceConfiguration openSearchSourceConfiguration, - final SourceCoordinator sourceCoordinator, - final BufferAccumulator> bufferAccumulator, - final OpenSearchIndexPartitionCreationSupplier openSearchIndexPartitionCreationSupplier, - final AcknowledgementSetManager acknowledgementSetManager, - final OpenSearchSourcePluginMetrics openSearchSourcePluginMetrics) { + public NoSearchContextWorker(final ObjectMapper objectMapper, + final SearchAccessor searchAccessor, + final OpenSearchSourceConfiguration openSearchSourceConfiguration, + final SourceCoordinator sourceCoordinator, + final BufferAccumulator> bufferAccumulator, + final OpenSearchIndexPartitionCreationSupplier openSearchIndexPartitionCreationSupplier, + final AcknowledgementSetManager acknowledgementSetManager, + final OpenSearchSourcePluginMetrics openSearchSourcePluginMetrics) { + this.objectMapper = objectMapper; this.searchAccessor = searchAccessor; this.sourceCoordinator = sourceCoordinator; this.openSearchSourceConfiguration = openSearchSourceConfiguration; @@ -154,11 +158,14 @@ private void processIndex(final SourcePartition op searchWithSearchAfterResults.getDocuments().stream().map(Record::new).forEach(record -> { try { + final long documentBytes = objectMapper.writeValueAsBytes(record.getData().getJsonNode()).length; + openSearchSourcePluginMetrics.getBytesReceivedSummary().record(documentBytes); if (Objects.nonNull(acknowledgementSet)) { acknowledgementSet.add(record.getData()); } bufferAccumulator.add(record); openSearchSourcePluginMetrics.getDocumentsProcessedCounter().increment(); + openSearchSourcePluginMetrics.getBytesProcessedSummary().record(documentBytes); } catch (Exception e) { openSearchSourcePluginMetrics.getProcessingErrorsCounter().increment(); LOG.error("Failed writing OpenSearch documents to buffer. The last document created has document id '{}' from index '{}' : {}", diff --git a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/PitWorker.java b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/PitWorker.java index bc9f531d95..b6a4256d7d 100644 --- a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/PitWorker.java +++ b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/PitWorker.java @@ -4,6 +4,7 @@ */ package org.opensearch.dataprepper.plugins.source.opensearch.worker; +import com.fasterxml.jackson.databind.ObjectMapper; import org.opensearch.dataprepper.buffer.common.BufferAccumulator; import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSet; import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager; @@ -55,6 +56,7 @@ public class PitWorker implements SearchWorker, Runnable { static final String EXTEND_KEEP_ALIVE_TIME = "1m"; private static final Duration EXTEND_KEEP_ALIVE_DURATION = Duration.ofMinutes(1); + private final ObjectMapper objectMapper; private final SearchAccessor searchAccessor; private final OpenSearchSourceConfiguration openSearchSourceConfiguration; private final SourceCoordinator sourceCoordinator; @@ -66,13 +68,15 @@ public class PitWorker implements SearchWorker, Runnable { private int noAvailableIndicesCount = 0; - public PitWorker(final SearchAccessor searchAccessor, + public PitWorker(final ObjectMapper objectMapper, + final SearchAccessor searchAccessor, final OpenSearchSourceConfiguration openSearchSourceConfiguration, final SourceCoordinator sourceCoordinator, final BufferAccumulator> bufferAccumulator, final OpenSearchIndexPartitionCreationSupplier openSearchIndexPartitionCreationSupplier, final AcknowledgementSetManager acknowledgementSetManager, final OpenSearchSourcePluginMetrics openSearchSourcePluginMetrics) { + this.objectMapper = objectMapper; this.searchAccessor = searchAccessor; this.sourceCoordinator = sourceCoordinator; this.openSearchSourceConfiguration = openSearchSourceConfiguration; @@ -191,11 +195,14 @@ private void processIndex(final SourcePartition op searchWithSearchAfterResults.getDocuments().stream().map(Record::new).forEach(record -> { try { + final long documentBytes = objectMapper.writeValueAsBytes(record.getData().getJsonNode()).length; + openSearchSourcePluginMetrics.getBytesReceivedSummary().record(documentBytes); if (Objects.nonNull(acknowledgementSet)) { acknowledgementSet.add(record.getData()); } bufferAccumulator.add(record); openSearchSourcePluginMetrics.getDocumentsProcessedCounter().increment(); + openSearchSourcePluginMetrics.getBytesProcessedSummary().record(documentBytes); } catch (Exception e) { openSearchSourcePluginMetrics.getProcessingErrorsCounter().increment(); LOG.error("Failed writing OpenSearch documents to buffer. The last document created has document id '{}' from index '{}' : {}", diff --git a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/ScrollWorker.java b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/ScrollWorker.java index be12d40dc6..b063219239 100644 --- a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/ScrollWorker.java +++ b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/ScrollWorker.java @@ -4,6 +4,7 @@ */ package org.opensearch.dataprepper.plugins.source.opensearch.worker; +import com.fasterxml.jackson.databind.ObjectMapper; import org.opensearch.dataprepper.buffer.common.BufferAccumulator; import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSet; import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager; @@ -50,6 +51,7 @@ public class ScrollWorker implements SearchWorker { private static final Duration BACKOFF_ON_SCROLL_LIMIT_REACHED = Duration.ofSeconds(120); static final String SCROLL_TIME_PER_BATCH = "1m"; + private final ObjectMapper objectMapper; private final SearchAccessor searchAccessor; private final OpenSearchSourceConfiguration openSearchSourceConfiguration; private final SourceCoordinator sourceCoordinator; @@ -60,13 +62,15 @@ public class ScrollWorker implements SearchWorker { private int noAvailableIndicesCount = 0; - public ScrollWorker(final SearchAccessor searchAccessor, + public ScrollWorker(final ObjectMapper objectMapper, + final SearchAccessor searchAccessor, final OpenSearchSourceConfiguration openSearchSourceConfiguration, final SourceCoordinator sourceCoordinator, final BufferAccumulator> bufferAccumulator, final OpenSearchIndexPartitionCreationSupplier openSearchIndexPartitionCreationSupplier, final AcknowledgementSetManager acknowledgementSetManager, final OpenSearchSourcePluginMetrics openSearchSourcePluginMetrics) { + this.objectMapper = objectMapper; this.searchAccessor = searchAccessor; this.openSearchSourceConfiguration = openSearchSourceConfiguration; this.sourceCoordinator = sourceCoordinator; @@ -198,11 +202,14 @@ private void writeDocumentsToBuffer(final List documents, final AcknowledgementSet acknowledgementSet) { documents.stream().map(Record::new).forEach(record -> { try { + final long documentBytes = objectMapper.writeValueAsBytes(record.getData().getJsonNode()).length; + openSearchSourcePluginMetrics.getBytesReceivedSummary().record(documentBytes); if (Objects.nonNull(acknowledgementSet)) { acknowledgementSet.add(record.getData()); } bufferAccumulator.add(record); openSearchSourcePluginMetrics.getDocumentsProcessedCounter().increment(); + openSearchSourcePluginMetrics.getBytesProcessedSummary().record(documentBytes); } catch (Exception e) { openSearchSourcePluginMetrics.getProcessingErrorsCounter().increment(); LOG.error("Failed writing OpenSearch documents to buffer. The last document created has document id '{}' from index '{}' : {}", diff --git a/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/NoSearchContextWorkerTest.java b/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/NoSearchContextWorkerTest.java index e57d40f266..b5d0d4ff28 100644 --- a/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/NoSearchContextWorkerTest.java +++ b/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/NoSearchContextWorkerTest.java @@ -5,7 +5,10 @@ package org.opensearch.dataprepper.plugins.source.opensearch.worker; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; import io.micrometer.core.instrument.Counter; +import io.micrometer.core.instrument.DistributionSummary; import io.micrometer.core.instrument.Timer; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -63,6 +66,8 @@ @ExtendWith(MockitoExtension.class) public class NoSearchContextWorkerTest { + @Mock + private ObjectMapper objectMapper; @Mock private OpenSearchSourceConfiguration openSearchSourceConfiguration; @@ -97,6 +102,12 @@ public class NoSearchContextWorkerTest { @Mock private Timer indexProcessingTimeTimer; + @Mock + private DistributionSummary bytesReceivedSummary; + + @Mock + private DistributionSummary bytesProcessedSummary; + private ExecutorService executorService; @BeforeEach @@ -107,10 +118,12 @@ void setup() { when(openSearchSourcePluginMetrics.getIndicesProcessedCounter()).thenReturn(indicesProcessedCounter); when(openSearchSourcePluginMetrics.getProcessingErrorsCounter()).thenReturn(processingErrorsCounter); when(openSearchSourcePluginMetrics.getIndexProcessingTimeTimer()).thenReturn(indexProcessingTimeTimer); + when(openSearchSourcePluginMetrics.getBytesReceivedSummary()).thenReturn(bytesReceivedSummary); + when(openSearchSourcePluginMetrics.getBytesProcessedSummary()).thenReturn(bytesProcessedSummary); } private NoSearchContextWorker createObjectUnderTest() { - return new NoSearchContextWorker(searchAccessor, openSearchSourceConfiguration, sourceCoordinator, bufferAccumulator, openSearchIndexPartitionCreationSupplier, acknowledgementSetManager, openSearchSourcePluginMetrics); + return new NoSearchContextWorker(objectMapper, searchAccessor, openSearchSourceConfiguration, sourceCoordinator, bufferAccumulator, openSearchIndexPartitionCreationSupplier, acknowledgementSetManager, openSearchSourcePluginMetrics); } @Test @@ -177,8 +190,20 @@ void run_with_getNextPartition_with_non_empty_partition_processes_and_closes_tha final SearchWithSearchAfterResults searchWithSearchAfterResults = mock(SearchWithSearchAfterResults.class); when(searchWithSearchAfterResults.getNextSearchAfter()).thenReturn(Collections.singletonList(UUID.randomUUID().toString())); - when(searchWithSearchAfterResults.getDocuments()).thenReturn(List.of(mock(Event.class), mock(Event.class))).thenReturn(List.of(mock(Event.class), mock(Event.class))) - .thenReturn(List.of(mock(Event.class))).thenReturn(List.of(mock(Event.class))); + final Event testEvent1 = mock(Event.class); + final Event testEvent2 = mock(Event.class); + final Event testEvent3 = mock(Event.class); + final JsonNode testData1 = mock(JsonNode.class); + final JsonNode testData2 = mock(JsonNode.class); + final JsonNode testData3 = mock(JsonNode.class); + when(testEvent1.getJsonNode()).thenReturn(testData1); + when(testEvent2.getJsonNode()).thenReturn(testData2); + when(testEvent3.getJsonNode()).thenReturn(testData3); + when(objectMapper.writeValueAsBytes(testData1)).thenReturn(new byte[10]); + when(objectMapper.writeValueAsBytes(testData2)).thenReturn(new byte[20]); + when(objectMapper.writeValueAsBytes(testData3)).thenReturn(new byte[30]); + when(searchWithSearchAfterResults.getDocuments()).thenReturn(List.of(testEvent1, testEvent2)).thenReturn(List.of(testEvent1, testEvent2)) + .thenReturn(List.of(testEvent3)).thenReturn(List.of(testEvent3)); final ArgumentCaptor searchRequestArgumentCaptor = ArgumentCaptor.forClass(NoSearchContextSearchRequest.class); when(searchAccessor.searchWithoutSearchContext(searchRequestArgumentCaptor.capture())).thenReturn(searchWithSearchAfterResults); @@ -220,7 +245,13 @@ void run_with_getNextPartition_with_non_empty_partition_processes_and_closes_tha assertThat(noSearchContextSearchRequests.get(1).getPaginationSize(), equalTo(2)); assertThat(noSearchContextSearchRequests.get(1).getSearchAfter(), equalTo(searchWithSearchAfterResults.getNextSearchAfter())); + verify(bytesReceivedSummary).record(10L); + verify(bytesReceivedSummary).record(20L); + verify(bytesReceivedSummary).record(30L); verify(documentsProcessedCounter, times(3)).increment(); + verify(bytesProcessedSummary).record(10L); + verify(bytesProcessedSummary).record(20L); + verify(bytesProcessedSummary).record(30L); verify(indicesProcessedCounter).increment(); verifyNoInteractions(processingErrorsCounter); } @@ -254,8 +285,20 @@ void run_with_getNextPartition_with_acknowledgments_processes_and_closes_that_pa final SearchWithSearchAfterResults searchWithSearchAfterResults = mock(SearchWithSearchAfterResults.class); when(searchWithSearchAfterResults.getNextSearchAfter()).thenReturn(Collections.singletonList(UUID.randomUUID().toString())); - when(searchWithSearchAfterResults.getDocuments()).thenReturn(List.of(mock(Event.class), mock(Event.class))).thenReturn(List.of(mock(Event.class), mock(Event.class))) - .thenReturn(List.of(mock(Event.class))).thenReturn(List.of(mock(Event.class))); + final Event testEvent1 = mock(Event.class); + final Event testEvent2 = mock(Event.class); + final Event testEvent3 = mock(Event.class); + final JsonNode testData1 = mock(JsonNode.class); + final JsonNode testData2 = mock(JsonNode.class); + final JsonNode testData3 = mock(JsonNode.class); + when(testEvent1.getJsonNode()).thenReturn(testData1); + when(testEvent2.getJsonNode()).thenReturn(testData2); + when(testEvent3.getJsonNode()).thenReturn(testData3); + when(objectMapper.writeValueAsBytes(testData1)).thenReturn(new byte[10]); + when(objectMapper.writeValueAsBytes(testData2)).thenReturn(new byte[20]); + when(objectMapper.writeValueAsBytes(testData3)).thenReturn(new byte[30]); + when(searchWithSearchAfterResults.getDocuments()).thenReturn(List.of(testEvent1, testEvent2)).thenReturn(List.of(testEvent1, testEvent2)) + .thenReturn(List.of(testEvent3)).thenReturn(List.of(testEvent3)); final ArgumentCaptor searchRequestArgumentCaptor = ArgumentCaptor.forClass(NoSearchContextSearchRequest.class); when(searchAccessor.searchWithoutSearchContext(searchRequestArgumentCaptor.capture())).thenReturn(searchWithSearchAfterResults); @@ -299,7 +342,13 @@ void run_with_getNextPartition_with_acknowledgments_processes_and_closes_that_pa verify(acknowledgementSet).complete(); + verify(bytesReceivedSummary).record(10L); + verify(bytesReceivedSummary).record(20L); + verify(bytesReceivedSummary).record(30L); verify(documentsProcessedCounter, times(3)).increment(); + verify(bytesProcessedSummary).record(10L); + verify(bytesProcessedSummary).record(20L); + verify(bytesProcessedSummary).record(30L); verify(indicesProcessedCounter).increment(); verifyNoInteractions(processingErrorsCounter); } diff --git a/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/PitWorkerTest.java b/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/PitWorkerTest.java index a333925137..f53a23178b 100644 --- a/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/PitWorkerTest.java +++ b/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/PitWorkerTest.java @@ -5,7 +5,10 @@ package org.opensearch.dataprepper.plugins.source.opensearch.worker; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; import io.micrometer.core.instrument.Counter; +import io.micrometer.core.instrument.DistributionSummary; import io.micrometer.core.instrument.Timer; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -69,6 +72,8 @@ @ExtendWith(MockitoExtension.class) public class PitWorkerTest { + @Mock + private ObjectMapper objectMapper; @Mock private OpenSearchSourceConfiguration openSearchSourceConfiguration; @@ -103,6 +108,12 @@ public class PitWorkerTest { @Mock private Timer indexProcessingTimeTimer; + @Mock + private DistributionSummary bytesReceivedSummary; + + @Mock + private DistributionSummary bytesProcessedSummary; + private ExecutorService executorService; @BeforeEach @@ -113,10 +124,12 @@ void setup() { when(openSearchSourcePluginMetrics.getIndicesProcessedCounter()).thenReturn(indicesProcessedCounter); when(openSearchSourcePluginMetrics.getProcessingErrorsCounter()).thenReturn(processingErrorsCounter); when(openSearchSourcePluginMetrics.getIndexProcessingTimeTimer()).thenReturn(indexProcessingTimeTimer); + when(openSearchSourcePluginMetrics.getBytesReceivedSummary()).thenReturn(bytesReceivedSummary); + when(openSearchSourcePluginMetrics.getBytesProcessedSummary()).thenReturn(bytesProcessedSummary); } private PitWorker createObjectUnderTest() { - return new PitWorker(searchAccessor, openSearchSourceConfiguration, sourceCoordinator, bufferAccumulator, openSearchIndexPartitionCreationSupplier, acknowledgementSetManager, openSearchSourcePluginMetrics); + return new PitWorker(objectMapper, searchAccessor, openSearchSourceConfiguration, sourceCoordinator, bufferAccumulator, openSearchIndexPartitionCreationSupplier, acknowledgementSetManager, openSearchSourcePluginMetrics); } @Test @@ -154,8 +167,20 @@ void run_with_getNextPartition_with_non_empty_partition_creates_and_deletes_pit_ final SearchWithSearchAfterResults searchWithSearchAfterResults = mock(SearchWithSearchAfterResults.class); when(searchWithSearchAfterResults.getNextSearchAfter()).thenReturn(Collections.singletonList(UUID.randomUUID().toString())); - when(searchWithSearchAfterResults.getDocuments()).thenReturn(List.of(mock(Event.class), mock(Event.class))).thenReturn(List.of(mock(Event.class), mock(Event.class))) - .thenReturn(List.of(mock(Event.class))).thenReturn(List.of(mock(Event.class))); + final Event testEvent1 = mock(Event.class); + final Event testEvent2 = mock(Event.class); + final Event testEvent3 = mock(Event.class); + final JsonNode testData1 = mock(JsonNode.class); + final JsonNode testData2 = mock(JsonNode.class); + final JsonNode testData3 = mock(JsonNode.class); + when(testEvent1.getJsonNode()).thenReturn(testData1); + when(testEvent2.getJsonNode()).thenReturn(testData2); + when(testEvent3.getJsonNode()).thenReturn(testData3); + when(objectMapper.writeValueAsBytes(testData1)).thenReturn(new byte[10]); + when(objectMapper.writeValueAsBytes(testData2)).thenReturn(new byte[20]); + when(objectMapper.writeValueAsBytes(testData3)).thenReturn(new byte[30]); + when(searchWithSearchAfterResults.getDocuments()).thenReturn(List.of(testEvent1, testEvent2)).thenReturn(List.of(testEvent1, testEvent2)) + .thenReturn(List.of(testEvent3)).thenReturn(List.of(testEvent3)); final ArgumentCaptor searchPointInTimeRequestArgumentCaptor = ArgumentCaptor.forClass(SearchPointInTimeRequest.class); when(searchAccessor.searchWithPit(searchPointInTimeRequestArgumentCaptor.capture())).thenReturn(searchWithSearchAfterResults); @@ -214,7 +239,13 @@ void run_with_getNextPartition_with_non_empty_partition_creates_and_deletes_pit_ verifyNoInteractions(acknowledgementSetManager); + verify(bytesReceivedSummary).record(10L); + verify(bytesReceivedSummary).record(20L); + verify(bytesReceivedSummary).record(30L); verify(documentsProcessedCounter, times(3)).increment(); + verify(bytesProcessedSummary).record(10L); + verify(bytesProcessedSummary).record(20L); + verify(bytesProcessedSummary).record(30L); verify(indicesProcessedCounter).increment(); verifyNoInteractions(processingErrorsCounter); } @@ -254,8 +285,20 @@ void run_with_acknowledgments_enabled_creates_and_deletes_pit_and_closes_that_pa final SearchWithSearchAfterResults searchWithSearchAfterResults = mock(SearchWithSearchAfterResults.class); when(searchWithSearchAfterResults.getNextSearchAfter()).thenReturn(Collections.singletonList(UUID.randomUUID().toString())); - when(searchWithSearchAfterResults.getDocuments()).thenReturn(List.of(mock(Event.class), mock(Event.class))).thenReturn(List.of(mock(Event.class), mock(Event.class))) - .thenReturn(List.of(mock(Event.class))).thenReturn(List.of(mock(Event.class))); + final Event testEvent1 = mock(Event.class); + final Event testEvent2 = mock(Event.class); + final Event testEvent3 = mock(Event.class); + final JsonNode testData1 = mock(JsonNode.class); + final JsonNode testData2 = mock(JsonNode.class); + final JsonNode testData3 = mock(JsonNode.class); + when(testEvent1.getJsonNode()).thenReturn(testData1); + when(testEvent2.getJsonNode()).thenReturn(testData2); + when(testEvent3.getJsonNode()).thenReturn(testData3); + when(objectMapper.writeValueAsBytes(testData1)).thenReturn(new byte[10]); + when(objectMapper.writeValueAsBytes(testData2)).thenReturn(new byte[20]); + when(objectMapper.writeValueAsBytes(testData3)).thenReturn(new byte[30]); + when(searchWithSearchAfterResults.getDocuments()).thenReturn(List.of(testEvent1, testEvent2)).thenReturn(List.of(testEvent1, testEvent2)) + .thenReturn(List.of(testEvent3)).thenReturn(List.of(testEvent3)); final ArgumentCaptor searchPointInTimeRequestArgumentCaptor = ArgumentCaptor.forClass(SearchPointInTimeRequest.class); when(searchAccessor.searchWithPit(searchPointInTimeRequestArgumentCaptor.capture())).thenReturn(searchWithSearchAfterResults); @@ -315,7 +358,13 @@ void run_with_acknowledgments_enabled_creates_and_deletes_pit_and_closes_that_pa verify(acknowledgementSet).complete(); + verify(bytesReceivedSummary).record(10L); + verify(bytesReceivedSummary).record(20L); + verify(bytesReceivedSummary).record(30L); verify(documentsProcessedCounter, times(3)).increment(); + verify(bytesProcessedSummary).record(10L); + verify(bytesProcessedSummary).record(20L); + verify(bytesProcessedSummary).record(30L); verify(indicesProcessedCounter).increment(); verifyNoInteractions(processingErrorsCounter); } @@ -342,8 +391,20 @@ void run_with_getNextPartition_with_valid_existing_point_in_time_does_not_create final SearchWithSearchAfterResults searchWithSearchAfterResults = mock(SearchWithSearchAfterResults.class); when(searchWithSearchAfterResults.getNextSearchAfter()).thenReturn(Collections.singletonList(UUID.randomUUID().toString())); - when(searchWithSearchAfterResults.getDocuments()).thenReturn(List.of(mock(Event.class), mock(Event.class))).thenReturn(List.of(mock(Event.class), mock(Event.class))) - .thenReturn(List.of(mock(Event.class))).thenReturn(List.of(mock(Event.class))); + final Event testEvent1 = mock(Event.class); + final Event testEvent2 = mock(Event.class); + final Event testEvent3 = mock(Event.class); + final JsonNode testData1 = mock(JsonNode.class); + final JsonNode testData2 = mock(JsonNode.class); + final JsonNode testData3 = mock(JsonNode.class); + when(testEvent1.getJsonNode()).thenReturn(testData1); + when(testEvent2.getJsonNode()).thenReturn(testData2); + when(testEvent3.getJsonNode()).thenReturn(testData3); + when(objectMapper.writeValueAsBytes(testData1)).thenReturn(new byte[10]); + when(objectMapper.writeValueAsBytes(testData2)).thenReturn(new byte[20]); + when(objectMapper.writeValueAsBytes(testData3)).thenReturn(new byte[30]); + when(searchWithSearchAfterResults.getDocuments()).thenReturn(List.of(testEvent1, testEvent2)).thenReturn(List.of(testEvent1, testEvent2)) + .thenReturn(List.of(testEvent3)).thenReturn(List.of(testEvent3)); when(searchAccessor.searchWithPit(any(SearchPointInTimeRequest.class))).thenReturn(searchWithSearchAfterResults); @@ -381,7 +442,13 @@ void run_with_getNextPartition_with_valid_existing_point_in_time_does_not_create verify(sourceCoordinator, times(0)).saveProgressStateForPartition(eq(partitionKey), eq(openSearchIndexProgressState)); verify(sourceCoordinator, times(0)).updatePartitionForAcknowledgmentWait(anyString(), any(Duration.class)); + verify(bytesReceivedSummary).record(10L); + verify(bytesReceivedSummary).record(20L); + verify(bytesReceivedSummary).record(30L); verify(documentsProcessedCounter, times(3)).increment(); + verify(bytesProcessedSummary).record(10L); + verify(bytesProcessedSummary).record(20L); + verify(bytesProcessedSummary).record(30L); verify(indicesProcessedCounter).increment(); verifyNoInteractions(processingErrorsCounter); } diff --git a/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/ScrollWorkerTest.java b/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/ScrollWorkerTest.java index ffd24b5972..edd479b80c 100644 --- a/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/ScrollWorkerTest.java +++ b/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/ScrollWorkerTest.java @@ -5,7 +5,10 @@ package org.opensearch.dataprepper.plugins.source.opensearch.worker; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; import io.micrometer.core.instrument.Counter; +import io.micrometer.core.instrument.DistributionSummary; import io.micrometer.core.instrument.Timer; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -68,6 +71,8 @@ @ExtendWith(MockitoExtension.class) public class ScrollWorkerTest { + @Mock + private ObjectMapper objectMapper; @Mock private OpenSearchSourceConfiguration openSearchSourceConfiguration; @@ -102,6 +107,12 @@ public class ScrollWorkerTest { @Mock private Timer indexProcessingTimeTimer; + @Mock + private DistributionSummary bytesReceivedSummary; + + @Mock + private DistributionSummary bytesProcessedSummary; + private ExecutorService executorService; @BeforeEach @@ -112,10 +123,12 @@ void setup() { when(openSearchSourcePluginMetrics.getIndicesProcessedCounter()).thenReturn(indicesProcessedCounter); when(openSearchSourcePluginMetrics.getProcessingErrorsCounter()).thenReturn(processingErrorsCounter); when(openSearchSourcePluginMetrics.getIndexProcessingTimeTimer()).thenReturn(indexProcessingTimeTimer); + when(openSearchSourcePluginMetrics.getBytesReceivedSummary()).thenReturn(bytesReceivedSummary); + when(openSearchSourcePluginMetrics.getBytesProcessedSummary()).thenReturn(bytesProcessedSummary); } private ScrollWorker createObjectUnderTest() { - return new ScrollWorker(searchAccessor, openSearchSourceConfiguration, sourceCoordinator, bufferAccumulator, openSearchIndexPartitionCreationSupplier, acknowledgementSetManager, openSearchSourcePluginMetrics); + return new ScrollWorker(objectMapper, searchAccessor, openSearchSourceConfiguration, sourceCoordinator, bufferAccumulator, openSearchIndexPartitionCreationSupplier, acknowledgementSetManager, openSearchSourcePluginMetrics); } @Test @@ -144,7 +157,15 @@ void run_with_getNextPartition_with_non_empty_partition_creates_and_deletes_scro final ArgumentCaptor requestArgumentCaptor = ArgumentCaptor.forClass(CreateScrollRequest.class); final CreateScrollResponse createScrollResponse = mock(CreateScrollResponse.class); when(createScrollResponse.getScrollId()).thenReturn(scrollId); - when(createScrollResponse.getDocuments()).thenReturn(List.of(mock(Event.class), mock(Event.class))); + final Event testEvent1 = mock(Event.class); + final Event testEvent2 = mock(Event.class); + final JsonNode testData1 = mock(JsonNode.class); + final JsonNode testData2 = mock(JsonNode.class); + when(testEvent1.getJsonNode()).thenReturn(testData1); + when(testEvent2.getJsonNode()).thenReturn(testData2); + when(objectMapper.writeValueAsBytes(testData1)).thenReturn(new byte[10]); + when(objectMapper.writeValueAsBytes(testData2)).thenReturn(new byte[20]); + when(createScrollResponse.getDocuments()).thenReturn(List.of(testEvent1, testEvent2)); when(searchAccessor.createScroll(requestArgumentCaptor.capture())).thenReturn(createScrollResponse); final SearchConfiguration searchConfiguration = mock(SearchConfiguration.class); @@ -153,8 +174,20 @@ void run_with_getNextPartition_with_non_empty_partition_creates_and_deletes_scro final SearchScrollResponse searchScrollResponse = mock(SearchScrollResponse.class); when(searchScrollResponse.getScrollId()).thenReturn(scrollId); - when(searchScrollResponse.getDocuments()).thenReturn(List.of(mock(Event.class), mock(Event.class))) - .thenReturn(List.of(mock(Event.class), mock(Event.class))).thenReturn(List.of(mock(Event.class))).thenReturn(List.of(mock(Event.class))); + final Event testEvent3 = mock(Event.class); + final Event testEvent4 = mock(Event.class); + final Event testEvent5 = mock(Event.class); + final JsonNode testData3 = mock(JsonNode.class); + final JsonNode testData4 = mock(JsonNode.class); + final JsonNode testData5 = mock(JsonNode.class); + when(testEvent3.getJsonNode()).thenReturn(testData3); + when(testEvent4.getJsonNode()).thenReturn(testData4); + when(testEvent5.getJsonNode()).thenReturn(testData5); + when(objectMapper.writeValueAsBytes(testData3)).thenReturn(new byte[30]); + when(objectMapper.writeValueAsBytes(testData4)).thenReturn(new byte[40]); + when(objectMapper.writeValueAsBytes(testData5)).thenReturn(new byte[50]); + when(searchScrollResponse.getDocuments()).thenReturn(List.of(testEvent3, testEvent4)) + .thenReturn(List.of(testEvent3, testEvent4)).thenReturn(List.of(testEvent5)).thenReturn(List.of(testEvent5)); final ArgumentCaptor searchScrollRequestArgumentCaptor = ArgumentCaptor.forClass(SearchScrollRequest.class); when(searchAccessor.searchWithScroll(searchScrollRequestArgumentCaptor.capture())).thenReturn(searchScrollResponse); @@ -208,7 +241,17 @@ void run_with_getNextPartition_with_non_empty_partition_creates_and_deletes_scro assertThat(deleteScrollRequest, notNullValue()); assertThat(deleteScrollRequest.getScrollId(), equalTo(scrollId)); + verify(bytesReceivedSummary).record(10L); + verify(bytesReceivedSummary).record(20L); + verify(bytesReceivedSummary).record(30L); + verify(bytesReceivedSummary).record(40L); + verify(bytesReceivedSummary).record(50L); verify(documentsProcessedCounter, times(5)).increment(); + verify(bytesProcessedSummary).record(10L); + verify(bytesProcessedSummary).record(20L); + verify(bytesProcessedSummary).record(30L); + verify(bytesProcessedSummary).record(40L); + verify(bytesProcessedSummary).record(50L); verify(indicesProcessedCounter).increment(); verifyNoInteractions(processingErrorsCounter); } @@ -239,7 +282,15 @@ void run_with_getNextPartition_with_acknowledgments_creates_and_deletes_scroll_a final ArgumentCaptor requestArgumentCaptor = ArgumentCaptor.forClass(CreateScrollRequest.class); final CreateScrollResponse createScrollResponse = mock(CreateScrollResponse.class); when(createScrollResponse.getScrollId()).thenReturn(scrollId); - when(createScrollResponse.getDocuments()).thenReturn(List.of(mock(Event.class), mock(Event.class))); + final Event testEvent1 = mock(Event.class); + final Event testEvent2 = mock(Event.class); + final JsonNode testData1 = mock(JsonNode.class); + final JsonNode testData2 = mock(JsonNode.class); + when(testEvent1.getJsonNode()).thenReturn(testData1); + when(testEvent2.getJsonNode()).thenReturn(testData2); + when(objectMapper.writeValueAsBytes(testData1)).thenReturn(new byte[10]); + when(objectMapper.writeValueAsBytes(testData2)).thenReturn(new byte[20]); + when(createScrollResponse.getDocuments()).thenReturn(List.of(testEvent1, testEvent2)); when(searchAccessor.createScroll(requestArgumentCaptor.capture())).thenReturn(createScrollResponse); final SearchConfiguration searchConfiguration = mock(SearchConfiguration.class); @@ -248,8 +299,20 @@ void run_with_getNextPartition_with_acknowledgments_creates_and_deletes_scroll_a final SearchScrollResponse searchScrollResponse = mock(SearchScrollResponse.class); when(searchScrollResponse.getScrollId()).thenReturn(scrollId); - when(searchScrollResponse.getDocuments()).thenReturn(List.of(mock(Event.class), mock(Event.class))) - .thenReturn(List.of(mock(Event.class), mock(Event.class))).thenReturn(List.of(mock(Event.class))).thenReturn(List.of(mock(Event.class))); + final Event testEvent3 = mock(Event.class); + final Event testEvent4 = mock(Event.class); + final Event testEvent5 = mock(Event.class); + final JsonNode testData3 = mock(JsonNode.class); + final JsonNode testData4 = mock(JsonNode.class); + final JsonNode testData5 = mock(JsonNode.class); + when(testEvent3.getJsonNode()).thenReturn(testData3); + when(testEvent4.getJsonNode()).thenReturn(testData4); + when(testEvent5.getJsonNode()).thenReturn(testData5); + when(objectMapper.writeValueAsBytes(testData3)).thenReturn(new byte[30]); + when(objectMapper.writeValueAsBytes(testData4)).thenReturn(new byte[40]); + when(objectMapper.writeValueAsBytes(testData5)).thenReturn(new byte[50]); + when(searchScrollResponse.getDocuments()).thenReturn(List.of(testEvent3, testEvent4)) + .thenReturn(List.of(testEvent3, testEvent4)).thenReturn(List.of(testEvent5)).thenReturn(List.of(testEvent5)); final ArgumentCaptor searchScrollRequestArgumentCaptor = ArgumentCaptor.forClass(SearchScrollRequest.class); when(searchAccessor.searchWithScroll(searchScrollRequestArgumentCaptor.capture())).thenReturn(searchScrollResponse); @@ -305,7 +368,17 @@ void run_with_getNextPartition_with_acknowledgments_creates_and_deletes_scroll_a verify(acknowledgementSet).complete(); + verify(bytesReceivedSummary).record(10L); + verify(bytesReceivedSummary).record(20L); + verify(bytesReceivedSummary).record(30L); + verify(bytesReceivedSummary).record(40L); + verify(bytesReceivedSummary).record(50L); verify(documentsProcessedCounter, times(5)).increment(); + verify(bytesProcessedSummary).record(10L); + verify(bytesProcessedSummary).record(20L); + verify(bytesProcessedSummary).record(30L); + verify(bytesProcessedSummary).record(40L); + verify(bytesProcessedSummary).record(50L); verify(indicesProcessedCounter).increment(); verifyNoInteractions(processingErrorsCounter); }