diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/queryquota/HelixExternalViewBasedQueryQuotaManager.java b/pinot-broker/src/main/java/org/apache/pinot/broker/queryquota/HelixExternalViewBasedQueryQuotaManager.java index 48c5c33d0a7a..b0d73f176549 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/queryquota/HelixExternalViewBasedQueryQuotaManager.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/queryquota/HelixExternalViewBasedQueryQuotaManager.java @@ -74,6 +74,12 @@ * - broker added or removed from cluster */ public class HelixExternalViewBasedQueryQuotaManager implements ClusterChangeHandler, QueryQuotaManager { + + // Minimum 'working' value for app quota. If actual value is less than this (e.g. 0.0), it is considered as disabled. + private static final double MIN_APP_QUOTA = Math.nextUp(0.0); + // standard value meaning - no app quota limit set + private static final double DISABLED_APP_QUOTA = -1; + private static final Logger LOGGER = LoggerFactory.getLogger(HelixExternalViewBasedQueryQuotaManager.class); private static final int ONE_SECOND_TIME_RANGE_IN_SECOND = 1; private static final int ONE_MINUTE_TIME_RANGE_IN_SECOND = 60; @@ -130,9 +136,10 @@ private void initializeApplicationQpsQuotas() { String appName = entry.getKey(); double appQpsQuota = - entry.getValue() != null && entry.getValue() != -1.0d ? entry.getValue() : _defaultQpsQuotaForApplication; + entry.getValue() != null && entry.getValue() >= MIN_APP_QUOTA ? entry.getValue() + : _defaultQpsQuotaForApplication; - if (appQpsQuota < 0) { + if (appQpsQuota < MIN_APP_QUOTA) { buildEmptyOrResetApplicationRateLimiter(appName); continue; } @@ -348,16 +355,38 @@ private synchronized void createOrUpdateDatabaseRateLimiter(List databas } public synchronized void createOrUpdateApplicationRateLimiter(String applicationName) { - createOrUpdateApplicationRateLimiter(Collections.singletonList(applicationName)); + createOrUpdateApplicationRateLimiter(Collections.singletonList(applicationName), DISABLED_APP_QUOTA); + } + + public synchronized void createOrUpdateApplicationRateLimiter(String applicationName, double override) { + createOrUpdateApplicationRateLimiter(Collections.singletonList(applicationName), override); } // Caller method need not worry about getting lock on _applicationRateLimiterMap // as this method will do idempotent updates to the application rate limiters - private synchronized void createOrUpdateApplicationRateLimiter(List applicationNames) { + private synchronized void createOrUpdateApplicationRateLimiter(List applicationNames, double override) { ExternalView brokerResource = getBrokerResource(); + Map quotas = null; + boolean quotasInitialized = false; + for (String appName : applicationNames) { - double qpsQuota = getEffectiveQueryQuotaOnApplication(appName); - if (qpsQuota < 0) { + double qpsQuota; + if (override >= MIN_APP_QUOTA) { + qpsQuota = override; + } else { + if (!quotasInitialized) { + quotas = ZKMetadataProvider.getApplicationQpsQuotas(_helixManager.getHelixPropertyStore()); + quotasInitialized = true; + } + + if (quotas != null && quotas.get(appName) != null && quotas.get(appName) >= MIN_APP_QUOTA) { + qpsQuota = quotas.get(appName); + } else { + qpsQuota = _defaultQpsQuotaForApplication; + } + } + + if (qpsQuota < MIN_APP_QUOTA) { buildEmptyOrResetApplicationRateLimiter(appName); continue; } @@ -436,22 +465,6 @@ private double getEffectiveQueryQuotaOnDatabase(String databaseName) { return _defaultQpsQuotaForDatabase; } - /** - * Utility to get the effective query quota being imposed on an application. It is computed based on the default quota - * set at cluster config. - * - * @param applicationName application name to get the query quota on. - * @return effective query quota limit being applied - */ - private double getEffectiveQueryQuotaOnApplication(String applicationName) { - Map quotas = - ZKMetadataProvider.getApplicationQpsQuotas(_helixManager.getHelixPropertyStore()); - if (quotas != null && quotas.get(applicationName) != null && quotas.get(applicationName) != -1.0d) { - return quotas.get(applicationName); - } - return _defaultQpsQuotaForApplication; - } - /** * Creates a new database rate limiter. Will not update the database rate limiter if it already exists. * @param databaseName database name for which rate limiter needs to be created @@ -472,7 +485,7 @@ public void createApplicationRateLimiter(String applicationName) { if (_applicationRateLimiterMap.containsKey(applicationName)) { return; } - createOrUpdateApplicationRateLimiter(Collections.singletonList(applicationName)); + createOrUpdateApplicationRateLimiter(applicationName); } /** @@ -579,10 +592,12 @@ public boolean acquireApplication(String applicationName) { } QueryQuotaEntity queryQuota = _applicationRateLimiterMap.get(applicationName); if (queryQuota == null) { - if (getDefaultQueryQuotaForApplication() < 0) { + // do not create a new rate limiter because that could lead to OOM if client floods us with many unique app names + if (_defaultQpsQuotaForApplication < MIN_APP_QUOTA) { return true; } else { - createOrUpdateApplicationRateLimiter(applicationName); + // create limiter without querying ZK + createOrUpdateApplicationRateLimiter(applicationName, _defaultQpsQuotaForApplication); queryQuota = _applicationRateLimiterMap.get(applicationName); } } @@ -809,8 +824,8 @@ public void processQueryRateLimitingExternalViewChange(ExternalView currentBroke if (quota.getNumOnlineBrokers() != onlineBrokerCount) { quota.setNumOnlineBrokers(onlineBrokerCount); } - if (quota.getOverallRate() > 0) { - double qpsQuota = quota.getOverallRate() / onlineBrokerCount; + if (quota.getOverallRate() >= MIN_APP_QUOTA) { + double qpsQuota = Math.max(quota.getOverallRate() / onlineBrokerCount, MIN_APP_QUOTA); quota.setRateLimiter(RateLimiter.create(qpsQuota)); } } @@ -843,7 +858,7 @@ public void processApplicationQueryRateLimitingClusterConfigChange() { if (oldQpsQuota == _defaultQpsQuotaForApplication) { return; } - createOrUpdateApplicationRateLimiter(new ArrayList<>(_applicationRateLimiterMap.keySet())); + createOrUpdateApplicationRateLimiter(new ArrayList<>(_applicationRateLimiterMap.keySet()), DISABLED_APP_QUOTA); } private double getDefaultQueryQuotaForDatabase() { @@ -857,11 +872,16 @@ private double getDefaultQueryQuotaForDatabase() { private double getDefaultQueryQuotaForApplication() { HelixAdmin helixAdmin = _helixManager.getClusterManagmentTool(); - HelixConfigScope configScope = new HelixConfigScopeBuilder(HelixConfigScope.ConfigScopeProperty.CLUSTER).forCluster( - _helixManager.getClusterName()).build(); - return Double.parseDouble(helixAdmin.getConfig(configScope, + HelixConfigScope configScope = new HelixConfigScopeBuilder(HelixConfigScope.ConfigScopeProperty.CLUSTER) + .forCluster(_helixManager.getClusterName()).build(); + String value = helixAdmin.getConfig(configScope, Collections.singletonList(CommonConstants.Helix.APPLICATION_MAX_QUERIES_PER_SECOND)) - .getOrDefault(CommonConstants.Helix.APPLICATION_MAX_QUERIES_PER_SECOND, "-1")); + .get(CommonConstants.Helix.APPLICATION_MAX_QUERIES_PER_SECOND); + if (value != null) { + return Double.parseDouble(value); + } else { + return DISABLED_APP_QUOTA; + } } /** diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotApplicationQuotaRestletResource.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotApplicationQuotaRestletResource.java index db050168faf9..dec69d80ffa0 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotApplicationQuotaRestletResource.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotApplicationQuotaRestletResource.java @@ -71,7 +71,7 @@ public class PinotApplicationQuotaRestletResource { PinotHelixResourceManager _pinotHelixResourceManager; /** - * API to get application quota configs. Will return null if application quotas are not defined + * API to get application quota configs. Will return empty map if application quotas are not defined at all. */ @GET @Produces(MediaType.APPLICATION_JSON) @@ -88,7 +88,7 @@ public Map getApplicationQuotas(@Context HttpHeaders httpHeaders } /** - * API to get application quota configs. Will return null if application quotas are not defined + * API to get application quota config. Will return null if application quotas is not defined. */ @GET @Produces(MediaType.APPLICATION_JSON) @@ -104,15 +104,16 @@ public Double getApplicationQuota(@Context HttpHeaders httpHeaders, @PathParam(" HelixConfigScope scope = new HelixConfigScopeBuilder(HelixConfigScope.ConfigScopeProperty.CLUSTER).forCluster( _pinotHelixResourceManager.getHelixClusterName()).build(); + HelixAdmin helixAdmin = _pinotHelixResourceManager.getHelixAdmin(); String defaultQuota = helixAdmin.getConfig(scope, Collections.singletonList(CommonConstants.Helix.APPLICATION_MAX_QUERIES_PER_SECOND)) - .getOrDefault(CommonConstants.Helix.APPLICATION_MAX_QUERIES_PER_SECOND, null); + .get(CommonConstants.Helix.APPLICATION_MAX_QUERIES_PER_SECOND); return defaultQuota != null ? Double.parseDouble(defaultQuota) : null; } /** - * API to update the quota configs for application + * API to update the quota config for application. */ @POST @Produces(MediaType.APPLICATION_JSON) diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/QueryQuotaClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/QueryQuotaClusterIntegrationTest.java index a40cbdf2909c..5e807d55562f 100644 --- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/QueryQuotaClusterIntegrationTest.java +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/QueryQuotaClusterIntegrationTest.java @@ -319,7 +319,7 @@ private void verifyQuotaUpdate(float quotaQps) { } catch (IOException e) { throw new RuntimeException(e); } - }, 5000, "Failed to reflect query quota on rate limiter in 5s."); + }, 10000, "Failed to reflect query quota on rate limiter in 5s."); } catch (AssertionError ae) { throw new AssertionError( ae.getMessage() + " Expected quota:" + quotaQps + " but is: " + _quota + " set on: " + _quotaSource, ae);