Merge pull request #31896 from vespa-engine/revert-31867-revert-31866-mpolden/reduce-growth-discrepancy

Reapply "Make query rate growth measurement less sensitive to sudden bursts"
bjormel authored Jul 8, 2024
2 parents 3492da8 + 283cbf4 commit 5e17f23
Showing 15 changed files with 3,559 additions and 99 deletions.
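Apart from a lock-handling cleanup in ConfigServerMaintainer, the diffs rendered below share one pattern: the callers sample the clock once per autoscaling evaluation and pass that Instant into ClusterModel explicitly, so every query-rate and CPU figure within one evaluation is computed against the same timestamp. A minimal, self-contained sketch of that pattern follows; the class and method names are hypothetical stand-ins, not the actual Vespa signatures.

import java.time.Clock;
import java.time.Instant;

class SingleTimestampSketch {
    // Stand-ins for time-series lookups: both take the evaluation timestamp
    // as a parameter, so two readings in one evaluation cannot drift apart.
    static double averageQueryRate(Instant at) { return 100.0; }
    static double maxQueryGrowthRate(Instant at) { return 0.002; }

    public static void main(String[] args) {
        Instant now = Clock.systemUTC().instant();   // sampled exactly once
        double rate   = averageQueryRate(now);
        double growth = maxQueryGrowthRate(now);     // same window as 'rate'
        System.out.println(rate + " " + growth);
    }
}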
@@ -31,15 +31,15 @@ public abstract class ConfigServerMaintainer extends Maintainer {

/** Creates a maintainer where maintainers on different nodes in this cluster run with even delay. */
ConfigServerMaintainer(ApplicationRepository applicationRepository, Curator curator, FlagSource flagSource,
- Clock clock, Duration interval, boolean useLock) {
- this(applicationRepository, curator, flagSource, clock, interval, useLock, false);
+ Clock clock, Duration interval, boolean acquireLock) {
+ this(applicationRepository, curator, flagSource, clock, interval, acquireLock, false);
}

/** Creates a maintainer where maintainers on different nodes in this cluster run with even delay. */
ConfigServerMaintainer(ApplicationRepository applicationRepository, Curator curator, FlagSource flagSource,
- Clock clock, Duration interval, boolean useLock, boolean ignoreCollision) {
- super(null, interval, clock, new JobControl(new JobControlFlags(curator, flagSource, useLock)),
- new ConfigServerJobMetrics(applicationRepository.metric()), cluster(curator), ignoreCollision);
+ Clock clock, Duration interval, boolean acquireLock, boolean ignoreCollision) {
+ super(null, interval, clock, new JobControl(new JobControlFlags(curator, flagSource)),
+ new ConfigServerJobMetrics(applicationRepository.metric()), cluster(curator), ignoreCollision, 1.0, acquireLock);
this.applicationRepository = applicationRepository;
}

@@ -69,12 +69,10 @@ private static class JobControlFlags implements JobControlState {

private final Curator curator;
private final ListFlag<String> inactiveJobsFlag;
- private final boolean useLock;

- public JobControlFlags(Curator curator, FlagSource flagSource, boolean useLock) {
+ public JobControlFlags(Curator curator, FlagSource flagSource) {
this.curator = curator;
this.inactiveJobsFlag = PermanentFlags.INACTIVE_MAINTENANCE_JOBS.bindTo(flagSource);
- this.useLock = useLock;
}

@Override
@@ -84,9 +82,7 @@ public Set<String> readInactiveJobs() {

@Override
public Mutex lockMaintenanceJob(String job) {
- return (useLock)
- ? curator.lock(lockRoot.append(job), Duration.ofSeconds(1))
- : () -> { };
+ return curator.lock(lockRoot.append(job), Duration.ofSeconds(1));
}

}
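In the ConfigServerMaintainer diff above, the useLock toggle disappears from JobControlFlags: lockMaintenanceJob now always takes the curator lock, and the boolean is forwarded to the Maintainer superclass as acquireLock instead. Previously a disabled lock degenerated into a no-op Mutex, roughly as in this stand-alone sketch (the Mutex interface here is a stand-in for com.yahoo.transaction.Mutex):

import java.time.Duration;

class LockToggleSketch {
    // Single-method interface, so a lambda can act as a no-op lock.
    interface Mutex extends AutoCloseable { @Override void close(); }

    static Mutex lockMaintenanceJob(boolean useLock) {
        return useLock ? acquireRealLock(Duration.ofSeconds(1))  // old behaviour when enabled
                       : () -> { };                              // old behaviour when disabled: nothing is locked
    }

    static Mutex acquireRealLock(Duration timeout) {
        return () -> System.out.println("lock released");        // placeholder for curator.lock(...)
    }
}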
@@ -7,6 +7,7 @@
import com.yahoo.config.provision.NodeResources;
import com.yahoo.vespa.hosted.provision.NodeRepository;

+ import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
@@ -119,7 +120,8 @@ private NodeResources nodeResourcesWith(int nodes,
Limits limits,
Load loadAdjustment,
ClusterModel model) {
- var loadWithTarget = model.loadAdjustmentWith(nodes, groups, loadAdjustment);
+ Instant now = nodeRepository.clock().instant();
+ var loadWithTarget = model.loadAdjustmentWith(nodes, groups, loadAdjustment, now);

// Leave some headroom above the ideal allocation to avoid immediately needing to scale back up
if (loadAdjustment.cpu() < 1 && (1.0 - loadWithTarget.cpu()) < headroomRequiredToScaleDown)
@@ -129,7 +131,7 @@ private NodeResources nodeResourcesWith(int nodes,
if (loadAdjustment.disk() < 1 && (1.0 - loadWithTarget.disk()) < headroomRequiredToScaleDown)
loadAdjustment = loadAdjustment.withDisk(Math.min(1.0, loadAdjustment.disk() * (1.0 + headroomRequiredToScaleDown)));

- loadWithTarget = model.loadAdjustmentWith(nodes, groups, loadAdjustment);
+ loadWithTarget = model.loadAdjustmentWith(nodes, groups, loadAdjustment, now);

var scaled = loadWithTarget.scaled(model.current().realResources().nodeResources());
var nonScaled = limits.isEmpty() || limits.min().nodeResources().isUnspecified()
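In AllocationOptimizer (above), nodeResourcesWith now pins Instant now = nodeRepository.clock().instant() before the first loadAdjustmentWith call and reuses it for the second call after the headroom adjustment, so both adjustments are computed against identical measurements rather than two slightly different clock readings.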
@@ -79,7 +79,7 @@ public Autoscaling autoscale(Application application, Cluster cluster, NodeList
var loadAdjustment = model.loadAdjustment();
if (logDetails) {
log.info("Application: " + application.id().toShortString() + ", loadAdjustment: " +
loadAdjustment.toString() + ", ideal " + model.idealLoad() + ", " + model.cpu());
loadAdjustment.toString() + ", ideal " + model.idealLoad() + ", " + model.cpu(nodeRepository.clock().instant()));
}

var target = allocationOptimizer.findBestAllocation(loadAdjustment, model, limits, logDetails);
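The ClusterModel diff below carries most of the commit: the shared CpuModel field is removed, cpu(Instant now) constructs a fresh CpuModel snapshot that remembers its evaluation timestamp (at()), and the cached query-rate helpers (averageQueryRate, maxQueryGrowthRate, queryFractionOfMax) take that timestamp as a parameter instead of consulting the clock themselves.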
@@ -13,6 +13,7 @@
import java.time.Clock;
import java.time.Duration;
import java.time.Instant;
+ import java.util.Objects;
import java.util.OptionalDouble;

/**
@@ -54,7 +55,6 @@ public class ClusterModel {
private final Cluster cluster;
private final AllocatableResources current;

- private final CpuModel cpu = new CpuModel();
private final MemoryModel memory = new MemoryModel();
private final DiskModel disk = new DiskModel();

@@ -180,8 +180,8 @@ public Load peakLoad() {
}

/** Returns the relative load adjustment accounting for redundancy in this. */
- public Load redundancyAdjustment() {
- return loadWith(nodeCount(), groupCount());
+ private Load redundancyAdjustment(Instant now) {
+ return loadWith(nodeCount(), groupCount(), now);
}

public boolean isExclusive() {
@@ -214,17 +214,17 @@ public static Duration minScalingDuration() {
}

/** Transforms the given load adjustment to an equivalent adjustment given a target number of nodes and groups. */
- public Load loadAdjustmentWith(int nodes, int groups, Load loadAdjustment) {
+ public Load loadAdjustmentWith(int nodes, int groups, Load loadAdjustment, Instant now) {
return loadAdjustment // redundancy adjusted target relative to current load
- .multiply(loadWith(nodes, groups)) // redundancy aware adjustment with these counts
- .divide(redundancyAdjustment()); // correct for double redundancy adjustment
+ .multiply(loadWith(nodes, groups, now)) // redundancy aware adjustment with these counts
+ .divide(redundancyAdjustment(now)); // correct for double redundancy adjustment
}

/**
* Returns the relative load adjustment accounting for redundancy given these nodes+groups
* relative to node nodes+groups in this.
*/
- public Load loadWith(int givenNodes, int givenGroups) {
+ Load loadWith(int givenNodes, int givenGroups, Instant now) {
int nodes = nodesAdjustedForRedundancy(givenNodes, givenGroups);
int groups = groupsAdjustedForRedundancy(givenNodes, givenGroups);
if (clusterSpec().type() == ClusterSpec.Type.content) { // load scales with node share of content
@@ -238,6 +238,7 @@ public Load loadWith(int givenNodes, int givenGroups) {

double queryCpu = queryCpuPerGroup * groupCount() / groups;
double writeCpu = (double)groupSize() / groupSize;
+ CpuModel cpu = cpu(now);
return new Load(cpu.queryFraction() * queryCpu + (1 - cpu.queryFraction()) * writeCpu,
(1 - memory.fixedFraction()) * (double) groupSize() / groupSize + memory.fixedFraction() * 1,
(double)groupSize() / groupSize,
@@ -254,39 +255,41 @@ public Load loadWith(int givenNodes, int givenGroups) {
* if one of the nodes go down.
*/
public Load idealLoad() {
- var ideal = new Load(cpu.idealLoad(), memory.idealLoad(), disk.idealLoad(), cpu.idealLoad(), memory.idealLoad()).divide(redundancyAdjustment());
+ CpuModel cpu = cpu(clock.instant());
+ var ideal = new Load(cpu.idealLoad(), memory.idealLoad(), disk.idealLoad(), cpu.idealLoad(), memory.idealLoad()).divide(redundancyAdjustment(cpu.at()));
if ( !cluster.bcpGroupInfo().isEmpty() && cluster.bcpGroupInfo().queryRate() > 0) {
// Since we have little local information, use information about query cost in other groups
- Load bcpGroupIdeal = adjustQueryDependentIdealLoadByBcpGroupInfo(ideal);
+ Load bcpGroupIdeal = adjustQueryDependentIdealLoadByBcpGroupInfo(ideal, cpu);

// Do a weighted sum of the ideal "vote" based on local and bcp group info.
// This avoids any discontinuities with a near-zero local query rate.
- double localInformationWeight = Math.min(1, averageQueryRate().orElse(0) /
+ double localInformationWeight = Math.min(1, averageQueryRate(cpu.at()).orElse(0) /
Math.min(queryRateGivingFullConfidence, cluster.bcpGroupInfo().queryRate()));
ideal = ideal.multiply(localInformationWeight).add(bcpGroupIdeal.multiply(1 - localInformationWeight));
}
return ideal;
}

- public CpuModel cpu() {
- return cpu;
+ public CpuModel cpu(Instant now) {
+ return new CpuModel(now);
}

private boolean canRescaleWithinBcpDeadline() {
return scalingDuration().minus(cluster.clusterInfo().bcpDeadline()).isNegative();
}

public Autoscaling.Metrics metrics() {
- return new Autoscaling.Metrics(averageQueryRate().orElse(0),
- growthRateHeadroom(),
- cpu.costPerQuery().orElse(0));
+ Instant now = clock.instant();
+ return new Autoscaling.Metrics(averageQueryRate(now).orElse(0),
+ growthRateHeadroom(now),
+ cpu(now).costPerQuery().orElse(0));
}

- private Load adjustQueryDependentIdealLoadByBcpGroupInfo(Load ideal) {
+ private Load adjustQueryDependentIdealLoadByBcpGroupInfo(Load ideal, CpuModel cpu) {
double currentClusterTotalVcpuPerGroup = nodes.not().retired().first().get().resources().vcpu() * groupSize();
- double targetQueryRateToHandle = ( canRescaleWithinBcpDeadline() ? averageQueryRate().orElse(0)
+ double targetQueryRateToHandle = ( canRescaleWithinBcpDeadline() ? averageQueryRate(cpu.at()).orElse(0)
: cluster.bcpGroupInfo().queryRate() )
- * cluster.bcpGroupInfo().growthRateHeadroom() * trafficShiftHeadroom();
+ * cluster.bcpGroupInfo().growthRateHeadroom() * trafficShiftHeadroom(cpu.at());
double neededTotalVcpuPerGroup = cluster.bcpGroupInfo().cpuCostPerQuery() * targetQueryRateToHandle / groupCount() +
( 1 - cpu.queryFraction()) * cpu.idealLoad() *
(clusterSpec.type().isContainer() ? 1 : groupSize());
@@ -306,21 +309,21 @@ private boolean hasScaledIn(Duration period) {
* Returns the predicted max query growth rate per minute as a fraction of the average traffic
* in the scaling window.
*/
- private double maxQueryGrowthRate() {
+ private double maxQueryGrowthRate(Instant now) {
if (maxQueryGrowthRate != null) return maxQueryGrowthRate;
- return maxQueryGrowthRate = clusterTimeseries().maxQueryGrowthRate(scalingDuration(), clock);
+ return maxQueryGrowthRate = clusterTimeseries().maxQueryGrowthRate(scalingDuration(), now);
}

/** Returns the average query rate in the scaling window as a fraction of the max observed query rate. */
- private double queryFractionOfMax() {
+ private double queryFractionOfMax(Instant now) {
if (queryFractionOfMax != null) return queryFractionOfMax;
- return queryFractionOfMax = clusterTimeseries().queryFractionOfMax(scalingDuration(), clock);
+ return queryFractionOfMax = clusterTimeseries().queryFractionOfMax(scalingDuration(), now);
}

/** Returns the average query rate in the scaling window. */
- private OptionalDouble averageQueryRate() {
+ private OptionalDouble averageQueryRate(Instant now) {
if (averageQueryRate.isPresent()) return averageQueryRate;
- return averageQueryRate = clusterTimeseries().queryRate(scalingDuration(), clock);
+ return averageQueryRate = clusterTimeseries().queryRate(scalingDuration(), now);
}

/** The number of nodes this cluster has, or will have if not deployed yet. */
@@ -352,21 +355,21 @@ private static int groupsAdjustedForRedundancy(int nodes, int groups) {
}

/** Returns the headroom for growth during organic traffic growth as a multiple of current resources. */
- private double growthRateHeadroom() {
+ private double growthRateHeadroom(Instant now) {
if ( ! nodeRepository.zone().environment().isProduction()) return 1;
- double growthRateHeadroom = 1 + maxQueryGrowthRate() * scalingDuration().toMinutes();
+ double growthRateHeadroom = 1 + maxQueryGrowthRate(now) * scalingDuration().toMinutes();
// Cap headroom at 10% above the historical observed peak
- if (queryFractionOfMax() != 0)
- growthRateHeadroom = Math.min(growthRateHeadroom, 1 / queryFractionOfMax() + 0.1);
+ if (queryFractionOfMax(now) != 0)
+ growthRateHeadroom = Math.min(growthRateHeadroom, 1 / queryFractionOfMax(now) + 0.1);

- return adjustByConfidence(growthRateHeadroom);
+ return adjustByConfidence(growthRateHeadroom, now);
}

/**
* Returns the headroom is needed to handle sudden arrival of additional traffic due to another zone going down
* as a multiple of current resources.
*/
- private double trafficShiftHeadroom() {
+ private double trafficShiftHeadroom(Instant now) {
if ( ! nodeRepository.zone().environment().isProduction()) return 1;
if (canRescaleWithinBcpDeadline()) return 1;
double trafficShiftHeadroom;
@@ -376,43 +379,53 @@ else if (application.status().currentReadShare() == 0)
trafficShiftHeadroom = 1/application.status().maxReadShare();
else
trafficShiftHeadroom = application.status().maxReadShare() / application.status().currentReadShare();
- return adjustByConfidence(Math.min(trafficShiftHeadroom, 1/application.status().maxReadShare()));
+ return adjustByConfidence(Math.min(trafficShiftHeadroom, 1/application.status().maxReadShare()), now);
}

/**
* Headroom values are a multiplier of the current query rate.
* Adjust this value closer to 1 if the query rate is too low to derive statistical conclusions
* with high confidence to avoid large adjustments caused by random noise due to low traffic numbers.
*/
- private double adjustByConfidence(double headroom) {
- return ( (headroom -1 ) * Math.min(1, averageQueryRate().orElse(0) / queryRateGivingFullConfidence) ) + 1;
+ private double adjustByConfidence(double headroom, Instant now) {
+ return ( (headroom -1 ) * Math.min(1, averageQueryRate(now).orElse(0) / queryRateGivingFullConfidence) ) + 1;
}

public class CpuModel {

+ private final Instant at;
+
+ public CpuModel(Instant at) {
+ this.at = Objects.requireNonNull(at);
+ }
+
+ Instant at() {
+ return at;
+ }
+
/** Ideal cpu load must take the application traffic fraction into account. */
double idealLoad() {
double queryCpuFraction = queryFraction();
// Assumptions: 1) Write load is not organic so we should not increase to handle potential future growth.
// (TODO: But allow applications to set their target write rate and size for that)
// 2) Write load does not change in BCP scenarios.
- return queryCpuFraction * 1/growthRateHeadroom() * 1/trafficShiftHeadroom() * idealQueryCpuLoad +
+ return queryCpuFraction * 1/growthRateHeadroom(at) * 1 / trafficShiftHeadroom(at) * idealQueryCpuLoad +
(1 - queryCpuFraction) * idealWriteCpuLoad;
}

OptionalDouble costPerQuery() {
- if (averageQueryRate().isEmpty() || averageQueryRate().getAsDouble() == 0.0) return OptionalDouble.empty();
+ if (averageQueryRate(at()).isEmpty() || averageQueryRate(at()).getAsDouble() == 0.0) return OptionalDouble.empty();
// TODO: Query rate should generally be sampled at the time where we see the peak resource usage
int fanOut = clusterSpec.type().isContainer() ? 1 : groupSize();
- return OptionalDouble.of(peakLoad().cpu() * cpu.queryFraction() * fanOut * nodes.not().retired().first().get().resources().vcpu()
- / averageQueryRate().getAsDouble() / groupCount());
+ return OptionalDouble.of(peakLoad().cpu() * queryFraction() * fanOut * nodes.not().retired().first().get().resources().vcpu()
+ / averageQueryRate(at).getAsDouble() / groupCount());
}

/** The estimated fraction of cpu usage which goes to processing queries vs. writes */
double queryFraction() {
- OptionalDouble writeRate = clusterTimeseries().writeRate(scalingDuration(), clock);
- if (averageQueryRate().orElse(0) == 0 && writeRate.orElse(0) == 0) return queryFraction(0.5);
- return queryFraction(averageQueryRate().orElse(0) / (averageQueryRate().orElse(0) + writeRate.orElse(0)));
+ OptionalDouble writeRate = clusterTimeseries().writeRate(scalingDuration(), at);
+ if (averageQueryRate(at).orElse(0) == 0 && writeRate.orElse(0) == 0) return queryFraction(0.5);
+ return queryFraction(averageQueryRate(at).orElse(0) / (averageQueryRate(at).orElse(0) + writeRate.orElse(0)));
}

double queryFraction(double queryRateFraction) {
@@ -423,7 +436,7 @@ OptionalDouble costPerQuery() {

public String toString() {
return "cpu model idealLoad: " + idealLoad() + ", queryFraction: " + queryFraction() +
", growthRateHeadroom: " + growthRateHeadroom() + ", trafficShiftHeadroom: " + trafficShiftHeadroom();
", growthRateHeadroom: " + growthRateHeadroom(at) + ", trafficShiftHeadroom: " + trafficShiftHeadroom(at);
}

}
(Diffs for the remaining changed files are not rendered on this page.)
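As a closing note, the headroom arithmetic visible in the ClusterModel diff can be followed end to end with concrete numbers. The sketch below is a self-contained toy with made-up values; the constants are illustrative, not Vespa's defaults.

class HeadroomSketch {
    public static void main(String[] args) {
        double maxQueryGrowthRate = 0.001;  // max growth per minute, as a fraction of the average rate
        double scalingMinutes     = 30;     // assumed scaling duration
        double queryFractionOfMax = 0.8;    // average rate is 80% of the observed peak
        double averageQueryRate   = 50;     // queries/s in the scaling window
        double fullConfidenceRate = 100;    // rate at which the measurement is fully trusted

        // growthRateHeadroom(): 1 + growth per minute times the scaling duration...
        double headroom = 1 + maxQueryGrowthRate * scalingMinutes;           // 1.03
        // ...capped at 10% above the historical observed peak.
        if (queryFractionOfMax != 0)
            headroom = Math.min(headroom, 1 / queryFractionOfMax + 0.1);     // min(1.03, 1.35) = 1.03
        // adjustByConfidence(): low traffic pulls the headroom back toward 1.
        double adjusted = (headroom - 1) * Math.min(1, averageQueryRate / fullConfidenceRate) + 1;
        System.out.println(adjusted);                                        // ≈ 1.015
    }
}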
