From 560cfc491129b53547374f293420a33a029de16f Mon Sep 17 00:00:00 2001 From: Sean Kao Date: Wed, 8 Jan 2025 16:49:52 -0800 Subject: [PATCH] rename OpenSearchBulkRetryWrapper (remove Retry) Signed-off-by: Sean Kao --- .../core/RestHighLevelClientWrapper.java | 6 +-- ...rapper.java => OpenSearchBulkWrapper.java} | 11 ++--- .../core/storage/OpenSearchClientUtils.java | 2 +- ...st.java => OpenSearchBulkWrapperTest.java} | 46 +++++++++---------- 4 files changed, 30 insertions(+), 35 deletions(-) rename flint-core/src/main/scala/org/opensearch/flint/core/storage/{OpenSearchBulkRetryWrapper.java => OpenSearchBulkWrapper.java} (93%) rename flint-core/src/test/scala/org/opensearch/flint/core/storage/{OpenSearchBulkRetryWrapperTest.java => OpenSearchBulkWrapperTest.java} (88%) diff --git a/flint-core/src/main/java/org/opensearch/flint/core/RestHighLevelClientWrapper.java b/flint-core/src/main/java/org/opensearch/flint/core/RestHighLevelClientWrapper.java index b9022f7d3..7eb607963 100644 --- a/flint-core/src/main/java/org/opensearch/flint/core/RestHighLevelClientWrapper.java +++ b/flint-core/src/main/java/org/opensearch/flint/core/RestHighLevelClientWrapper.java @@ -39,7 +39,7 @@ import org.opensearch.client.transport.rest_client.RestClientTransport; import java.io.IOException; -import org.opensearch.flint.core.storage.OpenSearchBulkRetryWrapper; +import org.opensearch.flint.core.storage.OpenSearchBulkWrapper; import static org.opensearch.flint.core.metrics.MetricConstants.OS_BULK_OP_METRIC_PREFIX; import static org.opensearch.flint.core.metrics.MetricConstants.OS_CREATE_OP_METRIC_PREFIX; @@ -53,7 +53,7 @@ */ public class RestHighLevelClientWrapper implements IRestHighLevelClient { private final RestHighLevelClient client; - private final OpenSearchBulkRetryWrapper bulkRetryWrapper; + private final OpenSearchBulkWrapper bulkRetryWrapper; private final static JacksonJsonpMapper JACKSON_MAPPER = new JacksonJsonpMapper(); @@ -62,7 +62,7 @@ public class RestHighLevelClientWrapper implements IRestHighLevelClient { * * @param client the RestHighLevelClient instance to wrap */ - public RestHighLevelClientWrapper(RestHighLevelClient client, OpenSearchBulkRetryWrapper bulkRetryWrapper) { + public RestHighLevelClientWrapper(RestHighLevelClient client, OpenSearchBulkWrapper bulkRetryWrapper) { this.client = client; this.bulkRetryWrapper = bulkRetryWrapper; } diff --git a/flint-core/src/main/scala/org/opensearch/flint/core/storage/OpenSearchBulkRetryWrapper.java b/flint-core/src/main/scala/org/opensearch/flint/core/storage/OpenSearchBulkWrapper.java similarity index 93% rename from flint-core/src/main/scala/org/opensearch/flint/core/storage/OpenSearchBulkRetryWrapper.java rename to flint-core/src/main/scala/org/opensearch/flint/core/storage/OpenSearchBulkWrapper.java index f95439536..5b137481a 100644 --- a/flint-core/src/main/scala/org/opensearch/flint/core/storage/OpenSearchBulkRetryWrapper.java +++ b/flint-core/src/main/scala/org/opensearch/flint/core/storage/OpenSearchBulkWrapper.java @@ -27,22 +27,19 @@ /** * Wrapper class for OpenSearch bulk API with retry and rate limiting capability. - * TODO: remove Retry from name (also rename variables) */ -public class OpenSearchBulkRetryWrapper { +public class OpenSearchBulkWrapper { - private static final Logger LOG = Logger.getLogger(OpenSearchBulkRetryWrapper.class.getName()); + private static final Logger LOG = Logger.getLogger(OpenSearchBulkWrapper.class.getName()); private final RetryPolicy retryPolicy; private final BulkRequestRateLimiter rateLimiter; - public OpenSearchBulkRetryWrapper(FlintRetryOptions retryOptions, BulkRequestRateLimiter rateLimiter) { + public OpenSearchBulkWrapper(FlintRetryOptions retryOptions, BulkRequestRateLimiter rateLimiter) { this.retryPolicy = retryOptions.getBulkRetryPolicy(bulkItemRetryableResultPredicate); this.rateLimiter = rateLimiter; } - // TODO: need test case using bulk with rate limiter - /** * Bulk request with retry and rate limiting. Delegate bulk request to the client, and retry the * request if the response contains retryable failure. It won't retry when bulk call thrown @@ -60,8 +57,6 @@ public BulkResponse bulk(RestHighLevelClient client, BulkRequest bulkRequest, Re private BulkResponse bulkWithPartialRetry(RestHighLevelClient client, BulkRequest bulkRequest, RequestOptions options) { final AtomicInteger requestCount = new AtomicInteger(0); - // TODO: notice for metric each retry attempt counts, but rate limit doesn't restrict retries - // could appear weird in dashboards try { final AtomicReference nextRequest = new AtomicReference<>(bulkRequest); BulkResponse res = Failsafe diff --git a/flint-core/src/main/scala/org/opensearch/flint/core/storage/OpenSearchClientUtils.java b/flint-core/src/main/scala/org/opensearch/flint/core/storage/OpenSearchClientUtils.java index 9d419bee0..841725e71 100644 --- a/flint-core/src/main/scala/org/opensearch/flint/core/storage/OpenSearchClientUtils.java +++ b/flint-core/src/main/scala/org/opensearch/flint/core/storage/OpenSearchClientUtils.java @@ -69,7 +69,7 @@ public static RestHighLevelClient createRestHighLevelClient(FlintOptions options public static IRestHighLevelClient createClient(FlintOptions options) { return new RestHighLevelClientWrapper(createRestHighLevelClient(options), - new OpenSearchBulkRetryWrapper(options.getRetryOptions(), + new OpenSearchBulkWrapper(options.getRetryOptions(), BulkRequestRateLimiterHolder.getBulkRequestRateLimiter(options))); } diff --git a/flint-core/src/test/scala/org/opensearch/flint/core/storage/OpenSearchBulkRetryWrapperTest.java b/flint-core/src/test/scala/org/opensearch/flint/core/storage/OpenSearchBulkWrapperTest.java similarity index 88% rename from flint-core/src/test/scala/org/opensearch/flint/core/storage/OpenSearchBulkRetryWrapperTest.java rename to flint-core/src/test/scala/org/opensearch/flint/core/storage/OpenSearchBulkWrapperTest.java index ac2a62fd1..a93607f93 100644 --- a/flint-core/src/test/scala/org/opensearch/flint/core/storage/OpenSearchBulkRetryWrapperTest.java +++ b/flint-core/src/test/scala/org/opensearch/flint/core/storage/OpenSearchBulkWrapperTest.java @@ -35,7 +35,7 @@ import org.opensearch.rest.RestStatus; @ExtendWith(MockitoExtension.class) -class OpenSearchBulkRetryWrapperTest { +class OpenSearchBulkWrapperTest { private static final long ESTIMATED_SIZE_IN_BYTES = 1000L; @Mock @@ -83,14 +83,14 @@ class OpenSearchBulkRetryWrapperTest { public void withRetryWhenCallSucceed() throws Exception { MetricsTestUtil.withMetricEnv(verifier -> { BulkRequestRateLimiter rateLimiter = new BulkRequestRateLimiter(optionsWithoutRateLimit); - OpenSearchBulkRetryWrapper bulkRetryWrapper = new OpenSearchBulkRetryWrapper( + OpenSearchBulkWrapper bulkWrapper = new OpenSearchBulkWrapper( retryOptionsWithRetry, rateLimiter); when(client.bulk(bulkRequest, options)).thenReturn(successResponse); when(successResponse.hasFailures()).thenReturn(false); when(bulkRequest.requests()).thenReturn(ImmutableList.of(indexRequest0, indexRequest1)); when(bulkRequest.estimatedSizeInBytes()).thenReturn(ESTIMATED_SIZE_IN_BYTES); - BulkResponse response = bulkRetryWrapper.bulk(client, bulkRequest, options); + BulkResponse response = bulkWrapper.bulk(client, bulkRequest, options); assertEquals(response, successResponse); verify(client).bulk(bulkRequest, options); @@ -105,7 +105,7 @@ public void withRetryWhenCallSucceed() throws Exception { public void withRetryWhenCallConflict() throws Exception { MetricsTestUtil.withMetricEnv(verifier -> { BulkRequestRateLimiter rateLimiter = new BulkRequestRateLimiter(optionsWithoutRateLimit); - OpenSearchBulkRetryWrapper bulkRetryWrapper = new OpenSearchBulkRetryWrapper( + OpenSearchBulkWrapper bulkWrapper = new OpenSearchBulkWrapper( retryOptionsWithRetry, rateLimiter); when(client.bulk(any(), eq(options))) .thenReturn(conflictResponse); @@ -114,7 +114,7 @@ public void withRetryWhenCallConflict() throws Exception { when(bulkRequest.requests()).thenReturn(ImmutableList.of(indexRequest0, indexRequest1)); when(bulkRequest.estimatedSizeInBytes()).thenReturn(ESTIMATED_SIZE_IN_BYTES); - BulkResponse response = bulkRetryWrapper.bulk(client, bulkRequest, options); + BulkResponse response = bulkWrapper.bulk(client, bulkRequest, options); assertEquals(response, conflictResponse); verify(client).bulk(bulkRequest, options); @@ -129,7 +129,7 @@ public void withRetryWhenCallConflict() throws Exception { public void withRetryWhenCallFailOnce() throws Exception { MetricsTestUtil.withMetricEnv(verifier -> { BulkRequestRateLimiter rateLimiter = new BulkRequestRateLimiter(optionsWithoutRateLimit); - OpenSearchBulkRetryWrapper bulkRetryWrapper = new OpenSearchBulkRetryWrapper( + OpenSearchBulkWrapper bulkWrapper = new OpenSearchBulkWrapper( retryOptionsWithRetry, rateLimiter); when(client.bulk(any(), eq(options))) .thenReturn(failureResponse) @@ -139,7 +139,7 @@ public void withRetryWhenCallFailOnce() throws Exception { when(bulkRequest.requests()).thenReturn(ImmutableList.of(indexRequest0, indexRequest1)); when(bulkRequest.estimatedSizeInBytes()).thenReturn(ESTIMATED_SIZE_IN_BYTES); - BulkResponse response = bulkRetryWrapper.bulk(client, bulkRequest, options); + BulkResponse response = bulkWrapper.bulk(client, bulkRequest, options); assertEquals(response, successResponse); verify(client, times(2)).bulk(any(), eq(options)); @@ -153,7 +153,7 @@ public void withRetryWhenCallFailOnce() throws Exception { public void withRetryWhenAllCallFail() throws Exception { MetricsTestUtil.withMetricEnv(verifier -> { BulkRequestRateLimiter rateLimiter = new BulkRequestRateLimiter(optionsWithoutRateLimit); - OpenSearchBulkRetryWrapper bulkRetryWrapper = new OpenSearchBulkRetryWrapper( + OpenSearchBulkWrapper bulkWrapper = new OpenSearchBulkWrapper( retryOptionsWithRetry, rateLimiter); when(client.bulk(any(), eq(options))) .thenReturn(failureResponse); @@ -161,7 +161,7 @@ public void withRetryWhenAllCallFail() throws Exception { when(bulkRequest.estimatedSizeInBytes()).thenReturn(ESTIMATED_SIZE_IN_BYTES); mockFailureResponse(); - BulkResponse response = bulkRetryWrapper.bulk(client, bulkRequest, options); + BulkResponse response = bulkWrapper.bulk(client, bulkRequest, options); assertEquals(response, failureResponse); verify(client, times(3)).bulk(any(), eq(options)); @@ -175,14 +175,14 @@ public void withRetryWhenAllCallFail() throws Exception { public void withRetryWhenCallThrowsShouldNotRetry() throws Exception { MetricsTestUtil.withMetricEnv(verifier -> { BulkRequestRateLimiter rateLimiter = new BulkRequestRateLimiter(optionsWithoutRateLimit); - OpenSearchBulkRetryWrapper bulkRetryWrapper = new OpenSearchBulkRetryWrapper( + OpenSearchBulkWrapper bulkWrapper = new OpenSearchBulkWrapper( retryOptionsWithRetry, rateLimiter); when(client.bulk(bulkRequest, options)).thenThrow(new RuntimeException("test")); when(bulkRequest.requests()).thenReturn(ImmutableList.of(indexRequest0, indexRequest1)); when(bulkRequest.estimatedSizeInBytes()).thenReturn(ESTIMATED_SIZE_IN_BYTES); assertThrows(RuntimeException.class, - () -> bulkRetryWrapper.bulk(client, bulkRequest, options)); + () -> bulkWrapper.bulk(client, bulkRequest, options)); verify(client).bulk(bulkRequest, options); verifier.assertHistoricGauge(MetricConstants.OPENSEARCH_BULK_SIZE_METRIC, ESTIMATED_SIZE_IN_BYTES); @@ -195,7 +195,7 @@ public void withRetryWhenCallThrowsShouldNotRetry() throws Exception { public void withoutRetryWhenCallFail() throws Exception { MetricsTestUtil.withMetricEnv(verifier -> { BulkRequestRateLimiter rateLimiter = new BulkRequestRateLimiter(optionsWithoutRateLimit); - OpenSearchBulkRetryWrapper bulkRetryWrapper = new OpenSearchBulkRetryWrapper( + OpenSearchBulkWrapper bulkWrapper = new OpenSearchBulkWrapper( retryOptionsWithoutRetry, rateLimiter); when(client.bulk(bulkRequest, options)) .thenReturn(failureResponse); @@ -203,7 +203,7 @@ public void withoutRetryWhenCallFail() throws Exception { when(bulkRequest.estimatedSizeInBytes()).thenReturn(ESTIMATED_SIZE_IN_BYTES); mockFailureResponse(); - BulkResponse response = bulkRetryWrapper.bulk(client, bulkRequest, options); + BulkResponse response = bulkWrapper.bulk(client, bulkRequest, options); assertEquals(response, failureResponse); verify(client).bulk(bulkRequest, options); @@ -217,7 +217,7 @@ public void withoutRetryWhenCallFail() throws Exception { public void increaseRateLimitWhenCallSucceed() throws Exception { MetricsTestUtil.withMetricEnv(verifier -> { BulkRequestRateLimiter rateLimiter = new BulkRequestRateLimiter(optionsWithRateLimit); - OpenSearchBulkRetryWrapper bulkRetryWrapper = new OpenSearchBulkRetryWrapper( + OpenSearchBulkWrapper bulkWrapper = new OpenSearchBulkWrapper( retryOptionsWithRetry, rateLimiter); when(client.bulk(bulkRequest, options)).thenReturn(successResponse); when(successResponse.hasFailures()).thenReturn(false); @@ -226,15 +226,15 @@ public void increaseRateLimitWhenCallSucceed() throws Exception { assertEquals(rateLimiter.getRate(), 2); - bulkRetryWrapper.bulk(client, bulkRequest, options); + bulkWrapper.bulk(client, bulkRequest, options); assertEquals(rateLimiter.getRate(), 3); - bulkRetryWrapper.bulk(client, bulkRequest, options); + bulkWrapper.bulk(client, bulkRequest, options); assertEquals(rateLimiter.getRate(), 4); // Should not exceed max rate limit rateLimiter.setRate(20); - bulkRetryWrapper.bulk(client, bulkRequest, options); + bulkWrapper.bulk(client, bulkRequest, options); assertEquals(rateLimiter.getRate(), 20); }); } @@ -243,7 +243,7 @@ public void increaseRateLimitWhenCallSucceed() throws Exception { public void adjustRateLimitWithRetryWhenCallFailOnce() throws Exception { MetricsTestUtil.withMetricEnv(verifier -> { BulkRequestRateLimiter rateLimiter = new BulkRequestRateLimiter(optionsWithRateLimit); - OpenSearchBulkRetryWrapper bulkRetryWrapper = new OpenSearchBulkRetryWrapper( + OpenSearchBulkWrapper bulkWrapper = new OpenSearchBulkWrapper( retryOptionsWithRetry, rateLimiter); when(client.bulk(any(), eq(options))) .thenReturn(failureResponse) @@ -255,7 +255,7 @@ public void adjustRateLimitWithRetryWhenCallFailOnce() throws Exception { rateLimiter.setRate(10); - bulkRetryWrapper.bulk(client, bulkRequest, options); + bulkWrapper.bulk(client, bulkRequest, options); // Should decrease once then increase once assertEquals(rateLimiter.getRate(), 6); @@ -266,7 +266,7 @@ public void adjustRateLimitWithRetryWhenCallFailOnce() throws Exception { public void decreaseRateLimitWhenAllCallFail() throws Exception { MetricsTestUtil.withMetricEnv(verifier -> { BulkRequestRateLimiter rateLimiter = new BulkRequestRateLimiter(optionsWithRateLimit); - OpenSearchBulkRetryWrapper bulkRetryWrapper = new OpenSearchBulkRetryWrapper( + OpenSearchBulkWrapper bulkWrapper = new OpenSearchBulkWrapper( retryOptionsWithRetry, rateLimiter); when(client.bulk(any(), eq(options))) .thenReturn(failureResponse) @@ -278,7 +278,7 @@ public void decreaseRateLimitWhenAllCallFail() throws Exception { rateLimiter.setRate(20); - bulkRetryWrapper.bulk(client, bulkRequest, options); + bulkWrapper.bulk(client, bulkRequest, options); // Should decrease three times assertEquals(rateLimiter.getRate(), 2.5); @@ -297,7 +297,7 @@ public void testRateLimitFailureThreshold() throws Exception { MetricsTestUtil.withMetricEnv(verifier -> { BulkRequestRateLimiter rateLimiter = new BulkRequestRateLimiter(optionsHighFailureThreshold); - OpenSearchBulkRetryWrapper bulkRetryWrapper = new OpenSearchBulkRetryWrapper( + OpenSearchBulkWrapper bulkWrapper = new OpenSearchBulkWrapper( retryOptionsWithRetry, rateLimiter); when(client.bulk(any(), eq(options))) .thenReturn(failureResponse) @@ -309,7 +309,7 @@ public void testRateLimitFailureThreshold() throws Exception { rateLimiter.setRate(19); - bulkRetryWrapper.bulk(client, bulkRequest, options); + bulkWrapper.bulk(client, bulkRequest, options); // First request has 50% failure, not exceeding threshold, increasing rate // Second and third requests has 100% failure, decreasing rate