diff --git a/tools/src/main/java/org/apache/kafka/tools/automq/PerfCommand.java b/tools/src/main/java/org/apache/kafka/tools/automq/PerfCommand.java index 9e1480e4f2..03059fd30f 100644 --- a/tools/src/main/java/org/apache/kafka/tools/automq/PerfCommand.java +++ b/tools/src/main/java/org/apache/kafka/tools/automq/PerfCommand.java @@ -97,6 +97,10 @@ private void run() { if (config.warmupDurationMinutes > 0) { LOGGER.info("Warming up for {} minutes...", config.warmupDurationMinutes); + long warmupStart = System.nanoTime(); + long warmupEnd = warmupStart + TimeUnit.MINUTES.toNanos(config.warmupDurationMinutes); + producerService.adjustRate(warmupStart, ProducerService.MIN_RATE); + producerService.adjustRate(warmupEnd, config.sendRate); collectStats(Duration.ofMinutes(config.warmupDurationMinutes)); } @@ -113,6 +117,7 @@ private void run() { consumerService.resume(); stats.reset(); + producerService.adjustRate(config.catchupRate); result = collectStats(backlogEnd); } else { LOGGER.info("Running test for {} minutes...", config.testDurationMinutes); diff --git a/tools/src/main/java/org/apache/kafka/tools/automq/perf/PerfConfig.java b/tools/src/main/java/org/apache/kafka/tools/automq/perf/PerfConfig.java index 6f82acdcec..3eeb715d5a 100644 --- a/tools/src/main/java/org/apache/kafka/tools/automq/perf/PerfConfig.java +++ b/tools/src/main/java/org/apache/kafka/tools/automq/perf/PerfConfig.java @@ -46,6 +46,7 @@ public class PerfConfig { public final double randomRatio; public final int randomPoolSize; public final int sendRate; + public final int catchupRate; public final int backlogDurationSeconds; public final int groupStartDelaySeconds; public final int warmupDurationMinutes; @@ -81,6 +82,7 @@ public PerfConfig(String[] args) { randomRatio = ns.getDouble("randomRatio"); randomPoolSize = ns.getInt("randomPoolSize"); sendRate = ns.getInt("sendRate"); + catchupRate = ns.getInt("catchupRate") == null ? sendRate : ns.getInt("catchupRate"); backlogDurationSeconds = ns.getInt("backlogDurationSeconds"); groupStartDelaySeconds = ns.getInt("groupStartDelaySeconds"); warmupDurationMinutes = ns.getInt("warmupDurationMinutes"); @@ -187,6 +189,11 @@ public static ArgumentParser parser() { .dest("sendRate") .metavar("SEND_RATE") .help("The send rate in messages per second."); + parser.addArgument("-a", "--catchup-rate") + .type(positiveInteger()) + .dest("catchupRate") + .metavar("CATCHUP_RATE") + .help("The catchup rate in messages per second. If not set, the send rate will be used."); parser.addArgument("-b", "--backlog-duration") .setDefault(0) .type(nonNegativeInteger()) diff --git a/tools/src/main/java/org/apache/kafka/tools/automq/perf/ProducerService.java b/tools/src/main/java/org/apache/kafka/tools/automq/perf/ProducerService.java index a2d35a6e1d..f8b9beb576 100644 --- a/tools/src/main/java/org/apache/kafka/tools/automq/perf/ProducerService.java +++ b/tools/src/main/java/org/apache/kafka/tools/automq/perf/ProducerService.java @@ -17,11 +17,14 @@ import java.util.LinkedList; import java.util.List; import java.util.Map; +import java.util.NavigableMap; import java.util.Properties; import java.util.Random; +import java.util.TreeMap; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; @@ -45,12 +48,23 @@ public class ProducerService implements AutoCloseable { public static final Charset HEADER_KEY_CHARSET = StandardCharsets.UTF_8; public static final String HEADER_KEY_SEND_TIME_NANOS = "send_time_nanos"; + public static final double MIN_RATE = 1.0; + private static final int ADJUST_RATE_INTERVAL_SECONDS = 5; private static final Logger LOGGER = LoggerFactory.getLogger(ProducerService.class); private final List producers = new LinkedList<>(); + private final ScheduledExecutorService adjustRateExecutor = Executors.newSingleThreadScheduledExecutor(ThreadUtils.createThreadFactory("perf-producer-rate-adjust", true)); private final ExecutorService executor = Executors.newCachedThreadPool(ThreadUtils.createThreadFactory("perf-producer", false)); - private UniformRateLimiter rateLimiter = new UniformRateLimiter(1.0); + /** + * A map of rate changes over time. The rate at the given time will be calculated by linear interpolation between + * the two nearest known rates. + * If there is no known rate at the given time, the {@link #defaultRate} will be used. + */ + private final NavigableMap rateMap = new TreeMap<>(); + private double defaultRate = MIN_RATE; + private UniformRateLimiter rateLimiter = new UniformRateLimiter(defaultRate); + private volatile boolean closed = false; /** @@ -106,6 +120,7 @@ public int probe() { */ public void start(List payloads, double rate) { adjustRate(rate); + adjustRateExecutor.scheduleWithFixedDelay(this::adjustRate, 0, ADJUST_RATE_INTERVAL_SECONDS, TimeUnit.SECONDS); int processors = Runtime.getRuntime().availableProcessors(); // shard producers across processors int batchSize = Math.max(1, producers.size() / processors); @@ -116,10 +131,49 @@ public void start(List payloads, double rate) { } } - public void adjustRate(double rate) { + /** + * Adjust the rate at the given time. + */ + public void adjustRate(long timeNanos, double rate) { + rateMap.put(timeNanos, rate); + adjustRate(); + } + + /** + * Adjust the default rate. + */ + public void adjustRate(double defaultRate) { + this.defaultRate = defaultRate; + adjustRate(); + } + + private void adjustRate() { + double rate = calculateRate(System.nanoTime()); this.rateLimiter = new UniformRateLimiter(rate); } + private double calculateRate(long now) { + Map.Entry floorEntry = rateMap.floorEntry(now); + Map.Entry ceilingEntry = rateMap.ceilingEntry(now); + if (null == floorEntry || null == ceilingEntry) { + return defaultRate; + } + + long floorTime = floorEntry.getKey(); + double floorRate = floorEntry.getValue(); + long ceilingTime = ceilingEntry.getKey(); + double ceilingRate = ceilingEntry.getValue(); + + return calculateY(floorTime, floorRate, ceilingTime, ceilingRate, now); + } + + private double calculateY(long x1, double y1, long x2, double y2, long x) { + if (x1 == x2) { + return y1; + } + return y1 + (x - x1) * (y2 - y1) / (x2 - x1); + } + private void start(List producers, List payloads) { executor.submit(() -> { try { @@ -151,6 +205,7 @@ private void sendMessage(Producer producer, byte[] payload) { @Override public void close() { closed = true; + adjustRateExecutor.shutdownNow(); executor.shutdown(); try { if (!executor.awaitTermination(2, TimeUnit.SECONDS)) {