Skip to content

Commit

Permalink
Fix application qps quota stalls.
Browse files Browse the repository at this point in the history
  • Loading branch information
bziobrowski committed Jan 21, 2025
1 parent 085c995 commit 2bc84d6
Show file tree
Hide file tree
Showing 3 changed files with 58 additions and 37 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,12 @@
* - broker added or removed from cluster
*/
public class HelixExternalViewBasedQueryQuotaManager implements ClusterChangeHandler, QueryQuotaManager {

// Minimum 'working' value for app quota. If actual value is less than this (e.g. 0.0), it is considered as disabled.
private static final double MIN_APP_QUOTA = Math.nextUp(0.0);
// standard value meaning - no app quota limit set
private static final double DISABLED_APP_QUOTA = -1;

private static final Logger LOGGER = LoggerFactory.getLogger(HelixExternalViewBasedQueryQuotaManager.class);
private static final int ONE_SECOND_TIME_RANGE_IN_SECOND = 1;
private static final int ONE_MINUTE_TIME_RANGE_IN_SECOND = 60;
Expand Down Expand Up @@ -130,9 +136,10 @@ private void initializeApplicationQpsQuotas() {

String appName = entry.getKey();
double appQpsQuota =
entry.getValue() != null && entry.getValue() != -1.0d ? entry.getValue() : _defaultQpsQuotaForApplication;
entry.getValue() != null && entry.getValue() >= MIN_APP_QUOTA ? entry.getValue()
: _defaultQpsQuotaForApplication;

if (appQpsQuota < 0) {
if (appQpsQuota < MIN_APP_QUOTA) {
buildEmptyOrResetApplicationRateLimiter(appName);
continue;
}
Expand Down Expand Up @@ -348,16 +355,38 @@ private synchronized void createOrUpdateDatabaseRateLimiter(List<String> databas
}

public synchronized void createOrUpdateApplicationRateLimiter(String applicationName) {
createOrUpdateApplicationRateLimiter(Collections.singletonList(applicationName));
createOrUpdateApplicationRateLimiter(Collections.singletonList(applicationName), DISABLED_APP_QUOTA);
}

public synchronized void createOrUpdateApplicationRateLimiter(String applicationName, double override) {
createOrUpdateApplicationRateLimiter(Collections.singletonList(applicationName), override);
}

// Caller method need not worry about getting lock on _applicationRateLimiterMap
// as this method will do idempotent updates to the application rate limiters
private synchronized void createOrUpdateApplicationRateLimiter(List<String> applicationNames) {
private synchronized void createOrUpdateApplicationRateLimiter(List<String> applicationNames, double override) {
ExternalView brokerResource = getBrokerResource();
Map<String, Double> quotas = null;
boolean quotasInitialized = false;

for (String appName : applicationNames) {
double qpsQuota = getEffectiveQueryQuotaOnApplication(appName);
if (qpsQuota < 0) {
double qpsQuota;
if (override >= MIN_APP_QUOTA) {
qpsQuota = override;
} else {
if (!quotasInitialized) {
quotas = ZKMetadataProvider.getApplicationQpsQuotas(_helixManager.getHelixPropertyStore());
quotasInitialized = true;
}

if (quotas != null && quotas.get(appName) != null && quotas.get(appName) >= MIN_APP_QUOTA) {
qpsQuota = quotas.get(appName);
} else {
qpsQuota = _defaultQpsQuotaForApplication;
}
}

if (qpsQuota < MIN_APP_QUOTA) {
buildEmptyOrResetApplicationRateLimiter(appName);
continue;
}
Expand Down Expand Up @@ -436,22 +465,6 @@ private double getEffectiveQueryQuotaOnDatabase(String databaseName) {
return _defaultQpsQuotaForDatabase;
}

/**
* Utility to get the effective query quota being imposed on an application. It is computed based on the default quota
* set at cluster config.
*
* @param applicationName application name to get the query quota on.
* @return effective query quota limit being applied
*/
private double getEffectiveQueryQuotaOnApplication(String applicationName) {
Map<String, Double> quotas =
ZKMetadataProvider.getApplicationQpsQuotas(_helixManager.getHelixPropertyStore());
if (quotas != null && quotas.get(applicationName) != null && quotas.get(applicationName) != -1.0d) {
return quotas.get(applicationName);
}
return _defaultQpsQuotaForApplication;
}

/**
* Creates a new database rate limiter. Will not update the database rate limiter if it already exists.
* @param databaseName database name for which rate limiter needs to be created
Expand All @@ -472,7 +485,7 @@ public void createApplicationRateLimiter(String applicationName) {
if (_applicationRateLimiterMap.containsKey(applicationName)) {
return;
}
createOrUpdateApplicationRateLimiter(Collections.singletonList(applicationName));
createOrUpdateApplicationRateLimiter(applicationName);
}

/**
Expand Down Expand Up @@ -579,10 +592,12 @@ public boolean acquireApplication(String applicationName) {
}
QueryQuotaEntity queryQuota = _applicationRateLimiterMap.get(applicationName);
if (queryQuota == null) {
if (getDefaultQueryQuotaForApplication() < 0) {
// do not create a new rate limiter because that could lead to OOM if client floods us with many unique app names
if (_defaultQpsQuotaForApplication < MIN_APP_QUOTA) {
return true;
} else {
createOrUpdateApplicationRateLimiter(applicationName);
// create limiter without querying ZK
createOrUpdateApplicationRateLimiter(applicationName, _defaultQpsQuotaForApplication);
queryQuota = _applicationRateLimiterMap.get(applicationName);
}
}
Expand Down Expand Up @@ -809,8 +824,8 @@ public void processQueryRateLimitingExternalViewChange(ExternalView currentBroke
if (quota.getNumOnlineBrokers() != onlineBrokerCount) {
quota.setNumOnlineBrokers(onlineBrokerCount);
}
if (quota.getOverallRate() > 0) {
double qpsQuota = quota.getOverallRate() / onlineBrokerCount;
if (quota.getOverallRate() >= MIN_APP_QUOTA) {
double qpsQuota = Math.max(quota.getOverallRate() / onlineBrokerCount, MIN_APP_QUOTA);
quota.setRateLimiter(RateLimiter.create(qpsQuota));
}
}
Expand Down Expand Up @@ -843,7 +858,7 @@ public void processApplicationQueryRateLimitingClusterConfigChange() {
if (oldQpsQuota == _defaultQpsQuotaForApplication) {
return;
}
createOrUpdateApplicationRateLimiter(new ArrayList<>(_applicationRateLimiterMap.keySet()));
createOrUpdateApplicationRateLimiter(new ArrayList<>(_applicationRateLimiterMap.keySet()), DISABLED_APP_QUOTA);
}

private double getDefaultQueryQuotaForDatabase() {
Expand All @@ -857,11 +872,16 @@ private double getDefaultQueryQuotaForDatabase() {

private double getDefaultQueryQuotaForApplication() {
HelixAdmin helixAdmin = _helixManager.getClusterManagmentTool();
HelixConfigScope configScope = new HelixConfigScopeBuilder(HelixConfigScope.ConfigScopeProperty.CLUSTER).forCluster(
_helixManager.getClusterName()).build();
return Double.parseDouble(helixAdmin.getConfig(configScope,
HelixConfigScope configScope = new HelixConfigScopeBuilder(HelixConfigScope.ConfigScopeProperty.CLUSTER)
.forCluster(_helixManager.getClusterName()).build();
String value = helixAdmin.getConfig(configScope,
Collections.singletonList(CommonConstants.Helix.APPLICATION_MAX_QUERIES_PER_SECOND))
.getOrDefault(CommonConstants.Helix.APPLICATION_MAX_QUERIES_PER_SECOND, "-1"));
.get(CommonConstants.Helix.APPLICATION_MAX_QUERIES_PER_SECOND);
if (value != null) {
return Double.parseDouble(value);
} else {
return DISABLED_APP_QUOTA;
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ public class PinotApplicationQuotaRestletResource {
PinotHelixResourceManager _pinotHelixResourceManager;

/**
* API to get application quota configs. Will return null if application quotas are not defined
* API to get application quota configs. Will return empty map if application quotas are not defined at all.
*/
@GET
@Produces(MediaType.APPLICATION_JSON)
Expand All @@ -88,7 +88,7 @@ public Map<String, Double> getApplicationQuotas(@Context HttpHeaders httpHeaders
}

/**
* API to get application quota configs. Will return null if application quotas are not defined
* API to get application quota config. Will return null if application quotas is not defined.
*/
@GET
@Produces(MediaType.APPLICATION_JSON)
Expand All @@ -104,15 +104,16 @@ public Double getApplicationQuota(@Context HttpHeaders httpHeaders, @PathParam("

HelixConfigScope scope = new HelixConfigScopeBuilder(HelixConfigScope.ConfigScopeProperty.CLUSTER).forCluster(
_pinotHelixResourceManager.getHelixClusterName()).build();

HelixAdmin helixAdmin = _pinotHelixResourceManager.getHelixAdmin();
String defaultQuota =
helixAdmin.getConfig(scope, Collections.singletonList(CommonConstants.Helix.APPLICATION_MAX_QUERIES_PER_SECOND))
.getOrDefault(CommonConstants.Helix.APPLICATION_MAX_QUERIES_PER_SECOND, null);
.get(CommonConstants.Helix.APPLICATION_MAX_QUERIES_PER_SECOND);
return defaultQuota != null ? Double.parseDouble(defaultQuota) : null;
}

/**
* API to update the quota configs for application
* API to update the quota config for application.
*/
@POST
@Produces(MediaType.APPLICATION_JSON)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -319,7 +319,7 @@ private void verifyQuotaUpdate(float quotaQps) {
} catch (IOException e) {
throw new RuntimeException(e);
}
}, 5000, "Failed to reflect query quota on rate limiter in 5s.");
}, 10000, "Failed to reflect query quota on rate limiter in 5s.");
} catch (AssertionError ae) {
throw new AssertionError(
ae.getMessage() + " Expected quota:" + quotaQps + " but is: " + _quota + " set on: " + _quotaSource, ae);
Expand Down

0 comments on commit 2bc84d6

Please sign in to comment.