
Commit

change rate from double to long
Signed-off-by: Sean Kao <[email protected]>
seankao-az committed Jan 9, 2025
1 parent 40e2818 commit d55bb13
Showing 6 changed files with 31 additions and 26 deletions.
docs/index.md (5 changes: 2 additions & 3 deletions)
```diff
@@ -536,11 +536,10 @@ In the index mapping, the `_meta` and `properties` field stores meta and schema info
 - `spark.datasource.flint.write.batch_bytes`: The approximately amount of data in bytes written to Flint in a single batch request. The actual data write to OpenSearch may more than it. Default value is 1mb. The writing process checks after each document whether the total number of documents (docCount) has reached batch_size or the buffer size has surpassed batch_bytes. If either condition is met, the current batch is flushed and the document count resets to zero.
 - `spark.datasource.flint.write.refresh_policy`: default value is false. valid values [NONE(false), IMMEDIATE(true), WAIT_UNTIL(wait_for)]
 - `spark.datasource.flint.write.bulk.rate_limit_per_node.enabled`: [Experimental] Enable rate limit for bulk request per worker node. Default is false.
-- `spark.datasource.flint.write.bulk.rate_limit_per_node.min`: [Experimental] Minimum rate limit (documents/sec) for bulk request per worker node, if rate limit enabled. Must be greater than 0. Default is 5000.
-- `spark.datasource.flint.write.bulk.rate_limit_per_node.max`: [Experimental] Maximum rate limit (documents/sec) for bulk request per worker node, if rate limit enabled. Must be greater than minimum rate. Default is 50000.
+- `spark.datasource.flint.write.bulk.rate_limit_per_node.min`: [Experimental] Lower limit (documents/sec) for adaptive rate limiting for bulk request per worker node, if rate limit enabled. The adaptive rate will not drop below this value. Must be greater than 0. Default is 5000.
+- `spark.datasource.flint.write.bulk.rate_limit_per_node.max`: [Experimental] Upper limit (documents/sec) for adaptive rate limiting for bulk request per worker node, if rate limit enabled. The adaptive rate will not exceed this value. Set to -1 for no upper bound. Default is 500000.
 - `spark.datasource.flint.write.bulk.rate_limit_per_node.increase_step`: [Experimental] Adaptive rate limit increase step for bulk request per worker node, if rate limit enabled. Must be greater than 0. Default is 1000.
 - `spark.datasource.flint.write.bulk.rate_limit_per_node.decrease_ratio`: [Experimental] Adaptive rate limit decrease ratio for bulk request per worker node, if rate limit enabled. Must be between 0 and 1. Default is 0.8.
-- `spark.datasource.flint.write.bulk.rate_limit_per_node.partial_failure_threshold`: [Experimental] Partial failure threshold for updating bulk request rate limit per worker node, if rate limit enabled. Must be between 0 and 1. Default is 0.
 - `spark.datasource.flint.read.scroll_size`: default value is 100.
 - `spark.datasource.flint.read.scroll_duration`: default value is 5 minutes. scroll context keep alive duration.
 - `spark.datasource.flint.retry.max_retries`: max retries on failed HTTP request. default value is 3. Use 0 to disable retry.
```
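To see how these write options fit together, here is a minimal sketch of configuring the adaptive rate limit on a Spark session. The option keys are the documented ones above; the application name, master, and chosen values are assumed for illustration only.

```java
import org.apache.spark.sql.SparkSession;

public class RateLimitConfigExample {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
            .appName("flint-rate-limit-example") // illustrative name
            .master("local[*]")                  // assumed local run
            // Enable adaptive rate limiting for bulk writes per worker node.
            .config("spark.datasource.flint.write.bulk.rate_limit_per_node.enabled", "true")
            // Keep the adaptive rate within [10000, 200000] documents/sec.
            .config("spark.datasource.flint.write.bulk.rate_limit_per_node.min", "10000")
            .config("spark.datasource.flint.write.bulk.rate_limit_per_node.max", "200000")
            // Raise the rate by 2000 docs/sec on success feedback...
            .config("spark.datasource.flint.write.bulk.rate_limit_per_node.increase_step", "2000")
            // ...and scale it down to 80% on failure feedback.
            .config("spark.datasource.flint.write.bulk.rate_limit_per_node.decrease_ratio", "0.8")
            .getOrCreate();
        spark.stop();
    }
}
```

Note that with this commit `min`, `max`, and `increase_step` are parsed as longs, so they must be whole numbers; `decrease_ratio` remains a double.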
MetricConstants.java
```diff
@@ -43,7 +43,7 @@ public final class MetricConstants {
   /**
    * Metric name for opensearch bulk request rate limit
    */
-  public static final String OS_BULK_RATE_LIMIT_METRIC = "opensearch.bulk.rateLimit.count";
+  public static final String OS_BULK_RATE_LIMIT_METRIC = "opensearch.bulk.rateLimit.documentsPerSecond.count";
 
   /**
    * Metric name for counting the errors encountered with Amazon S3 operations.
```
FlintOptions.java
```diff
@@ -116,7 +116,7 @@ public class FlintOptions implements Serializable {
   public static final String BULK_REQUEST_MIN_RATE_LIMIT_PER_NODE = "bulk.rate_limit_per_node.min";
   public static final String DEFAULT_BULK_REQUEST_MIN_RATE_LIMIT_PER_NODE = "5000";
   public static final String BULK_REQUEST_MAX_RATE_LIMIT_PER_NODE = "bulk.rate_limit_per_node.max";
-  public static final String DEFAULT_BULK_REQUEST_MAX_RATE_LIMIT_PER_NODE = "50000";
+  public static final String DEFAULT_BULK_REQUEST_MAX_RATE_LIMIT_PER_NODE = "500000";
   public static final String BULK_REQUEST_RATE_LIMIT_PER_NODE_INCREASE_STEP = "bulk.rate_limit_per_node.increase_step";
   public static final String DEFAULT_BULK_REQUEST_RATE_LIMIT_PER_NODE_INCREASE_STEP = "1000";
   public static final String BULK_REQUEST_RATE_LIMIT_PER_NODE_DECREASE_RATIO = "bulk.rate_limit_per_node.decrease_ratio";
@@ -247,16 +247,16 @@ public boolean getBulkRequestRateLimitPerNodeEnabled() {
     return Boolean.parseBoolean(options.getOrDefault(BULK_REQUEST_RATE_LIMIT_PER_NODE_ENABLED, DEFAULT_BULK_REQUEST_RATE_LIMIT_PER_NODE_ENABLED));
   }
 
-  public double getBulkRequestMinRateLimitPerNode() {
-    return Double.parseDouble(options.getOrDefault(BULK_REQUEST_MIN_RATE_LIMIT_PER_NODE, DEFAULT_BULK_REQUEST_MIN_RATE_LIMIT_PER_NODE));
+  public long getBulkRequestMinRateLimitPerNode() {
+    return Long.parseLong(options.getOrDefault(BULK_REQUEST_MIN_RATE_LIMIT_PER_NODE, DEFAULT_BULK_REQUEST_MIN_RATE_LIMIT_PER_NODE));
   }
 
-  public double getBulkRequestMaxRateLimitPerNode() {
-    return Double.parseDouble(options.getOrDefault(BULK_REQUEST_MAX_RATE_LIMIT_PER_NODE, DEFAULT_BULK_REQUEST_MAX_RATE_LIMIT_PER_NODE));
+  public long getBulkRequestMaxRateLimitPerNode() {
+    return Long.parseLong(options.getOrDefault(BULK_REQUEST_MAX_RATE_LIMIT_PER_NODE, DEFAULT_BULK_REQUEST_MAX_RATE_LIMIT_PER_NODE));
   }
 
-  public double getBulkRequestRateLimitPerNodeIncreaseStep() {
-    return Double.parseDouble(options.getOrDefault(BULK_REQUEST_RATE_LIMIT_PER_NODE_INCREASE_STEP, DEFAULT_BULK_REQUEST_RATE_LIMIT_PER_NODE_INCREASE_STEP));
+  public long getBulkRequestRateLimitPerNodeIncreaseStep() {
+    return Long.parseLong(options.getOrDefault(BULK_REQUEST_RATE_LIMIT_PER_NODE_INCREASE_STEP, DEFAULT_BULK_REQUEST_RATE_LIMIT_PER_NODE_INCREASE_STEP));
   }
 
   public double getBulkRequestRateLimitPerNodeDecreaseRatio() {
```
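One practical consequence of swapping `Double.parseDouble` for `Long.parseLong`, illustrated by the hedged snippet below: fractional strings that the old double-based options tolerated now fail fast. The snippet is standalone JDK code, not part of the commit.

```java
public class ParseBehaviorExample {
    public static void main(String[] args) {
        // Integral strings such as the defaults above parse either way.
        System.out.println(Double.parseDouble("500000")); // 500000.0
        System.out.println(Long.parseLong("500000"));     // 500000

        // A fractional value was accepted by the old double-based getters...
        System.out.println(Double.parseDouble("50000.5")); // 50000.5

        // ...but the long-based getters now reject it at parse time.
        try {
            Long.parseLong("50000.5");
        } catch (NumberFormatException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```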
BulkRequestRateLimiter.java
```diff
@@ -15,9 +15,9 @@ public class BulkRequestRateLimiter {
   private static final Logger LOG = Logger.getLogger(BulkRequestRateLimiter.class.getName());
   private RateLimiter rateLimiter;
 
-  private final double minRate;
-  private final double maxRate;
-  private final double increaseStep;
+  private final long minRate;
+  private final long maxRate;
+  private final long increaseStep;
   private final double decreaseRatio;
 
   public BulkRequestRateLimiter(FlintOptions flintOptions) {
@@ -29,7 +29,7 @@ public BulkRequestRateLimiter(FlintOptions flintOptions) {
     if (flintOptions.getBulkRequestRateLimitPerNodeEnabled()) {
       LOG.info("Setting rate limit for bulk request to " + minRate + " documents/sec");
       this.rateLimiter = RateLimiter.create(minRate);
-      MetricsUtil.addHistoricGauge(MetricConstants.OS_BULK_RATE_LIMIT_METRIC, (long) minRate);
+      MetricsUtil.addHistoricGauge(MetricConstants.OS_BULK_RATE_LIMIT_METRIC, minRate);
     } else {
       LOG.info("Rate limit for bulk request was not set.");
     }
@@ -64,27 +64,29 @@ public void increaseRate() {
    */
   public void decreaseRate() {
     if (rateLimiter != null) {
-      setRate(getRate() * decreaseRatio);
+      setRate((long) (getRate() * decreaseRatio));
     }
   }
 
   /**
-   * Rate getter and setter are public for test purpose only
+   * Rate getter and setter are public for testing purpose
    */
-  public double getRate() {
+  public long getRate() {
     if (rateLimiter != null) {
-      return this.rateLimiter.getRate();
+      return (long) this.rateLimiter.getRate();
     }
     return 0;
   }
 
-  public void setRate(double permitsPerSecond) {
+  public void setRate(long permitsPerSecond) {
     if (rateLimiter != null) {
-      permitsPerSecond = Math.max(minRate, Math.min(maxRate, permitsPerSecond));
+      if (maxRate > 0) {
+        permitsPerSecond = Math.min(permitsPerSecond, maxRate);
+      }
+      permitsPerSecond = Math.max(minRate, permitsPerSecond);
       LOG.info("Setting rate limit for bulk request to " + permitsPerSecond + " documents/sec");
       this.rateLimiter.setRate(permitsPerSecond);
-      // TODO: now it's using long metric
-      MetricsUtil.addHistoricGauge(MetricConstants.OS_BULK_RATE_LIMIT_METRIC, (long) permitsPerSecond);
+      MetricsUtil.addHistoricGauge(MetricConstants.OS_BULK_RATE_LIMIT_METRIC, permitsPerSecond);
     }
   }
 }
```
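The change above keeps the additive-increase/multiplicative-decrease (AIMD) shape of the limiter but clamps in two steps, so that a non-positive `maxRate` (e.g. -1) means no upper bound. The following self-contained sketch mirrors that logic with stub constants; the class name, constants, and driver are illustrative, not repository code.

```java
// Minimal AIMD sketch mirroring the two-step clamping in setRate() above.
// All names and constants are illustrative, not from the repository.
public class AimdSketch {
    private static final long MIN_RATE = 5000;         // docs/sec floor
    private static final long MAX_RATE = 500000;       // docs/sec ceiling; -1 disables the cap
    private static final long INCREASE_STEP = 1000;    // additive increase
    private static final double DECREASE_RATIO = 0.8;  // multiplicative decrease

    private long rate = MIN_RATE;

    void increaseRate() {
        setRate(rate + INCREASE_STEP);
    }

    void decreaseRate() {
        // Cast matches the commit: the product is truncated back to a long.
        setRate((long) (rate * DECREASE_RATIO));
    }

    void setRate(long permitsPerSecond) {
        // Apply the ceiling only when one is configured (maxRate > 0)...
        if (MAX_RATE > 0) {
            permitsPerSecond = Math.min(permitsPerSecond, MAX_RATE);
        }
        // ...then never let the rate fall below the floor.
        rate = Math.max(MIN_RATE, permitsPerSecond);
    }

    public static void main(String[] args) {
        AimdSketch limiter = new AimdSketch();
        limiter.increaseRate();            // 5000 + 1000 = 6000
        limiter.decreaseRate();            // 6000 * 0.8 = 4800, clamped up to 5000
        System.out.println(limiter.rate);  // prints 5000
    }
}
```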
FlintSparkConf.scala
```diff
@@ -146,13 +146,17 @@ object FlintSparkConf {
   val BULK_REQUEST_MIN_RATE_LIMIT_PER_NODE =
     FlintConfig(s"spark.datasource.flint.${FlintOptions.BULK_REQUEST_MIN_RATE_LIMIT_PER_NODE}")
       .datasourceOption()
-      .doc("[Experimental] Minimum rate limit (documents/sec) for bulk request per worker node, if rate limit enabled. Must be greater than 0.")
+      .doc(
+        "[Experimental] Lower limit (documents/sec) for adaptive rate limiting for bulk request per worker node, if rate limit enabled. " +
+          "The adaptive rate will not drop below this value. Must be greater than 0.")
       .createWithDefault(FlintOptions.DEFAULT_BULK_REQUEST_MIN_RATE_LIMIT_PER_NODE)
 
   val BULK_REQUEST_MAX_RATE_LIMIT_PER_NODE =
     FlintConfig(s"spark.datasource.flint.${FlintOptions.BULK_REQUEST_MAX_RATE_LIMIT_PER_NODE}")
       .datasourceOption()
-      .doc("[Experimental] Maximum rate limit (documents/sec) for bulk request per worker node, if rate limit enabled. Must be greater than minimum rate.")
+      .doc(
+        "[Experimental] Upper limit (documents/sec) for adaptive rate limiting for bulk request per worker node, if rate limit enabled. " +
+          "The adaptive rate will not exceed this value. Set to -1 for no upper bound.")
       .createWithDefault(FlintOptions.DEFAULT_BULK_REQUEST_MAX_RATE_LIMIT_PER_NODE)
 
   val BULK_REQUEST_RATE_LIMIT_PER_NODE_INCREASE_STEP =
```
FlintSparkConfSuite.scala
```diff
@@ -68,7 +68,7 @@ class FlintSparkConfSuite extends FlintSuite {
     val options = FlintSparkConf().flintOptions()
     options.getBulkRequestRateLimitPerNodeEnabled shouldBe false
     options.getBulkRequestMinRateLimitPerNode shouldBe 5000
-    options.getBulkRequestMaxRateLimitPerNode shouldBe 50000
+    options.getBulkRequestMaxRateLimitPerNode shouldBe 500000
    options.getBulkRequestRateLimitPerNodeIncreaseStep shouldBe 1000
    options.getBulkRequestRateLimitPerNodeDecreaseRatio shouldBe 0.8
   }
```
