Skip to content

Commit

Permalink
Merge pull request #23738 from vespa-engine/balder/implement-decay-ov…
Browse files Browse the repository at this point in the history
…er-time

- Refactor to allow for different decay method.
  • Loading branch information
baldersheim authored Aug 23, 2022
2 parents b409eb7 + 81575e0 commit 8bba638
Show file tree
Hide file tree
Showing 6 changed files with 207 additions and 62 deletions.
2 changes: 1 addition & 1 deletion config-model/src/main/resources/schema/content.rnc
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ ClusterControllerTuning = element cluster-controller {

DispatchTuning = element dispatch {
element max-hits-per-partition { xsd:nonNegativeInteger }? &
element dispatch-policy { string "round-robin" | string "adaptive" | string "random" | "best-of-random-2" | "latency-amortized-over-requests" }? &
element dispatch-policy { string "round-robin" | string "adaptive" | string "random" | "best-of-random-2" | "latency-amortized-over-requests" | "latency-amortized-over-time"}? &
element min-active-docs-coverage { xsd:double }? &
element top-k-probability { xsd:double }?
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,13 @@ public abstract class CloseableInvoker implements Closeable {

protected abstract void release();

private BiConsumer<Boolean, Duration> teardown = null;
private BiConsumer<Boolean, RequestDuration> teardown = null;
private boolean success = false;
private long startTime = 0;
private RequestDuration duration;

public void teardown(BiConsumer<Boolean, Duration> teardown) {
public void teardown(BiConsumer<Boolean, RequestDuration> teardown) {
this.teardown = teardown;
this.startTime = System.nanoTime();
this.duration = new RequestDuration();
}

protected void setFinalStatus(boolean success) {
Expand All @@ -32,7 +32,7 @@ protected void setFinalStatus(boolean success) {
@Override
public final void close() {
if (teardown != null) {
teardown.accept(success, Duration.ofNanos(System.nanoTime() - startTime));
teardown.accept(success, duration.complete());
teardown = null;
}
release();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,7 @@ private SearchInvoker getInternalInvoker(Query query, VespaBackEndSearcher searc
invoker.get().teardown((success, time) -> loadBalancer.releaseGroup(group, success, time));
return invoker.get();
} else {
loadBalancer.releaseGroup(group, false, Duration.ZERO);
loadBalancer.releaseGroup(group, false, RequestDuration.of(Duration.ZERO));
if (rejected == null) {
rejected = new HashSet<>();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,9 @@ public class LoadBalancer {

private static final long DEFAULT_LATENCY_DECAY_RATE = 1000;
private static final long MIN_LATENCY_DECAY_RATE = 42;
private static final double INITIAL_QUERY_TIME = 0.001;
private static final double MIN_QUERY_TIME = 0.001;
private static final double LATENCY_DECAY_TIME = Duration.ofSeconds(5).toMillis()/1000.0;
private static final Duration INITIAL_QUERY_TIME = Duration.ofMillis(1);
private static final double MIN_QUERY_TIME = Duration.ofMillis(1).toMillis()/1000.0;

private final List<GroupStatus> scoreboard;
private final GroupScheduler scheduler;
Expand All @@ -45,8 +46,8 @@ public LoadBalancer(SearchCluster searchCluster, Policy policy) {
this.scheduler = switch (policy) {
case ROUNDROBIN: yield new RoundRobinScheduler(scoreboard);
case BEST_OF_RANDOM_2: yield new BestOfRandom2(new Random(), scoreboard);
case LATENCY_AMORTIZED_OVER_REQUESTS: yield new AdaptiveScheduler(new Random(), scoreboard);
case LATENCY_AMORTIZED_OVER_TIME: yield new AdaptiveScheduler(new Random(), scoreboard); // TODO Intentionally the same for now
case LATENCY_AMORTIZED_OVER_REQUESTS: yield new AdaptiveScheduler(AdaptiveScheduler.Type.REQUESTS, new Random(), scoreboard);
case LATENCY_AMORTIZED_OVER_TIME: yield new AdaptiveScheduler(AdaptiveScheduler.Type.TIME, new Random(), scoreboard);
};
}

Expand Down Expand Up @@ -80,7 +81,7 @@ public Optional<Group> takeGroup(Set<Integer> rejectedGroups) {
* @param success was the query successful
* @param searchTime query execution time, used for adaptive load balancing
*/
public void releaseGroup(Group group, boolean success, Duration searchTime) {
public void releaseGroup(Group group, boolean success, RequestDuration searchTime) {
synchronized (this) {
for (GroupStatus sched : scoreboard) {
if (sched.group.id() == group.id()) {
Expand All @@ -93,50 +94,51 @@ public void releaseGroup(Group group, boolean success, Duration searchTime) {

static class GroupStatus {

/**
 * Strategy for tracking the average cost of requests served by a group.
 * GroupStatus.release feeds successful requests to decay(), and
 * GroupStatus.weight() returns 1.0 / averageCost().
 */
interface Decayer {
/** Folds one completed request into the tracked average. */
void decay(RequestDuration duration);
/** Returns the current average cost; weight() divides 1.0 by this value. */
double averageCost();
}

static class NoDecay implements Decayer {
public void decay(RequestDuration duration) {}
public double averageCost() { return MIN_QUERY_TIME; }
}

private final Group group;
private int allocations = 0;
private long queries = 0;
private double averageSearchTime = INITIAL_QUERY_TIME;
private Decayer decayer;

GroupStatus(Group group) {
this.group = group;
this.decayer = new NoDecay();
}
void setDecayer(Decayer decayer) {
this.decayer = decayer;
}

void allocate() {
allocations++;
}

void release(boolean success, Duration searchTime) {
double searchSeconds = searchTime.toMillis()/1000.0;
void release(boolean success, RequestDuration searchTime) {
allocations--;
if (allocations < 0) {
log.warning("Double free of query target group detected");
allocations = 0;
}
if (success) {
searchSeconds = Math.max(searchSeconds, MIN_QUERY_TIME);
double decayRate = Math.min(queries + MIN_LATENCY_DECAY_RATE, DEFAULT_LATENCY_DECAY_RATE);
averageSearchTime = (searchSeconds + (decayRate - 1) * averageSearchTime) / decayRate;
queries++;
decayer.decay(searchTime);
}
}

Duration averageSearchTime() {
return Duration.ofNanos((long)(averageSearchTime*1000000000));
}

double averageSearchTimeInverse() {
return 1.0 / averageSearchTime;
double weight() {
return 1.0 / decayer.averageCost();
}

int groupId() {
return group.id();
}

void setQueryStatistics(long queries, Duration averageSearchTime) {
this.queries = queries;
this.averageSearchTime = averageSearchTime.toMillis()/1000.0;
}
}

private interface GroupScheduler {
Expand Down Expand Up @@ -201,13 +203,62 @@ private int nextScoreboardIndex(int current) {
}

static class AdaptiveScheduler implements GroupScheduler {

enum Type {TIME, REQUESTS}
private final Random random;
private final List<GroupStatus> scoreboard;

public AdaptiveScheduler(Random random, List<GroupStatus> scoreboard) {
/** Converts a Duration to seconds as a double (nanoseconds divided by 1e9). */
private static double toDouble(Duration duration) {
return duration.toNanos()/1_000_000_000.0;
}
/** Converts seconds to a Duration; the long cast truncates any fraction of a nanosecond. */
private static Duration fromDouble(double seconds) { return Duration.ofNanos((long)(seconds*1_000_000_000));}

static class DecayByRequests implements GroupStatus.Decayer {
private long queries;
private double averageSearchTime;
DecayByRequests() {
this(0, INITIAL_QUERY_TIME);
}
DecayByRequests(long initialQueries, Duration initialSearchTime) {
queries = initialQueries;
averageSearchTime = toDouble(initialSearchTime);
}
public void decay(RequestDuration duration) {
double searchTime = Math.max(toDouble(duration.duration()), MIN_QUERY_TIME);
double decayRate = Math.min(queries + MIN_LATENCY_DECAY_RATE, DEFAULT_LATENCY_DECAY_RATE);
queries++;
averageSearchTime = (searchTime + (decayRate - 1) * averageSearchTime) / decayRate;
}
public double averageCost() { return averageSearchTime; }
Duration averageSearchTime() { return fromDouble(averageSearchTime);}
}

static class DecayByTime implements GroupStatus.Decayer {
private double averageSearchTime;
private RequestDuration prev;
DecayByTime() {
this(INITIAL_QUERY_TIME, RequestDuration.of(Duration.ZERO));
}
DecayByTime(Duration initialSearchTime, RequestDuration start) {
averageSearchTime = toDouble(initialSearchTime);
prev = start;
}
public void decay(RequestDuration duration) {
double searchTime = Math.max(toDouble(duration.duration()), MIN_QUERY_TIME);
double decayRate = LATENCY_DECAY_TIME;
double sampleWeight = Math.min(decayRate/2, toDouble(duration.difference(prev)));
averageSearchTime = (sampleWeight*searchTime + (decayRate - sampleWeight) * averageSearchTime) / decayRate;
prev = duration;
}
public double averageCost() { return averageSearchTime; }
Duration averageSearchTime() { return fromDouble(averageSearchTime);}
}

/**
 * Creates a scheduler over the given scoreboard and installs a fresh decayer on
 * every group: DecayByRequests for Type.REQUESTS, DecayByTime otherwise.
 */
public AdaptiveScheduler(Type type, Random random, List<GroupStatus> scoreboard) {
    this.random = random;
    this.scoreboard = scoreboard;
    for (GroupStatus status : scoreboard) {
        if (type == Type.REQUESTS) {
            status.setDecayer(new DecayByRequests());
        } else {
            status.setDecayer(new DecayByTime());
        }
    }
}

private Optional<GroupStatus> selectGroup(double needle, boolean requireCoverage, Set<Integer> rejected) {
Expand All @@ -216,7 +267,7 @@ private Optional<GroupStatus> selectGroup(double needle, boolean requireCoverage
for (GroupStatus gs : scoreboard) {
if (rejected == null || !rejected.contains(gs.group.id())) {
if (!requireCoverage || gs.group.hasSufficientCoverage()) {
sum += gs.averageSearchTimeInverse();
sum += gs.weight();
n++;
}
}
Expand All @@ -228,7 +279,7 @@ private Optional<GroupStatus> selectGroup(double needle, boolean requireCoverage
for (GroupStatus gs : scoreboard) {
if (rejected == null || !rejected.contains(gs.group.id())) {
if (!requireCoverage || gs.group.hasSufficientCoverage()) {
accum += gs.averageSearchTimeInverse();
accum += gs.weight();
if (needle < accum / sum) {
return Optional.of(gs);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.search.dispatch;

import java.time.Duration;
import java.time.Instant;

/**
* Contains start and and time. Exposes a duration, and lets you measure the time difference between 2 requests.
* It does use System.nanoTime to get a steady clock.
*
* @author baldersheim
*/
class RequestDuration {
private final long startTime;
private long endTime;
RequestDuration() {
this(System.nanoTime());
}
private RequestDuration(long startTime) {
this.startTime = startTime;
}

RequestDuration complete() {
endTime = System.nanoTime();
return this;
}
private RequestDuration complete(long duration) {
endTime = startTime + duration;
return this;
}
Duration duration() {
return Duration.ofNanos(endTime - startTime);
}
Duration difference(RequestDuration prev) {
return Duration.ofNanos(Math.abs(endTime - prev.endTime));
}
static RequestDuration of(Duration duration) {
return new RequestDuration().complete(duration.toNanos());
}
static RequestDuration of(Instant sinceEpoch, Duration duration) {
return new RequestDuration(sinceEpoch.toEpochMilli()*1_000_000).complete(duration.toNanos());
}
}
Loading

0 comments on commit 8bba638

Please sign in to comment.