Skip to content

Commit

Permalink
remove failure threshold
Browse files Browse the repository at this point in the history
Signed-off-by: Sean Kao <[email protected]>
  • Loading branch information
seankao-az committed Jan 9, 2025
1 parent c0865ce commit 07b5173
Show file tree
Hide file tree
Showing 6 changed files with 17 additions and 74 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -121,8 +121,6 @@ public class FlintOptions implements Serializable {
public static final String DEFAULT_BULK_REQUEST_RATE_LIMIT_PER_NODE_INCREASE_STEP = "1000";
public static final String BULK_REQUEST_RATE_LIMIT_PER_NODE_DECREASE_RATIO = "bulk.rate_limit_per_node.decrease_ratio";
public static final String DEFAULT_BULK_REQUEST_RATE_LIMIT_PER_NODE_DECREASE_RATIO = "0.8";
public static final String BULK_REQUEST_RATE_LIMIT_PER_NODE_PARTIAL_FAILURE_THRESHOLD = "bulk.rate_limit_per_node.partial_failure_threshold";
public static final String DEFAULT_BULK_REQUEST_RATE_LIMIT_PER_NODE_PARTIAL_FAILURE_THRESHOLD = "0";

public static final String DEFAULT_EXTERNAL_SCHEDULER_INTERVAL = "5 minutes";

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ public BulkRequestRateLimiter(FlintOptions flintOptions) {
maxRate = flintOptions.getBulkRequestMaxRateLimitPerNode();
increaseStep = flintOptions.getBulkRequestRateLimitPerNodeIncreaseStep();
decreaseRatio = flintOptions.getBulkRequestRateLimitPerNodeDecreaseRatio();
partialFailureThreshold = flintOptions.getBulkRequestRateLimitPerNodePartialFailureThreshold();

if (flintOptions.getBulkRequestRateLimitPerNodeEnabled()) {
LOG.info("Setting rate limit for bulk request to " + minRate + " documents/sec");
Expand All @@ -53,18 +52,20 @@ public void acquirePermit(int permits) {
}

/**
* Notify rate limiter of the failure rate of a bulk request. Additive-increase or multiplicative-decrease
* rate limit based on the failure rate. Does nothing if rate limit is not set.
* @param failureRate failure rate of the bulk request between 0 and 1
* Increase rate limit additively.
*/
public void reportFailure(double failureRate) {
public void increaseRate() {
if (rateLimiter != null) {
LOG.info("Bulk request failure rate " + failureRate + " reported");
if (failureRate > partialFailureThreshold) {
decreaseRate();
} else {
increaseRate();
}
setRate(getRate() + increaseStep);
}
}

/**
* Decrease rate limit multiplicatively.
*/
public void decreaseRate() {
if (rateLimiter != null) {
setRate(getRate() * decreaseRatio);
}
}

Expand All @@ -87,16 +88,4 @@ public void setRate(double permitsPerSecond) {
MetricsUtil.addHistoricGauge(MetricConstants.OS_BULK_RATE_LIMIT_METRIC, (long) permitsPerSecond);
}
}

private void increaseRate() {
if (rateLimiter != null) {
setRate(getRate() + increaseStep);
}
}

private void decreaseRate() {
if (rateLimiter != null) {
setRate(getRate() * decreaseRatio);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -73,15 +73,15 @@ private BulkResponse bulkWithPartialRetry(RestHighLevelClient client, BulkReques

if (!bulkItemRetryableResultPredicate.test(response)) {
MetricsUtil.addHistoricGauge(MetricConstants.OS_BULK_RETRYABLE_RESULT_PERCENTAGE_METRIC, 0);
rateLimiter.reportFailure(0);
rateLimiter.increaseRate();
} else {
rateLimiter.decreaseRate();

BulkRequest retryableRequest = getRetryableRequest(nextRequest.get(), response);
double retryablePercentage = (double) retryableRequest.requests().size() / response.getItems().length;
// TODO: long type metric
MetricsUtil.addHistoricGauge(MetricConstants.OS_BULK_RETRYABLE_RESULT_PERCENTAGE_METRIC, (long) (retryablePercentage * 100));

rateLimiter.reportFailure(retryablePercentage);

if (retryPolicy.getConfig().allowsRetries()) {
nextRequest.set(retryableRequest);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,8 +74,7 @@ class OpenSearchBulkWrapperTest {
FlintOptions.BULK_REQUEST_MIN_RATE_LIMIT_PER_NODE, "2",
FlintOptions.BULK_REQUEST_MAX_RATE_LIMIT_PER_NODE, "20",
FlintOptions.BULK_REQUEST_RATE_LIMIT_PER_NODE_INCREASE_STEP, "1",
FlintOptions.BULK_REQUEST_RATE_LIMIT_PER_NODE_DECREASE_RATIO, "0.5",
FlintOptions.BULK_REQUEST_RATE_LIMIT_PER_NODE_PARTIAL_FAILURE_THRESHOLD, "0.2"));
FlintOptions.BULK_REQUEST_RATE_LIMIT_PER_NODE_DECREASE_RATIO, "0.5"));
FlintOptions optionsWithoutRateLimit = new FlintOptions(Map.of(
FlintOptions.BULK_REQUEST_RATE_LIMIT_PER_NODE_ENABLED, "false"));

Expand Down Expand Up @@ -285,38 +284,6 @@ public void decreaseRateLimitWhenAllCallFail() throws Exception {
});
}

@Test
public void testRateLimitFailureThreshold() throws Exception {
FlintOptions optionsHighFailureThreshold = new FlintOptions(Map.of(
FlintOptions.BULK_REQUEST_RATE_LIMIT_PER_NODE_ENABLED, "true",
FlintOptions.BULK_REQUEST_MIN_RATE_LIMIT_PER_NODE, "2",
FlintOptions.BULK_REQUEST_MAX_RATE_LIMIT_PER_NODE, "20",
FlintOptions.BULK_REQUEST_RATE_LIMIT_PER_NODE_INCREASE_STEP, "1",
FlintOptions.BULK_REQUEST_RATE_LIMIT_PER_NODE_DECREASE_RATIO, "0.5",
FlintOptions.BULK_REQUEST_RATE_LIMIT_PER_NODE_PARTIAL_FAILURE_THRESHOLD, "0.8"));

MetricsTestUtil.withMetricEnv(verifier -> {
BulkRequestRateLimiter rateLimiter = new BulkRequestRateLimiter(optionsHighFailureThreshold);
OpenSearchBulkWrapper bulkWrapper = new OpenSearchBulkWrapper(
retryOptionsWithRetry, rateLimiter);
when(client.bulk(any(), eq(options)))
.thenReturn(failureResponse)
.thenReturn(retriedResponse);
when(bulkRequest.requests()).thenReturn(ImmutableList.of(indexRequest0, indexRequest1));
when(bulkRequest.estimatedSizeInBytes()).thenReturn(ESTIMATED_SIZE_IN_BYTES);
mockFailureResponse();
mockRetriedResponse();

rateLimiter.setRate(19);

bulkWrapper.bulk(client, bulkRequest, options);

// First request has 50% failure, not exceeding threshold, increasing rate
// Second and third requests has 100% failure, decreasing rate
assertEquals(rateLimiter.getRate(), 5);
});
}

private void mockFailureResponse() {
when(failureResponse.hasFailures()).thenReturn(true);
when(failureResponse.getItems()).thenReturn(new BulkItemResponse[]{successItem, failureItem});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -169,14 +169,6 @@ object FlintSparkConf {
.doc("[Experimental] Adaptive rate limit decrease ratio for bulk request per worker node, if rate limit enabled. Must be between 0 and 1.")
.createWithDefault(FlintOptions.DEFAULT_BULK_REQUEST_RATE_LIMIT_PER_NODE_DECREASE_RATIO)

val BULK_REQUEST_RATE_LIMIT_PER_NODE_PARTIAL_FAILURE_THRESHOLD =
FlintConfig(
s"spark.datasource.flint.${FlintOptions.BULK_REQUEST_RATE_LIMIT_PER_NODE_PARTIAL_FAILURE_THRESHOLD}")
.datasourceOption()
.doc("[Experimental] Partial failure threshold for updating bulk request rate limit per worker node, if rate limit enabled. Must be between 0 and 1.")
.createWithDefault(
FlintOptions.DEFAULT_BULK_REQUEST_RATE_LIMIT_PER_NODE_PARTIAL_FAILURE_THRESHOLD)

val RETRYABLE_HTTP_STATUS_CODES =
FlintConfig(s"spark.datasource.flint.${FlintRetryOptions.RETRYABLE_HTTP_STATUS_CODES}")
.datasourceOption()
Expand Down Expand Up @@ -378,7 +370,6 @@ case class FlintSparkConf(properties: JMap[String, String]) extends Serializable
BULK_REQUEST_MAX_RATE_LIMIT_PER_NODE,
BULK_REQUEST_RATE_LIMIT_PER_NODE_INCREASE_STEP,
BULK_REQUEST_RATE_LIMIT_PER_NODE_DECREASE_RATIO,
BULK_REQUEST_RATE_LIMIT_PER_NODE_PARTIAL_FAILURE_THRESHOLD,
REGION,
CUSTOM_AWS_CREDENTIALS_PROVIDER,
SERVICE_NAME,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,6 @@ class FlintSparkConfSuite extends FlintSuite {
options.getBulkRequestMaxRateLimitPerNode shouldBe 50000
options.getBulkRequestRateLimitPerNodeIncreaseStep shouldBe 1000
options.getBulkRequestRateLimitPerNodeDecreaseRatio shouldBe 0.8
options.getBulkRequestRateLimitPerNodePartialFailureThreshold shouldBe 0
}

test("test specified bulk request rate limit options") {
Expand All @@ -81,8 +80,7 @@ class FlintSparkConfSuite extends FlintSuite {
"bulk.rate_limit_per_node.min" -> "20",
"bulk.rate_limit_per_node.max" -> "200",
"bulk.rate_limit_per_node.increase_step" -> "20",
"bulk.rate_limit_per_node.decrease_ratio" -> "0.5",
"bulk.rate_limit_per_node.partial_failure_threshold" -> "0.5").asJava)
"bulk.rate_limit_per_node.decrease_ratio" -> "0.5").asJava)
.flintOptions()
options.getBulkRequestRateLimitPerNodeEnabled shouldBe true
options.getBulkRequestMinRateLimitPerNode shouldBe 20
Expand Down

0 comments on commit 07b5173

Please sign in to comment.