diff --git a/docs/index.md b/docs/index.md
index 75a4ab72d..725acadda 100644
--- a/docs/index.md
+++ b/docs/index.md
@@ -536,11 +536,10 @@ In the index mapping, the `_meta` and `properties`field stores meta and schema i
 - `spark.datasource.flint.write.batch_bytes`: The approximately amount of data in bytes written to Flint in a single batch request. The actual data write to OpenSearch may more than it. Default value is 1mb. The writing process checks after each document whether the total number of documents (docCount) has reached batch_size or the buffer size has surpassed batch_bytes. If either condition is met, the current batch is flushed and the document count resets to zero.
 - `spark.datasource.flint.write.refresh_policy`: default value is false. valid values [NONE(false), IMMEDIATE(true), WAIT_UNTIL(wait_for)]
 - `spark.datasource.flint.write.bulk.rate_limit_per_node.enabled`: [Experimental] Enable rate limit for bulk request per worker node. Default is false.
-- `spark.datasource.flint.write.bulk.rate_limit_per_node.min`: [Experimental] Minimum rate limit (documents/sec) for bulk request per worker node, if rate limit enabled. Must be greater than 0. Default is 5000.
-- `spark.datasource.flint.write.bulk.rate_limit_per_node.max`: [Experimental] Maximum rate limit (documents/sec) for bulk request per worker node, if rate limit enabled. Must be greater than minimum rate. Default is 50000.
+- `spark.datasource.flint.write.bulk.rate_limit_per_node.min`: [Experimental] Lower limit (documents/sec) for adaptive rate limiting for bulk request per worker node, if rate limit enabled. The adaptive rate will not drop below this value. Must be greater than 0. Default is 5000.
+- `spark.datasource.flint.write.bulk.rate_limit_per_node.max`: [Experimental] Upper limit (documents/sec) for adaptive rate limiting for bulk request per worker node, if rate limit enabled. The adaptive rate will not exceed this value. Set to -1 for no upper bound. Default is 500000.
 - `spark.datasource.flint.write.bulk.rate_limit_per_node.increase_step`: [Experimental] Adaptive rate limit increase step for bulk request per worker node, if rate limit enabled. Must be greater than 0. Default is 1000.
 - `spark.datasource.flint.write.bulk.rate_limit_per_node.decrease_ratio`: [Experimental] Adaptive rate limit decrease ratio for bulk request per worker node, if rate limit enabled. Must be between 0 and 1. Default is 0.8.
-- `spark.datasource.flint.write.bulk.rate_limit_per_node.partial_failure_threshold`: [Experimental] Partial failure threshold for updating bulk request rate limit per worker node, if rate limit enabled. Must be between 0 and 1. Default is 0.
 - `spark.datasource.flint.read.scroll_size`: default value is 100.
 - `spark.datasource.flint.read.scroll_duration`: default value is 5 minutes. scroll context keep alive duration.
 - `spark.datasource.flint.retry.max_retries`: max retries on failed HTTP request. default value is 3. Use 0 to disable retry.
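The documented knobs can be exercised with a short snippet. A minimal sketch, assuming an existing `SparkSession` named `spark` and that these keys are set as session configuration; the keys and defaults are exactly the ones documented above:

```scala
// Enable adaptive bulk rate limiting (keys as documented in docs/index.md).
spark.conf.set("spark.datasource.flint.write.bulk.rate_limit_per_node.enabled", "true")
spark.conf.set("spark.datasource.flint.write.bulk.rate_limit_per_node.min", "5000")   // floor, documents/sec
spark.conf.set("spark.datasource.flint.write.bulk.rate_limit_per_node.max", "-1")     // -1 = no upper bound
spark.conf.set("spark.datasource.flint.write.bulk.rate_limit_per_node.increase_step", "1000")
spark.conf.set("spark.datasource.flint.write.bulk.rate_limit_per_node.decrease_ratio", "0.8")
```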
diff --git a/flint-core/src/main/java/org/opensearch/flint/core/metrics/MetricConstants.java b/flint-core/src/main/java/org/opensearch/flint/core/metrics/MetricConstants.java
index 0a4688399..3a08047d8 100644
--- a/flint-core/src/main/java/org/opensearch/flint/core/metrics/MetricConstants.java
+++ b/flint-core/src/main/java/org/opensearch/flint/core/metrics/MetricConstants.java
@@ -43,7 +43,7 @@ public final class MetricConstants {
   /**
    * Metric name for opensearch bulk request rate limit
    */
-  public static final String OS_BULK_RATE_LIMIT_METRIC = "opensearch.bulk.rateLimit.count";
+  public static final String OS_BULK_RATE_LIMIT_METRIC = "opensearch.bulk.rateLimit.documentsPerSecond.count";
 
   /**
    * Metric name for counting the errors encountered with Amazon S3 operations.
diff --git a/flint-core/src/main/scala/org/opensearch/flint/core/FlintOptions.java b/flint-core/src/main/scala/org/opensearch/flint/core/FlintOptions.java
index 454e33b05..6ffe1ce12 100644
--- a/flint-core/src/main/scala/org/opensearch/flint/core/FlintOptions.java
+++ b/flint-core/src/main/scala/org/opensearch/flint/core/FlintOptions.java
@@ -116,7 +116,7 @@ public class FlintOptions implements Serializable {
   public static final String BULK_REQUEST_MIN_RATE_LIMIT_PER_NODE = "bulk.rate_limit_per_node.min";
   public static final String DEFAULT_BULK_REQUEST_MIN_RATE_LIMIT_PER_NODE = "5000";
   public static final String BULK_REQUEST_MAX_RATE_LIMIT_PER_NODE = "bulk.rate_limit_per_node.max";
-  public static final String DEFAULT_BULK_REQUEST_MAX_RATE_LIMIT_PER_NODE = "50000";
+  public static final String DEFAULT_BULK_REQUEST_MAX_RATE_LIMIT_PER_NODE = "500000";
   public static final String BULK_REQUEST_RATE_LIMIT_PER_NODE_INCREASE_STEP = "bulk.rate_limit_per_node.increase_step";
   public static final String DEFAULT_BULK_REQUEST_RATE_LIMIT_PER_NODE_INCREASE_STEP = "1000";
   public static final String BULK_REQUEST_RATE_LIMIT_PER_NODE_DECREASE_RATIO = "bulk.rate_limit_per_node.decrease_ratio";
@@ -247,16 +247,16 @@ public boolean getBulkRequestRateLimitPerNodeEnabled() {
     return Boolean.parseBoolean(options.getOrDefault(BULK_REQUEST_RATE_LIMIT_PER_NODE_ENABLED, DEFAULT_BULK_REQUEST_RATE_LIMIT_PER_NODE_ENABLED));
   }
 
-  public double getBulkRequestMinRateLimitPerNode() {
-    return Double.parseDouble(options.getOrDefault(BULK_REQUEST_MIN_RATE_LIMIT_PER_NODE, DEFAULT_BULK_REQUEST_MIN_RATE_LIMIT_PER_NODE));
+  public long getBulkRequestMinRateLimitPerNode() {
+    return Long.parseLong(options.getOrDefault(BULK_REQUEST_MIN_RATE_LIMIT_PER_NODE, DEFAULT_BULK_REQUEST_MIN_RATE_LIMIT_PER_NODE));
   }
 
-  public double getBulkRequestMaxRateLimitPerNode() {
-    return Double.parseDouble(options.getOrDefault(BULK_REQUEST_MAX_RATE_LIMIT_PER_NODE, DEFAULT_BULK_REQUEST_MAX_RATE_LIMIT_PER_NODE));
+  public long getBulkRequestMaxRateLimitPerNode() {
+    return Long.parseLong(options.getOrDefault(BULK_REQUEST_MAX_RATE_LIMIT_PER_NODE, DEFAULT_BULK_REQUEST_MAX_RATE_LIMIT_PER_NODE));
   }
 
-  public double getBulkRequestRateLimitPerNodeIncreaseStep() {
-    return Double.parseDouble(options.getOrDefault(BULK_REQUEST_RATE_LIMIT_PER_NODE_INCREASE_STEP, DEFAULT_BULK_REQUEST_RATE_LIMIT_PER_NODE_INCREASE_STEP));
+  public long getBulkRequestRateLimitPerNodeIncreaseStep() {
+    return Long.parseLong(options.getOrDefault(BULK_REQUEST_RATE_LIMIT_PER_NODE_INCREASE_STEP, DEFAULT_BULK_REQUEST_RATE_LIMIT_PER_NODE_INCREASE_STEP));
   }
 
   public double getBulkRequestRateLimitPerNodeDecreaseRatio() {
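The getters above now return whole documents/sec as `long` rather than `double`. A hedged sketch of the new behavior; the `Map`-based constructor is assumed from the `options.getOrDefault` calls in the diff, not shown in this change:

```scala
import org.opensearch.flint.core.FlintOptions

// Assumed construction: FlintOptions over a java.util.Map of option strings.
val raw = new java.util.HashMap[String, String]()
raw.put("bulk.rate_limit_per_node.max", "-1")
val opts = new FlintOptions(raw)

opts.getBulkRequestMaxRateLimitPerNode // -1L: sentinel meaning "no upper bound"
opts.getBulkRequestMinRateLimitPerNode // 5000L: unset, so the default applies
```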
diff --git a/flint-core/src/main/scala/org/opensearch/flint/core/storage/BulkRequestRateLimiter.java b/flint-core/src/main/scala/org/opensearch/flint/core/storage/BulkRequestRateLimiter.java
index 499b74fe8..1cef1baf1 100644
--- a/flint-core/src/main/scala/org/opensearch/flint/core/storage/BulkRequestRateLimiter.java
+++ b/flint-core/src/main/scala/org/opensearch/flint/core/storage/BulkRequestRateLimiter.java
@@ -15,9 +15,9 @@ public class BulkRequestRateLimiter {
   private static final Logger LOG = Logger.getLogger(BulkRequestRateLimiter.class.getName());
   private RateLimiter rateLimiter;
 
-  private final double minRate;
-  private final double maxRate;
-  private final double increaseStep;
+  private final long minRate;
+  private final long maxRate;
+  private final long increaseStep;
   private final double decreaseRatio;
 
   public BulkRequestRateLimiter(FlintOptions flintOptions) {
@@ -29,7 +29,7 @@ public BulkRequestRateLimiter(FlintOptions flintOptions) {
     if (flintOptions.getBulkRequestRateLimitPerNodeEnabled()) {
       LOG.info("Setting rate limit for bulk request to " + minRate + " documents/sec");
       this.rateLimiter = RateLimiter.create(minRate);
-      MetricsUtil.addHistoricGauge(MetricConstants.OS_BULK_RATE_LIMIT_METRIC, (long) minRate);
+      MetricsUtil.addHistoricGauge(MetricConstants.OS_BULK_RATE_LIMIT_METRIC, minRate);
     } else {
       LOG.info("Rate limit for bulk request was not set.");
     }
@@ -64,27 +64,29 @@ public void increaseRate() {
    */
   public void decreaseRate() {
     if (rateLimiter != null) {
-      setRate(getRate() * decreaseRatio);
+      setRate((long) (getRate() * decreaseRatio));
     }
   }
 
   /**
-   * Rate getter and setter are public for test purpose only
+   * Rate getter and setter are public for testing purpose
    */
-  public double getRate() {
+  public long getRate() {
     if (rateLimiter != null) {
-      return this.rateLimiter.getRate();
+      return (long) this.rateLimiter.getRate();
     }
     return 0;
   }
 
-  public void setRate(double permitsPerSecond) {
+  public void setRate(long permitsPerSecond) {
     if (rateLimiter != null) {
-      permitsPerSecond = Math.max(minRate, Math.min(maxRate, permitsPerSecond));
+      if (maxRate > 0) {
+        permitsPerSecond = Math.min(permitsPerSecond, maxRate);
+      }
+      permitsPerSecond = Math.max(minRate, permitsPerSecond);
       LOG.info("Setting rate limit for bulk request to " + permitsPerSecond + " documents/sec");
       this.rateLimiter.setRate(permitsPerSecond);
-      // TODO: now it's using long metric
-      MetricsUtil.addHistoricGauge(MetricConstants.OS_BULK_RATE_LIMIT_METRIC, (long) permitsPerSecond);
+      MetricsUtil.addHistoricGauge(MetricConstants.OS_BULK_RATE_LIMIT_METRIC, permitsPerSecond);
     }
   }
 }
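The reordered clamping in `setRate` is what makes the `-1` sentinel work: the upper cap is applied only when `maxRate > 0`, and the floor always wins last. A standalone restatement of the same arithmetic (`clamp` is a hypothetical helper, not part of `BulkRequestRateLimiter`):

```scala
// Mirrors the patched setRate: cap at maxRate only when it is positive,
// then floor at minRate.
def clamp(requested: Long, minRate: Long, maxRate: Long): Long = {
  val capped = if (maxRate > 0) math.min(requested, maxRate) else requested
  math.max(minRate, capped)
}

clamp(600000L, 5000L, 500000L) // 500000: capped at maxRate
clamp(600000L, 5000L, -1L)     // 600000: -1 disables the cap
clamp(100L, 5000L, -1L)        // 5000: never drops below the floor
```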
diff --git a/flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/config/FlintSparkConf.scala b/flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/config/FlintSparkConf.scala
index a6b4a367b..faba2135f 100644
--- a/flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/config/FlintSparkConf.scala
+++ b/flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/config/FlintSparkConf.scala
@@ -146,13 +146,17 @@ object FlintSparkConf {
   val BULK_REQUEST_MIN_RATE_LIMIT_PER_NODE =
     FlintConfig(s"spark.datasource.flint.${FlintOptions.BULK_REQUEST_MIN_RATE_LIMIT_PER_NODE}")
       .datasourceOption()
-      .doc("[Experimental] Minimum rate limit (documents/sec) for bulk request per worker node, if rate limit enabled. Must be greater than 0.")
+      .doc(
+        "[Experimental] Lower limit (documents/sec) for adaptive rate limiting for bulk request per worker node, if rate limit enabled. " +
+          "The adaptive rate will not drop below this value. Must be greater than 0.")
       .createWithDefault(FlintOptions.DEFAULT_BULK_REQUEST_MIN_RATE_LIMIT_PER_NODE)
 
   val BULK_REQUEST_MAX_RATE_LIMIT_PER_NODE =
     FlintConfig(s"spark.datasource.flint.${FlintOptions.BULK_REQUEST_MAX_RATE_LIMIT_PER_NODE}")
       .datasourceOption()
-      .doc("[Experimental] Maximum rate limit (documents/sec) for bulk request per worker node, if rate limit enabled. Must be greater than minimum rate.")
+      .doc(
+        "[Experimental] Upper limit (documents/sec) for adaptive rate limiting for bulk request per worker node, if rate limit enabled. " +
+          "The adaptive rate will not exceed this value. Set to -1 for no upper bound.")
       .createWithDefault(FlintOptions.DEFAULT_BULK_REQUEST_MAX_RATE_LIMIT_PER_NODE)
 
   val BULK_REQUEST_RATE_LIMIT_PER_NODE_INCREASE_STEP =
diff --git a/flint-spark-integration/src/test/scala/org/apache/spark/sql/flint/config/FlintSparkConfSuite.scala b/flint-spark-integration/src/test/scala/org/apache/spark/sql/flint/config/FlintSparkConfSuite.scala
index 7ddf12be2..ea9e9257f 100644
--- a/flint-spark-integration/src/test/scala/org/apache/spark/sql/flint/config/FlintSparkConfSuite.scala
+++ b/flint-spark-integration/src/test/scala/org/apache/spark/sql/flint/config/FlintSparkConfSuite.scala
@@ -68,7 +68,7 @@ class FlintSparkConfSuite extends FlintSuite {
       val options = FlintSparkConf().flintOptions()
       options.getBulkRequestRateLimitPerNodeEnabled shouldBe false
       options.getBulkRequestMinRateLimitPerNode shouldBe 5000
-      options.getBulkRequestMaxRateLimitPerNode shouldBe 50000
+      options.getBulkRequestMaxRateLimitPerNode shouldBe 500000
       options.getBulkRequestRateLimitPerNodeIncreaseStep shouldBe 1000
       options.getBulkRequestRateLimitPerNodeDecreaseRatio shouldBe 0.8
     }
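Taken together, these options drive an additive-increase/multiplicative-decrease loop: per its option doc, `increaseRate()` adds `increase_step`, while `decreaseRate()` multiplies by `decrease_ratio`, with both results clamped as shown above. A sketch of a caller; the trigger condition is an assumption, and only the two rate methods come from this change:

```scala
import org.opensearch.flint.core.storage.BulkRequestRateLimiter

// Hypothetical call site: adjust the per-node rate after each bulk response.
def onBulkResponse(limiter: BulkRequestRateLimiter, hasFailures: Boolean): Unit =
  if (hasFailures) limiter.decreaseRate() // rate *= decrease_ratio, floored at min
  else limiter.increaseRate()             // rate += increase_step, capped at max (if set)
```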