Skip to content

Commit

Permalink
feat(tools/perf): adjust send rate during warmup and catchup (#1394)
Browse files Browse the repository at this point in the history
* feat(tools/perf): adjust send rate during warmup

Signed-off-by: Ning Yu <[email protected]>

* feat(tools/perf): add option "catchupRate"

Signed-off-by: Ning Yu <[email protected]>

---------

Signed-off-by: Ning Yu <[email protected]>
  • Loading branch information
Chillax-0v0 authored Jun 7, 2024
1 parent a0e7e4a commit 8da24d1
Show file tree
Hide file tree
Showing 3 changed files with 69 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,10 @@ private void run() {

if (config.warmupDurationMinutes > 0) {
LOGGER.info("Warming up for {} minutes...", config.warmupDurationMinutes);
long warmupStart = System.nanoTime();
long warmupEnd = warmupStart + TimeUnit.MINUTES.toNanos(config.warmupDurationMinutes);
producerService.adjustRate(warmupStart, ProducerService.MIN_RATE);
producerService.adjustRate(warmupEnd, config.sendRate);
collectStats(Duration.ofMinutes(config.warmupDurationMinutes));
}

Expand All @@ -113,6 +117,7 @@ private void run() {
consumerService.resume();

stats.reset();
producerService.adjustRate(config.catchupRate);
result = collectStats(backlogEnd);
} else {
LOGGER.info("Running test for {} minutes...", config.testDurationMinutes);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ public class PerfConfig {
public final double randomRatio;
public final int randomPoolSize;
public final int sendRate;
public final int catchupRate;
public final int backlogDurationSeconds;
public final int groupStartDelaySeconds;
public final int warmupDurationMinutes;
Expand Down Expand Up @@ -81,6 +82,7 @@ public PerfConfig(String[] args) {
randomRatio = ns.getDouble("randomRatio");
randomPoolSize = ns.getInt("randomPoolSize");
sendRate = ns.getInt("sendRate");
catchupRate = ns.getInt("catchupRate") == null ? sendRate : ns.getInt("catchupRate");
backlogDurationSeconds = ns.getInt("backlogDurationSeconds");
groupStartDelaySeconds = ns.getInt("groupStartDelaySeconds");
warmupDurationMinutes = ns.getInt("warmupDurationMinutes");
Expand Down Expand Up @@ -187,6 +189,11 @@ public static ArgumentParser parser() {
.dest("sendRate")
.metavar("SEND_RATE")
.help("The send rate in messages per second.");
parser.addArgument("-a", "--catchup-rate")
.type(positiveInteger())
.dest("catchupRate")
.metavar("CATCHUP_RATE")
.help("The catchup rate in messages per second. If not set, the send rate will be used.");
parser.addArgument("-b", "--backlog-duration")
.setDefault(0)
.type(nonNegativeInteger())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,14 @@
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.Properties;
import java.util.Random;
import java.util.TreeMap;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
Expand All @@ -45,12 +48,23 @@ public class ProducerService implements AutoCloseable {

public static final Charset HEADER_KEY_CHARSET = StandardCharsets.UTF_8;
public static final String HEADER_KEY_SEND_TIME_NANOS = "send_time_nanos";
public static final double MIN_RATE = 1.0;
private static final int ADJUST_RATE_INTERVAL_SECONDS = 5;
private static final Logger LOGGER = LoggerFactory.getLogger(ProducerService.class);

private final List<Producer> producers = new LinkedList<>();
private final ScheduledExecutorService adjustRateExecutor = Executors.newSingleThreadScheduledExecutor(ThreadUtils.createThreadFactory("perf-producer-rate-adjust", true));
private final ExecutorService executor = Executors.newCachedThreadPool(ThreadUtils.createThreadFactory("perf-producer", false));

private UniformRateLimiter rateLimiter = new UniformRateLimiter(1.0);
/**
* A map of rate changes over time. The rate at the given time will be calculated by linear interpolation between
* the two nearest known rates.
* If there is no known rate at the given time, the {@link #defaultRate} will be used.
*/
private final NavigableMap<Long, Double> rateMap = new TreeMap<>();
private double defaultRate = MIN_RATE;
private UniformRateLimiter rateLimiter = new UniformRateLimiter(defaultRate);

private volatile boolean closed = false;

/**
Expand Down Expand Up @@ -106,6 +120,7 @@ public int probe() {
*/
public void start(List<byte[]> payloads, double rate) {
adjustRate(rate);
adjustRateExecutor.scheduleWithFixedDelay(this::adjustRate, 0, ADJUST_RATE_INTERVAL_SECONDS, TimeUnit.SECONDS);
int processors = Runtime.getRuntime().availableProcessors();
// shard producers across processors
int batchSize = Math.max(1, producers.size() / processors);
Expand All @@ -116,10 +131,49 @@ public void start(List<byte[]> payloads, double rate) {
}
}

public void adjustRate(double rate) {
/**
* Adjust the rate at the given time.
*/
public void adjustRate(long timeNanos, double rate) {
rateMap.put(timeNanos, rate);
adjustRate();
}

/**
* Adjust the default rate.
*/
public void adjustRate(double defaultRate) {
this.defaultRate = defaultRate;
adjustRate();
}

private void adjustRate() {
double rate = calculateRate(System.nanoTime());
this.rateLimiter = new UniformRateLimiter(rate);
}

private double calculateRate(long now) {
Map.Entry<Long, Double> floorEntry = rateMap.floorEntry(now);
Map.Entry<Long, Double> ceilingEntry = rateMap.ceilingEntry(now);
if (null == floorEntry || null == ceilingEntry) {
return defaultRate;
}

long floorTime = floorEntry.getKey();
double floorRate = floorEntry.getValue();
long ceilingTime = ceilingEntry.getKey();
double ceilingRate = ceilingEntry.getValue();

return calculateY(floorTime, floorRate, ceilingTime, ceilingRate, now);
}

private double calculateY(long x1, double y1, long x2, double y2, long x) {
if (x1 == x2) {
return y1;
}
return y1 + (x - x1) * (y2 - y1) / (x2 - x1);
}

private void start(List<Producer> producers, List<byte[]> payloads) {
executor.submit(() -> {
try {
Expand Down Expand Up @@ -151,6 +205,7 @@ private void sendMessage(Producer producer, byte[] payload) {
@Override
public void close() {
closed = true;
adjustRateExecutor.shutdownNow();
executor.shutdown();
try {
if (!executor.awaitTermination(2, TimeUnit.SECONDS)) {
Expand Down

0 comments on commit 8da24d1

Please sign in to comment.