diff --git a/flint-core/src/main/scala/org/opensearch/flint/core/FlintOptions.java b/flint-core/src/main/scala/org/opensearch/flint/core/FlintOptions.java index 7d6d152bc..275f6869a 100644 --- a/flint-core/src/main/scala/org/opensearch/flint/core/FlintOptions.java +++ b/flint-core/src/main/scala/org/opensearch/flint/core/FlintOptions.java @@ -121,8 +121,6 @@ public class FlintOptions implements Serializable { public static final String DEFAULT_BULK_REQUEST_RATE_LIMIT_PER_NODE_INCREASE_STEP = "1000"; public static final String BULK_REQUEST_RATE_LIMIT_PER_NODE_DECREASE_RATIO = "bulk.rate_limit_per_node.decrease_ratio"; public static final String DEFAULT_BULK_REQUEST_RATE_LIMIT_PER_NODE_DECREASE_RATIO = "0.8"; - public static final String BULK_REQUEST_RATE_LIMIT_PER_NODE_PARTIAL_FAILURE_THRESHOLD = "bulk.rate_limit_per_node.partial_failure_threshold"; - public static final String DEFAULT_BULK_REQUEST_RATE_LIMIT_PER_NODE_PARTIAL_FAILURE_THRESHOLD = "0"; public static final String DEFAULT_EXTERNAL_SCHEDULER_INTERVAL = "5 minutes"; diff --git a/flint-core/src/main/scala/org/opensearch/flint/core/storage/BulkRequestRateLimiter.java b/flint-core/src/main/scala/org/opensearch/flint/core/storage/BulkRequestRateLimiter.java index 877850a26..f264dd2db 100644 --- a/flint-core/src/main/scala/org/opensearch/flint/core/storage/BulkRequestRateLimiter.java +++ b/flint-core/src/main/scala/org/opensearch/flint/core/storage/BulkRequestRateLimiter.java @@ -26,7 +26,6 @@ public BulkRequestRateLimiter(FlintOptions flintOptions) { maxRate = flintOptions.getBulkRequestMaxRateLimitPerNode(); increaseStep = flintOptions.getBulkRequestRateLimitPerNodeIncreaseStep(); decreaseRatio = flintOptions.getBulkRequestRateLimitPerNodeDecreaseRatio(); - partialFailureThreshold = flintOptions.getBulkRequestRateLimitPerNodePartialFailureThreshold(); if (flintOptions.getBulkRequestRateLimitPerNodeEnabled()) { LOG.info("Setting rate limit for bulk request to " + minRate + " documents/sec"); @@ -53,18 +52,20 @@ public void acquirePermit(int permits) { } /** - * Notify rate limiter of the failure rate of a bulk request. Additive-increase or multiplicative-decrease - * rate limit based on the failure rate. Does nothing if rate limit is not set. - * @param failureRate failure rate of the bulk request between 0 and 1 + * Increase rate limit additively. */ - public void reportFailure(double failureRate) { + public void increaseRate() { if (rateLimiter != null) { - LOG.info("Bulk request failure rate " + failureRate + " reported"); - if (failureRate > partialFailureThreshold) { - decreaseRate(); - } else { - increaseRate(); - } + setRate(getRate() + increaseStep); + } + } + + /** + * Decrease rate limit multiplicatively. + */ + public void decreaseRate() { + if (rateLimiter != null) { + setRate(getRate() * decreaseRatio); } } @@ -87,16 +88,4 @@ public void setRate(double permitsPerSecond) { MetricsUtil.addHistoricGauge(MetricConstants.OS_BULK_RATE_LIMIT_METRIC, (long) permitsPerSecond); } } - - private void increaseRate() { - if (rateLimiter != null) { - setRate(getRate() + increaseStep); - } - } - - private void decreaseRate() { - if (rateLimiter != null) { - setRate(getRate() * decreaseRatio); - } - } } diff --git a/flint-core/src/main/scala/org/opensearch/flint/core/storage/OpenSearchBulkWrapper.java b/flint-core/src/main/scala/org/opensearch/flint/core/storage/OpenSearchBulkWrapper.java index c6ecff085..422f084a1 100644 --- a/flint-core/src/main/scala/org/opensearch/flint/core/storage/OpenSearchBulkWrapper.java +++ b/flint-core/src/main/scala/org/opensearch/flint/core/storage/OpenSearchBulkWrapper.java @@ -73,15 +73,15 @@ private BulkResponse bulkWithPartialRetry(RestHighLevelClient client, BulkReques if (!bulkItemRetryableResultPredicate.test(response)) { MetricsUtil.addHistoricGauge(MetricConstants.OS_BULK_RETRYABLE_RESULT_PERCENTAGE_METRIC, 0); - rateLimiter.reportFailure(0); + rateLimiter.increaseRate(); } else { + rateLimiter.decreaseRate(); + BulkRequest retryableRequest = getRetryableRequest(nextRequest.get(), response); double retryablePercentage = (double) retryableRequest.requests().size() / response.getItems().length; // TODO: long type metric MetricsUtil.addHistoricGauge(MetricConstants.OS_BULK_RETRYABLE_RESULT_PERCENTAGE_METRIC, (long) (retryablePercentage * 100)); - rateLimiter.reportFailure(retryablePercentage); - if (retryPolicy.getConfig().allowsRetries()) { nextRequest.set(retryableRequest); } diff --git a/flint-core/src/test/scala/org/opensearch/flint/core/storage/OpenSearchBulkWrapperTest.java b/flint-core/src/test/scala/org/opensearch/flint/core/storage/OpenSearchBulkWrapperTest.java index a93607f93..549aac322 100644 --- a/flint-core/src/test/scala/org/opensearch/flint/core/storage/OpenSearchBulkWrapperTest.java +++ b/flint-core/src/test/scala/org/opensearch/flint/core/storage/OpenSearchBulkWrapperTest.java @@ -74,8 +74,7 @@ class OpenSearchBulkWrapperTest { FlintOptions.BULK_REQUEST_MIN_RATE_LIMIT_PER_NODE, "2", FlintOptions.BULK_REQUEST_MAX_RATE_LIMIT_PER_NODE, "20", FlintOptions.BULK_REQUEST_RATE_LIMIT_PER_NODE_INCREASE_STEP, "1", - FlintOptions.BULK_REQUEST_RATE_LIMIT_PER_NODE_DECREASE_RATIO, "0.5", - FlintOptions.BULK_REQUEST_RATE_LIMIT_PER_NODE_PARTIAL_FAILURE_THRESHOLD, "0.2")); + FlintOptions.BULK_REQUEST_RATE_LIMIT_PER_NODE_DECREASE_RATIO, "0.5")); FlintOptions optionsWithoutRateLimit = new FlintOptions(Map.of( FlintOptions.BULK_REQUEST_RATE_LIMIT_PER_NODE_ENABLED, "false")); @@ -285,38 +284,6 @@ public void decreaseRateLimitWhenAllCallFail() throws Exception { }); } - @Test - public void testRateLimitFailureThreshold() throws Exception { - FlintOptions optionsHighFailureThreshold = new FlintOptions(Map.of( - FlintOptions.BULK_REQUEST_RATE_LIMIT_PER_NODE_ENABLED, "true", - FlintOptions.BULK_REQUEST_MIN_RATE_LIMIT_PER_NODE, "2", - FlintOptions.BULK_REQUEST_MAX_RATE_LIMIT_PER_NODE, "20", - FlintOptions.BULK_REQUEST_RATE_LIMIT_PER_NODE_INCREASE_STEP, "1", - FlintOptions.BULK_REQUEST_RATE_LIMIT_PER_NODE_DECREASE_RATIO, "0.5", - FlintOptions.BULK_REQUEST_RATE_LIMIT_PER_NODE_PARTIAL_FAILURE_THRESHOLD, "0.8")); - - MetricsTestUtil.withMetricEnv(verifier -> { - BulkRequestRateLimiter rateLimiter = new BulkRequestRateLimiter(optionsHighFailureThreshold); - OpenSearchBulkWrapper bulkWrapper = new OpenSearchBulkWrapper( - retryOptionsWithRetry, rateLimiter); - when(client.bulk(any(), eq(options))) - .thenReturn(failureResponse) - .thenReturn(retriedResponse); - when(bulkRequest.requests()).thenReturn(ImmutableList.of(indexRequest0, indexRequest1)); - when(bulkRequest.estimatedSizeInBytes()).thenReturn(ESTIMATED_SIZE_IN_BYTES); - mockFailureResponse(); - mockRetriedResponse(); - - rateLimiter.setRate(19); - - bulkWrapper.bulk(client, bulkRequest, options); - - // First request has 50% failure, not exceeding threshold, increasing rate - // Second and third requests has 100% failure, decreasing rate - assertEquals(rateLimiter.getRate(), 5); - }); - } - private void mockFailureResponse() { when(failureResponse.hasFailures()).thenReturn(true); when(failureResponse.getItems()).thenReturn(new BulkItemResponse[]{successItem, failureItem}); diff --git a/flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/config/FlintSparkConf.scala b/flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/config/FlintSparkConf.scala index 5406d0d54..a6b4a367b 100644 --- a/flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/config/FlintSparkConf.scala +++ b/flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/config/FlintSparkConf.scala @@ -169,14 +169,6 @@ object FlintSparkConf { .doc("[Experimental] Adaptive rate limit decrease ratio for bulk request per worker node, if rate limit enabled. Must be between 0 and 1.") .createWithDefault(FlintOptions.DEFAULT_BULK_REQUEST_RATE_LIMIT_PER_NODE_DECREASE_RATIO) - val BULK_REQUEST_RATE_LIMIT_PER_NODE_PARTIAL_FAILURE_THRESHOLD = - FlintConfig( - s"spark.datasource.flint.${FlintOptions.BULK_REQUEST_RATE_LIMIT_PER_NODE_PARTIAL_FAILURE_THRESHOLD}") - .datasourceOption() - .doc("[Experimental] Partial failure threshold for updating bulk request rate limit per worker node, if rate limit enabled. Must be between 0 and 1.") - .createWithDefault( - FlintOptions.DEFAULT_BULK_REQUEST_RATE_LIMIT_PER_NODE_PARTIAL_FAILURE_THRESHOLD) - val RETRYABLE_HTTP_STATUS_CODES = FlintConfig(s"spark.datasource.flint.${FlintRetryOptions.RETRYABLE_HTTP_STATUS_CODES}") .datasourceOption() @@ -378,7 +370,6 @@ case class FlintSparkConf(properties: JMap[String, String]) extends Serializable BULK_REQUEST_MAX_RATE_LIMIT_PER_NODE, BULK_REQUEST_RATE_LIMIT_PER_NODE_INCREASE_STEP, BULK_REQUEST_RATE_LIMIT_PER_NODE_DECREASE_RATIO, - BULK_REQUEST_RATE_LIMIT_PER_NODE_PARTIAL_FAILURE_THRESHOLD, REGION, CUSTOM_AWS_CREDENTIALS_PROVIDER, SERVICE_NAME, diff --git a/flint-spark-integration/src/test/scala/org/apache/spark/sql/flint/config/FlintSparkConfSuite.scala b/flint-spark-integration/src/test/scala/org/apache/spark/sql/flint/config/FlintSparkConfSuite.scala index 29cbef6bd..9909cd351 100644 --- a/flint-spark-integration/src/test/scala/org/apache/spark/sql/flint/config/FlintSparkConfSuite.scala +++ b/flint-spark-integration/src/test/scala/org/apache/spark/sql/flint/config/FlintSparkConfSuite.scala @@ -71,7 +71,6 @@ class FlintSparkConfSuite extends FlintSuite { options.getBulkRequestMaxRateLimitPerNode shouldBe 50000 options.getBulkRequestRateLimitPerNodeIncreaseStep shouldBe 1000 options.getBulkRequestRateLimitPerNodeDecreaseRatio shouldBe 0.8 - options.getBulkRequestRateLimitPerNodePartialFailureThreshold shouldBe 0 } test("test specified bulk request rate limit options") { @@ -81,8 +80,7 @@ class FlintSparkConfSuite extends FlintSuite { "bulk.rate_limit_per_node.min" -> "20", "bulk.rate_limit_per_node.max" -> "200", "bulk.rate_limit_per_node.increase_step" -> "20", - "bulk.rate_limit_per_node.decrease_ratio" -> "0.5", - "bulk.rate_limit_per_node.partial_failure_threshold" -> "0.5").asJava) + "bulk.rate_limit_per_node.decrease_ratio" -> "0.5").asJava) .flintOptions() options.getBulkRequestRateLimitPerNodeEnabled shouldBe true options.getBulkRequestMinRateLimitPerNode shouldBe 20