Skip to content

Commit

Permalink
rename OpenSearchBulkRetryWrapper (remove Retry)
Browse files Browse the repository at this point in the history
Signed-off-by: Sean Kao <[email protected]>
  • Loading branch information
seankao-az committed Jan 9, 2025
1 parent 8e0d83c commit 560cfc4
Show file tree
Hide file tree
Showing 4 changed files with 30 additions and 35 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@
import org.opensearch.client.transport.rest_client.RestClientTransport;

import java.io.IOException;
import org.opensearch.flint.core.storage.OpenSearchBulkRetryWrapper;
import org.opensearch.flint.core.storage.OpenSearchBulkWrapper;

import static org.opensearch.flint.core.metrics.MetricConstants.OS_BULK_OP_METRIC_PREFIX;
import static org.opensearch.flint.core.metrics.MetricConstants.OS_CREATE_OP_METRIC_PREFIX;
Expand All @@ -53,7 +53,7 @@
*/
public class RestHighLevelClientWrapper implements IRestHighLevelClient {
private final RestHighLevelClient client;
private final OpenSearchBulkRetryWrapper bulkRetryWrapper;
private final OpenSearchBulkWrapper bulkRetryWrapper;

private final static JacksonJsonpMapper JACKSON_MAPPER = new JacksonJsonpMapper();

Expand All @@ -62,7 +62,7 @@ public class RestHighLevelClientWrapper implements IRestHighLevelClient {
*
* @param client the RestHighLevelClient instance to wrap
*/
public RestHighLevelClientWrapper(RestHighLevelClient client, OpenSearchBulkRetryWrapper bulkRetryWrapper) {
public RestHighLevelClientWrapper(RestHighLevelClient client, OpenSearchBulkWrapper bulkRetryWrapper) {
this.client = client;
this.bulkRetryWrapper = bulkRetryWrapper;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,22 +27,19 @@

/**
* Wrapper class for OpenSearch bulk API with retry and rate limiting capability.
* TODO: remove Retry from name (also rename variables)
*/
public class OpenSearchBulkRetryWrapper {
public class OpenSearchBulkWrapper {

private static final Logger LOG = Logger.getLogger(OpenSearchBulkRetryWrapper.class.getName());
private static final Logger LOG = Logger.getLogger(OpenSearchBulkWrapper.class.getName());

private final RetryPolicy<BulkResponse> retryPolicy;
private final BulkRequestRateLimiter rateLimiter;

public OpenSearchBulkRetryWrapper(FlintRetryOptions retryOptions, BulkRequestRateLimiter rateLimiter) {
public OpenSearchBulkWrapper(FlintRetryOptions retryOptions, BulkRequestRateLimiter rateLimiter) {
this.retryPolicy = retryOptions.getBulkRetryPolicy(bulkItemRetryableResultPredicate);
this.rateLimiter = rateLimiter;
}

// TODO: need test case using bulk with rate limiter

/**
* Bulk request with retry and rate limiting. Delegate bulk request to the client, and retry the
* request if the response contains retryable failure. It won't retry when bulk call thrown
Expand All @@ -60,8 +57,6 @@ public BulkResponse bulk(RestHighLevelClient client, BulkRequest bulkRequest, Re
private BulkResponse bulkWithPartialRetry(RestHighLevelClient client, BulkRequest bulkRequest,
RequestOptions options) {
final AtomicInteger requestCount = new AtomicInteger(0);
// TODO: notice for metric each retry attempt counts, but rate limit doesn't restrict retries
// could appear weird in dashboards
try {
final AtomicReference<BulkRequest> nextRequest = new AtomicReference<>(bulkRequest);
BulkResponse res = Failsafe
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ public static RestHighLevelClient createRestHighLevelClient(FlintOptions options

public static IRestHighLevelClient createClient(FlintOptions options) {
return new RestHighLevelClientWrapper(createRestHighLevelClient(options),
new OpenSearchBulkRetryWrapper(options.getRetryOptions(),
new OpenSearchBulkWrapper(options.getRetryOptions(),
BulkRequestRateLimiterHolder.getBulkRequestRateLimiter(options)));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
import org.opensearch.rest.RestStatus;

@ExtendWith(MockitoExtension.class)
class OpenSearchBulkRetryWrapperTest {
class OpenSearchBulkWrapperTest {

private static final long ESTIMATED_SIZE_IN_BYTES = 1000L;
@Mock
Expand Down Expand Up @@ -83,14 +83,14 @@ class OpenSearchBulkRetryWrapperTest {
public void withRetryWhenCallSucceed() throws Exception {
MetricsTestUtil.withMetricEnv(verifier -> {
BulkRequestRateLimiter rateLimiter = new BulkRequestRateLimiter(optionsWithoutRateLimit);
OpenSearchBulkRetryWrapper bulkRetryWrapper = new OpenSearchBulkRetryWrapper(
OpenSearchBulkWrapper bulkWrapper = new OpenSearchBulkWrapper(
retryOptionsWithRetry, rateLimiter);
when(client.bulk(bulkRequest, options)).thenReturn(successResponse);
when(successResponse.hasFailures()).thenReturn(false);
when(bulkRequest.requests()).thenReturn(ImmutableList.of(indexRequest0, indexRequest1));
when(bulkRequest.estimatedSizeInBytes()).thenReturn(ESTIMATED_SIZE_IN_BYTES);

BulkResponse response = bulkRetryWrapper.bulk(client, bulkRequest, options);
BulkResponse response = bulkWrapper.bulk(client, bulkRequest, options);

assertEquals(response, successResponse);
verify(client).bulk(bulkRequest, options);
Expand All @@ -105,7 +105,7 @@ public void withRetryWhenCallSucceed() throws Exception {
public void withRetryWhenCallConflict() throws Exception {
MetricsTestUtil.withMetricEnv(verifier -> {
BulkRequestRateLimiter rateLimiter = new BulkRequestRateLimiter(optionsWithoutRateLimit);
OpenSearchBulkRetryWrapper bulkRetryWrapper = new OpenSearchBulkRetryWrapper(
OpenSearchBulkWrapper bulkWrapper = new OpenSearchBulkWrapper(
retryOptionsWithRetry, rateLimiter);
when(client.bulk(any(), eq(options)))
.thenReturn(conflictResponse);
Expand All @@ -114,7 +114,7 @@ public void withRetryWhenCallConflict() throws Exception {
when(bulkRequest.requests()).thenReturn(ImmutableList.of(indexRequest0, indexRequest1));
when(bulkRequest.estimatedSizeInBytes()).thenReturn(ESTIMATED_SIZE_IN_BYTES);

BulkResponse response = bulkRetryWrapper.bulk(client, bulkRequest, options);
BulkResponse response = bulkWrapper.bulk(client, bulkRequest, options);

assertEquals(response, conflictResponse);
verify(client).bulk(bulkRequest, options);
Expand All @@ -129,7 +129,7 @@ public void withRetryWhenCallConflict() throws Exception {
public void withRetryWhenCallFailOnce() throws Exception {
MetricsTestUtil.withMetricEnv(verifier -> {
BulkRequestRateLimiter rateLimiter = new BulkRequestRateLimiter(optionsWithoutRateLimit);
OpenSearchBulkRetryWrapper bulkRetryWrapper = new OpenSearchBulkRetryWrapper(
OpenSearchBulkWrapper bulkWrapper = new OpenSearchBulkWrapper(
retryOptionsWithRetry, rateLimiter);
when(client.bulk(any(), eq(options)))
.thenReturn(failureResponse)
Expand All @@ -139,7 +139,7 @@ public void withRetryWhenCallFailOnce() throws Exception {
when(bulkRequest.requests()).thenReturn(ImmutableList.of(indexRequest0, indexRequest1));
when(bulkRequest.estimatedSizeInBytes()).thenReturn(ESTIMATED_SIZE_IN_BYTES);

BulkResponse response = bulkRetryWrapper.bulk(client, bulkRequest, options);
BulkResponse response = bulkWrapper.bulk(client, bulkRequest, options);

assertEquals(response, successResponse);
verify(client, times(2)).bulk(any(), eq(options));
Expand All @@ -153,15 +153,15 @@ public void withRetryWhenCallFailOnce() throws Exception {
public void withRetryWhenAllCallFail() throws Exception {
MetricsTestUtil.withMetricEnv(verifier -> {
BulkRequestRateLimiter rateLimiter = new BulkRequestRateLimiter(optionsWithoutRateLimit);
OpenSearchBulkRetryWrapper bulkRetryWrapper = new OpenSearchBulkRetryWrapper(
OpenSearchBulkWrapper bulkWrapper = new OpenSearchBulkWrapper(
retryOptionsWithRetry, rateLimiter);
when(client.bulk(any(), eq(options)))
.thenReturn(failureResponse);
when(bulkRequest.requests()).thenReturn(ImmutableList.of(indexRequest0, indexRequest1));
when(bulkRequest.estimatedSizeInBytes()).thenReturn(ESTIMATED_SIZE_IN_BYTES);
mockFailureResponse();

BulkResponse response = bulkRetryWrapper.bulk(client, bulkRequest, options);
BulkResponse response = bulkWrapper.bulk(client, bulkRequest, options);

assertEquals(response, failureResponse);
verify(client, times(3)).bulk(any(), eq(options));
Expand All @@ -175,14 +175,14 @@ public void withRetryWhenAllCallFail() throws Exception {
public void withRetryWhenCallThrowsShouldNotRetry() throws Exception {
MetricsTestUtil.withMetricEnv(verifier -> {
BulkRequestRateLimiter rateLimiter = new BulkRequestRateLimiter(optionsWithoutRateLimit);
OpenSearchBulkRetryWrapper bulkRetryWrapper = new OpenSearchBulkRetryWrapper(
OpenSearchBulkWrapper bulkWrapper = new OpenSearchBulkWrapper(
retryOptionsWithRetry, rateLimiter);
when(client.bulk(bulkRequest, options)).thenThrow(new RuntimeException("test"));
when(bulkRequest.requests()).thenReturn(ImmutableList.of(indexRequest0, indexRequest1));
when(bulkRequest.estimatedSizeInBytes()).thenReturn(ESTIMATED_SIZE_IN_BYTES);

assertThrows(RuntimeException.class,
() -> bulkRetryWrapper.bulk(client, bulkRequest, options));
() -> bulkWrapper.bulk(client, bulkRequest, options));

verify(client).bulk(bulkRequest, options);
verifier.assertHistoricGauge(MetricConstants.OPENSEARCH_BULK_SIZE_METRIC, ESTIMATED_SIZE_IN_BYTES);
Expand All @@ -195,15 +195,15 @@ public void withRetryWhenCallThrowsShouldNotRetry() throws Exception {
public void withoutRetryWhenCallFail() throws Exception {
MetricsTestUtil.withMetricEnv(verifier -> {
BulkRequestRateLimiter rateLimiter = new BulkRequestRateLimiter(optionsWithoutRateLimit);
OpenSearchBulkRetryWrapper bulkRetryWrapper = new OpenSearchBulkRetryWrapper(
OpenSearchBulkWrapper bulkWrapper = new OpenSearchBulkWrapper(
retryOptionsWithoutRetry, rateLimiter);
when(client.bulk(bulkRequest, options))
.thenReturn(failureResponse);
when(bulkRequest.requests()).thenReturn(ImmutableList.of(indexRequest0, indexRequest1));
when(bulkRequest.estimatedSizeInBytes()).thenReturn(ESTIMATED_SIZE_IN_BYTES);
mockFailureResponse();

BulkResponse response = bulkRetryWrapper.bulk(client, bulkRequest, options);
BulkResponse response = bulkWrapper.bulk(client, bulkRequest, options);

assertEquals(response, failureResponse);
verify(client).bulk(bulkRequest, options);
Expand All @@ -217,7 +217,7 @@ public void withoutRetryWhenCallFail() throws Exception {
public void increaseRateLimitWhenCallSucceed() throws Exception {
MetricsTestUtil.withMetricEnv(verifier -> {
BulkRequestRateLimiter rateLimiter = new BulkRequestRateLimiter(optionsWithRateLimit);
OpenSearchBulkRetryWrapper bulkRetryWrapper = new OpenSearchBulkRetryWrapper(
OpenSearchBulkWrapper bulkWrapper = new OpenSearchBulkWrapper(
retryOptionsWithRetry, rateLimiter);
when(client.bulk(bulkRequest, options)).thenReturn(successResponse);
when(successResponse.hasFailures()).thenReturn(false);
Expand All @@ -226,15 +226,15 @@ public void increaseRateLimitWhenCallSucceed() throws Exception {

assertEquals(rateLimiter.getRate(), 2);

bulkRetryWrapper.bulk(client, bulkRequest, options);
bulkWrapper.bulk(client, bulkRequest, options);
assertEquals(rateLimiter.getRate(), 3);

bulkRetryWrapper.bulk(client, bulkRequest, options);
bulkWrapper.bulk(client, bulkRequest, options);
assertEquals(rateLimiter.getRate(), 4);

// Should not exceed max rate limit
rateLimiter.setRate(20);
bulkRetryWrapper.bulk(client, bulkRequest, options);
bulkWrapper.bulk(client, bulkRequest, options);
assertEquals(rateLimiter.getRate(), 20);
});
}
Expand All @@ -243,7 +243,7 @@ public void increaseRateLimitWhenCallSucceed() throws Exception {
public void adjustRateLimitWithRetryWhenCallFailOnce() throws Exception {
MetricsTestUtil.withMetricEnv(verifier -> {
BulkRequestRateLimiter rateLimiter = new BulkRequestRateLimiter(optionsWithRateLimit);
OpenSearchBulkRetryWrapper bulkRetryWrapper = new OpenSearchBulkRetryWrapper(
OpenSearchBulkWrapper bulkWrapper = new OpenSearchBulkWrapper(
retryOptionsWithRetry, rateLimiter);
when(client.bulk(any(), eq(options)))
.thenReturn(failureResponse)
Expand All @@ -255,7 +255,7 @@ public void adjustRateLimitWithRetryWhenCallFailOnce() throws Exception {

rateLimiter.setRate(10);

bulkRetryWrapper.bulk(client, bulkRequest, options);
bulkWrapper.bulk(client, bulkRequest, options);

// Should decrease once then increase once
assertEquals(rateLimiter.getRate(), 6);
Expand All @@ -266,7 +266,7 @@ public void adjustRateLimitWithRetryWhenCallFailOnce() throws Exception {
public void decreaseRateLimitWhenAllCallFail() throws Exception {
MetricsTestUtil.withMetricEnv(verifier -> {
BulkRequestRateLimiter rateLimiter = new BulkRequestRateLimiter(optionsWithRateLimit);
OpenSearchBulkRetryWrapper bulkRetryWrapper = new OpenSearchBulkRetryWrapper(
OpenSearchBulkWrapper bulkWrapper = new OpenSearchBulkWrapper(
retryOptionsWithRetry, rateLimiter);
when(client.bulk(any(), eq(options)))
.thenReturn(failureResponse)
Expand All @@ -278,7 +278,7 @@ public void decreaseRateLimitWhenAllCallFail() throws Exception {

rateLimiter.setRate(20);

bulkRetryWrapper.bulk(client, bulkRequest, options);
bulkWrapper.bulk(client, bulkRequest, options);

// Should decrease three times
assertEquals(rateLimiter.getRate(), 2.5);
Expand All @@ -297,7 +297,7 @@ public void testRateLimitFailureThreshold() throws Exception {

MetricsTestUtil.withMetricEnv(verifier -> {
BulkRequestRateLimiter rateLimiter = new BulkRequestRateLimiter(optionsHighFailureThreshold);
OpenSearchBulkRetryWrapper bulkRetryWrapper = new OpenSearchBulkRetryWrapper(
OpenSearchBulkWrapper bulkWrapper = new OpenSearchBulkWrapper(
retryOptionsWithRetry, rateLimiter);
when(client.bulk(any(), eq(options)))
.thenReturn(failureResponse)
Expand All @@ -309,7 +309,7 @@ public void testRateLimitFailureThreshold() throws Exception {

rateLimiter.setRate(19);

bulkRetryWrapper.bulk(client, bulkRequest, options);
bulkWrapper.bulk(client, bulkRequest, options);

// First request has 50% failure, not exceeding threshold, increasing rate
// Second and third requests has 100% failure, decreasing rate
Expand Down

0 comments on commit 560cfc4

Please sign in to comment.