Skip to content

Commit

Permalink
MAINT: add bytes metrics into opensearch source (#3646)
Browse files Browse the repository at this point in the history
* MAINT: add bytes metrics

Signed-off-by: George Chen <[email protected]>
  • Loading branch information
chenqi0805 authored Nov 13, 2023
1 parent 0113be9 commit abfe319
Show file tree
Hide file tree
Showing 8 changed files with 259 additions and 31 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
*/
package org.opensearch.dataprepper.plugins.source.opensearch;

import com.fasterxml.jackson.databind.ObjectMapper;
import org.opensearch.dataprepper.buffer.common.BufferAccumulator;
import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager;
import org.opensearch.dataprepper.model.buffer.Buffer;
Expand Down Expand Up @@ -32,6 +33,7 @@ public class OpenSearchService {

private static final Logger LOG = LoggerFactory.getLogger(OpenSearchService.class);

private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
static final Duration EXECUTOR_SERVICE_SHUTDOWN_TIMEOUT = Duration.ofSeconds(30);
static final Duration BUFFER_TIMEOUT = Duration.ofSeconds(30);

Expand Down Expand Up @@ -83,13 +85,13 @@ private OpenSearchService(final SearchAccessor searchAccessor,
public void start() {
switch(searchAccessor.getSearchContextType()) {
case POINT_IN_TIME:
searchWorker = new PitWorker(searchAccessor, openSearchSourceConfiguration, sourceCoordinator, bufferAccumulator, openSearchIndexPartitionCreationSupplier, acknowledgementSetManager, openSearchSourcePluginMetrics);
searchWorker = new PitWorker(OBJECT_MAPPER, searchAccessor, openSearchSourceConfiguration, sourceCoordinator, bufferAccumulator, openSearchIndexPartitionCreationSupplier, acknowledgementSetManager, openSearchSourcePluginMetrics);
break;
case SCROLL:
searchWorker = new ScrollWorker(searchAccessor, openSearchSourceConfiguration, sourceCoordinator, bufferAccumulator, openSearchIndexPartitionCreationSupplier, acknowledgementSetManager, openSearchSourcePluginMetrics);
searchWorker = new ScrollWorker(OBJECT_MAPPER, searchAccessor, openSearchSourceConfiguration, sourceCoordinator, bufferAccumulator, openSearchIndexPartitionCreationSupplier, acknowledgementSetManager, openSearchSourcePluginMetrics);
break;
case NONE:
searchWorker = new NoSearchContextWorker(searchAccessor, openSearchSourceConfiguration, sourceCoordinator, bufferAccumulator, openSearchIndexPartitionCreationSupplier, acknowledgementSetManager, openSearchSourcePluginMetrics);
searchWorker = new NoSearchContextWorker(OBJECT_MAPPER, searchAccessor, openSearchSourceConfiguration, sourceCoordinator, bufferAccumulator, openSearchIndexPartitionCreationSupplier, acknowledgementSetManager, openSearchSourcePluginMetrics);
break;
default:
throw new IllegalArgumentException(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
package org.opensearch.dataprepper.plugins.source.opensearch.metrics;

import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.DistributionSummary;
import io.micrometer.core.instrument.Timer;
import org.opensearch.dataprepper.metrics.PluginMetrics;

Expand All @@ -15,12 +16,17 @@ public class OpenSearchSourcePluginMetrics {
static final String INDICES_PROCESSED = "indicesProcessed";
static final String INDEX_PROCESSING_TIME_ELAPSED = "indexProcessingTime";
static final String PROCESSING_ERRORS = "processingErrors";
static final String BYTES_RECEIVED = "bytesReceived";
static final String BYTES_PROCESSED = "bytesProcessed";

private final Counter documentsProcessedCounter;
private final Counter indicesProcessedCounter;
private final Counter processingErrorsCounter;
private final Timer indexProcessingTimeTimer;

private final DistributionSummary bytesReceivedSummary;
private final DistributionSummary bytesProcessedSummary;

/**
 * Static factory for this source's metrics holder.
 *
 * @param pluginMetrics the Data Prepper metrics registry used to register the
 *                      counters, timer, and byte distribution summaries
 * @return a new {@code OpenSearchSourcePluginMetrics} backed by the given registry
 */
public static OpenSearchSourcePluginMetrics create(final PluginMetrics pluginMetrics) {
return new OpenSearchSourcePluginMetrics(pluginMetrics);
}
Expand All @@ -30,6 +36,8 @@ private OpenSearchSourcePluginMetrics(final PluginMetrics pluginMetrics) {
indicesProcessedCounter = pluginMetrics.counter(INDICES_PROCESSED);
processingErrorsCounter = pluginMetrics.counter(PROCESSING_ERRORS);
indexProcessingTimeTimer = pluginMetrics.timer(INDEX_PROCESSING_TIME_ELAPSED);
bytesReceivedSummary = pluginMetrics.summary(BYTES_RECEIVED);
bytesProcessedSummary = pluginMetrics.summary(BYTES_PROCESSED);
}

public Counter getDocumentsProcessedCounter() {
Expand All @@ -47,4 +55,12 @@ public Counter getProcessingErrorsCounter() {
/**
 * @return the {@link Timer} registered as {@code indexProcessingTime}, used to
 *         measure elapsed time spent processing a single index
 */
public Timer getIndexProcessingTimeTimer() {
return indexProcessingTimeTimer;
}

/**
 * @return the {@link DistributionSummary} registered as {@code bytesReceived};
 *         callers record the serialized size of each document read from
 *         OpenSearch, before it is written to the buffer
 */
public DistributionSummary getBytesReceivedSummary() {
return bytesReceivedSummary;
}

/**
 * @return the {@link DistributionSummary} registered as {@code bytesProcessed};
 *         callers record the serialized size of each document after it has been
 *         successfully added to the buffer
 */
public DistributionSummary getBytesProcessedSummary() {
return bytesProcessedSummary;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

package org.opensearch.dataprepper.plugins.source.opensearch.worker;

import com.fasterxml.jackson.databind.ObjectMapper;
import org.opensearch.dataprepper.buffer.common.BufferAccumulator;
import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSet;
import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager;
Expand Down Expand Up @@ -42,6 +43,7 @@ public class NoSearchContextWorker implements SearchWorker, Runnable {

private static final Logger LOG = LoggerFactory.getLogger(NoSearchContextWorker.class);

private final ObjectMapper objectMapper;
private final SearchAccessor searchAccessor;
private final OpenSearchSourceConfiguration openSearchSourceConfiguration;
private final SourceCoordinator<OpenSearchIndexProgressState> sourceCoordinator;
Expand All @@ -52,13 +54,15 @@ public class NoSearchContextWorker implements SearchWorker, Runnable {

private int noAvailableIndicesCount = 0;

public NoSearchContextWorker(final SearchAccessor searchAccessor,
final OpenSearchSourceConfiguration openSearchSourceConfiguration,
final SourceCoordinator<OpenSearchIndexProgressState> sourceCoordinator,
final BufferAccumulator<Record<Event>> bufferAccumulator,
final OpenSearchIndexPartitionCreationSupplier openSearchIndexPartitionCreationSupplier,
final AcknowledgementSetManager acknowledgementSetManager,
final OpenSearchSourcePluginMetrics openSearchSourcePluginMetrics) {
public NoSearchContextWorker(final ObjectMapper objectMapper,
final SearchAccessor searchAccessor,
final OpenSearchSourceConfiguration openSearchSourceConfiguration,
final SourceCoordinator<OpenSearchIndexProgressState> sourceCoordinator,
final BufferAccumulator<Record<Event>> bufferAccumulator,
final OpenSearchIndexPartitionCreationSupplier openSearchIndexPartitionCreationSupplier,
final AcknowledgementSetManager acknowledgementSetManager,
final OpenSearchSourcePluginMetrics openSearchSourcePluginMetrics) {
this.objectMapper = objectMapper;
this.searchAccessor = searchAccessor;
this.sourceCoordinator = sourceCoordinator;
this.openSearchSourceConfiguration = openSearchSourceConfiguration;
Expand Down Expand Up @@ -154,11 +158,14 @@ private void processIndex(final SourcePartition<OpenSearchIndexProgressState> op

searchWithSearchAfterResults.getDocuments().stream().map(Record::new).forEach(record -> {
try {
final long documentBytes = objectMapper.writeValueAsBytes(record.getData().getJsonNode()).length;
openSearchSourcePluginMetrics.getBytesReceivedSummary().record(documentBytes);
if (Objects.nonNull(acknowledgementSet)) {
acknowledgementSet.add(record.getData());
}
bufferAccumulator.add(record);
openSearchSourcePluginMetrics.getDocumentsProcessedCounter().increment();
openSearchSourcePluginMetrics.getBytesProcessedSummary().record(documentBytes);
} catch (Exception e) {
openSearchSourcePluginMetrics.getProcessingErrorsCounter().increment();
LOG.error("Failed writing OpenSearch documents to buffer. The last document created has document id '{}' from index '{}' : {}",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
*/
package org.opensearch.dataprepper.plugins.source.opensearch.worker;

import com.fasterxml.jackson.databind.ObjectMapper;
import org.opensearch.dataprepper.buffer.common.BufferAccumulator;
import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSet;
import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager;
Expand Down Expand Up @@ -55,6 +56,7 @@ public class PitWorker implements SearchWorker, Runnable {
static final String EXTEND_KEEP_ALIVE_TIME = "1m";
private static final Duration EXTEND_KEEP_ALIVE_DURATION = Duration.ofMinutes(1);

private final ObjectMapper objectMapper;
private final SearchAccessor searchAccessor;
private final OpenSearchSourceConfiguration openSearchSourceConfiguration;
private final SourceCoordinator<OpenSearchIndexProgressState> sourceCoordinator;
Expand All @@ -66,13 +68,15 @@ public class PitWorker implements SearchWorker, Runnable {

private int noAvailableIndicesCount = 0;

public PitWorker(final SearchAccessor searchAccessor,
public PitWorker(final ObjectMapper objectMapper,
final SearchAccessor searchAccessor,
final OpenSearchSourceConfiguration openSearchSourceConfiguration,
final SourceCoordinator<OpenSearchIndexProgressState> sourceCoordinator,
final BufferAccumulator<Record<Event>> bufferAccumulator,
final OpenSearchIndexPartitionCreationSupplier openSearchIndexPartitionCreationSupplier,
final AcknowledgementSetManager acknowledgementSetManager,
final OpenSearchSourcePluginMetrics openSearchSourcePluginMetrics) {
this.objectMapper = objectMapper;
this.searchAccessor = searchAccessor;
this.sourceCoordinator = sourceCoordinator;
this.openSearchSourceConfiguration = openSearchSourceConfiguration;
Expand Down Expand Up @@ -191,11 +195,14 @@ private void processIndex(final SourcePartition<OpenSearchIndexProgressState> op

searchWithSearchAfterResults.getDocuments().stream().map(Record::new).forEach(record -> {
try {
final long documentBytes = objectMapper.writeValueAsBytes(record.getData().getJsonNode()).length;
openSearchSourcePluginMetrics.getBytesReceivedSummary().record(documentBytes);
if (Objects.nonNull(acknowledgementSet)) {
acknowledgementSet.add(record.getData());
}
bufferAccumulator.add(record);
openSearchSourcePluginMetrics.getDocumentsProcessedCounter().increment();
openSearchSourcePluginMetrics.getBytesProcessedSummary().record(documentBytes);
} catch (Exception e) {
openSearchSourcePluginMetrics.getProcessingErrorsCounter().increment();
LOG.error("Failed writing OpenSearch documents to buffer. The last document created has document id '{}' from index '{}' : {}",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
*/
package org.opensearch.dataprepper.plugins.source.opensearch.worker;

import com.fasterxml.jackson.databind.ObjectMapper;
import org.opensearch.dataprepper.buffer.common.BufferAccumulator;
import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSet;
import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager;
Expand Down Expand Up @@ -50,6 +51,7 @@ public class ScrollWorker implements SearchWorker {
private static final Duration BACKOFF_ON_SCROLL_LIMIT_REACHED = Duration.ofSeconds(120);
static final String SCROLL_TIME_PER_BATCH = "1m";

private final ObjectMapper objectMapper;
private final SearchAccessor searchAccessor;
private final OpenSearchSourceConfiguration openSearchSourceConfiguration;
private final SourceCoordinator<OpenSearchIndexProgressState> sourceCoordinator;
Expand All @@ -60,13 +62,15 @@ public class ScrollWorker implements SearchWorker {

private int noAvailableIndicesCount = 0;

public ScrollWorker(final SearchAccessor searchAccessor,
public ScrollWorker(final ObjectMapper objectMapper,
final SearchAccessor searchAccessor,
final OpenSearchSourceConfiguration openSearchSourceConfiguration,
final SourceCoordinator<OpenSearchIndexProgressState> sourceCoordinator,
final BufferAccumulator<Record<Event>> bufferAccumulator,
final OpenSearchIndexPartitionCreationSupplier openSearchIndexPartitionCreationSupplier,
final AcknowledgementSetManager acknowledgementSetManager,
final OpenSearchSourcePluginMetrics openSearchSourcePluginMetrics) {
this.objectMapper = objectMapper;
this.searchAccessor = searchAccessor;
this.openSearchSourceConfiguration = openSearchSourceConfiguration;
this.sourceCoordinator = sourceCoordinator;
Expand Down Expand Up @@ -198,11 +202,14 @@ private void writeDocumentsToBuffer(final List<Event> documents,
final AcknowledgementSet acknowledgementSet) {
documents.stream().map(Record::new).forEach(record -> {
try {
final long documentBytes = objectMapper.writeValueAsBytes(record.getData().getJsonNode()).length;
openSearchSourcePluginMetrics.getBytesReceivedSummary().record(documentBytes);
if (Objects.nonNull(acknowledgementSet)) {
acknowledgementSet.add(record.getData());
}
bufferAccumulator.add(record);
openSearchSourcePluginMetrics.getDocumentsProcessedCounter().increment();
openSearchSourcePluginMetrics.getBytesProcessedSummary().record(documentBytes);
} catch (Exception e) {
openSearchSourcePluginMetrics.getProcessingErrorsCounter().increment();
LOG.error("Failed writing OpenSearch documents to buffer. The last document created has document id '{}' from index '{}' : {}",
Expand Down
Loading

0 comments on commit abfe319

Please sign in to comment.