Skip to content

Commit

Permalink
Fixes #247 Fix Coordinated Omission (#248)
Browse files Browse the repository at this point in the history
  • Loading branch information
franz1981 authored Apr 7, 2022
1 parent 64cc01d commit 64eff25
Show file tree
Hide file tree
Showing 9 changed files with 206 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,15 @@ public class TestResult {
public List<Double> publishLatency9999pct = new ArrayList<>();
public List<Double> publishLatencyMax = new ArrayList<>();

public List<Double> publishDelayLatencyAvg = new ArrayList<>();
public List<Long> publishDelayLatency50pct = new ArrayList<>();
public List<Long> publishDelayLatency75pct = new ArrayList<>();
public List<Long> publishDelayLatency95pct = new ArrayList<>();
public List<Long> publishDelayLatency99pct = new ArrayList<>();
public List<Long> publishDelayLatency999pct = new ArrayList<>();
public List<Long> publishDelayLatency9999pct = new ArrayList<>();
public List<Long> publishDelayLatencyMax = new ArrayList<>();

public double aggregatedPublishLatencyAvg;
public double aggregatedPublishLatency50pct;
public double aggregatedPublishLatency75pct;
Expand All @@ -54,8 +63,19 @@ public class TestResult {
public double aggregatedPublishLatency9999pct;
public double aggregatedPublishLatencyMax;

public double aggregatedPublishDelayLatencyAvg;
public long aggregatedPublishDelayLatency50pct;
public long aggregatedPublishDelayLatency75pct;
public long aggregatedPublishDelayLatency95pct;
public long aggregatedPublishDelayLatency99pct;
public long aggregatedPublishDelayLatency999pct;
public long aggregatedPublishDelayLatency9999pct;
public long aggregatedPublishDelayLatencyMax;

public Map<Double, Double> aggregatedPublishLatencyQuantiles = new TreeMap<>();

public Map<Double, Long> aggregatedPublishDelayLatencyQuantiles = new TreeMap<>();

// End to end latencies (from producer to consumer)
// Latencies are expressed in milliseconds (without decimals)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -416,15 +416,20 @@ private TestResult printAndCollectStats(long testDurations, TimeUnit unit) throw
- stats.totalMessagesReceived;

log.info(
"Pub rate {} msg/s / {} MB/s | Cons rate {} msg/s / {} MB/s | Backlog: {} K | Pub Latency (ms) avg: {} - 50%: {} - 99%: {} - 99.9%: {} - Max: {}",
"Pub rate {} msg/s / {} MB/s | Cons rate {} msg/s / {} MB/s | Backlog: {} K | Pub Latency (ms) avg: {} - 50%: {} - 99%: {} - 99.9%: {} - Max: {} | Pub Delay Latency (us) avg: {} - 50%: {} - 99%: {} - 99.9%: {} - Max: {}",
rateFormat.format(publishRate), throughputFormat.format(publishThroughput),
rateFormat.format(consumeRate), throughputFormat.format(consumeThroughput),
dec.format(currentBacklog / 1000.0), //
dec.format(microsToMillis(stats.publishLatency.getMean())),
dec.format(microsToMillis(stats.publishLatency.getValueAtPercentile(50))),
dec.format(microsToMillis(stats.publishLatency.getValueAtPercentile(99))),
dec.format(microsToMillis(stats.publishLatency.getValueAtPercentile(99.9))),
throughputFormat.format(microsToMillis(stats.publishLatency.getMaxValue())));
throughputFormat.format(microsToMillis(stats.publishLatency.getMaxValue())),
dec.format(stats.publishDelayLatency.getMean()),
dec.format(stats.publishDelayLatency.getValueAtPercentile(50)),
dec.format(stats.publishDelayLatency.getValueAtPercentile(99)),
dec.format(stats.publishDelayLatency.getValueAtPercentile(99.9)),
throughputFormat.format(stats.publishDelayLatency.getMaxValue()));

result.publishRate.add(publishRate);
result.consumeRate.add(consumeRate);
Expand All @@ -438,6 +443,16 @@ private TestResult printAndCollectStats(long testDurations, TimeUnit unit) throw
result.publishLatency9999pct.add(microsToMillis(stats.publishLatency.getValueAtPercentile(99.99)));
result.publishLatencyMax.add(microsToMillis(stats.publishLatency.getMaxValue()));

result.publishDelayLatencyAvg.add(stats.publishDelayLatency.getMean());
result.publishDelayLatency50pct.add(stats.publishDelayLatency.getValueAtPercentile(50));
result.publishDelayLatency75pct.add(stats.publishDelayLatency.getValueAtPercentile(75));
result.publishDelayLatency95pct.add(stats.publishDelayLatency.getValueAtPercentile(95));
result.publishDelayLatency99pct.add(stats.publishDelayLatency.getValueAtPercentile(99));
result.publishDelayLatency999pct.add(stats.publishDelayLatency.getValueAtPercentile(99.9));
result.publishDelayLatency9999pct.add(stats.publishDelayLatency.getValueAtPercentile(99.99));
result.publishDelayLatencyMax.add(stats.publishDelayLatency.getMaxValue());


result.endToEndLatencyAvg.add(microsToMillis(stats.endToEndLatency.getMean()));
result.endToEndLatency50pct.add(microsToMillis(stats.endToEndLatency.getValueAtPercentile(50)));
result.endToEndLatency75pct.add(microsToMillis(stats.endToEndLatency.getValueAtPercentile(75)));
Expand All @@ -450,14 +465,21 @@ private TestResult printAndCollectStats(long testDurations, TimeUnit unit) throw
if (now >= testEndTime && !needToWaitForBacklogDraining) {
CumulativeLatencies agg = worker.getCumulativeLatencies();
log.info(
"----- Aggregated Pub Latency (ms) avg: {} - 50%: {} - 95%: {} - 99%: {} - 99.9%: {} - 99.99%: {} - Max: {}",
"----- Aggregated Pub Latency (ms) avg: {} - 50%: {} - 95%: {} - 99%: {} - 99.9%: {} - 99.99%: {} - Max: {} | Pub Delay (us) avg: {} - 50%: {} - 95%: {} - 99%: {} - 99.9%: {} - 99.99%: {} - Max: {}",
dec.format(agg.publishLatency.getMean() / 1000.0),
dec.format(agg.publishLatency.getValueAtPercentile(50) / 1000.0),
dec.format(agg.publishLatency.getValueAtPercentile(95) / 1000.0),
dec.format(agg.publishLatency.getValueAtPercentile(99) / 1000.0),
dec.format(agg.publishLatency.getValueAtPercentile(99.9) / 1000.0),
dec.format(agg.publishLatency.getValueAtPercentile(99.99) / 1000.0),
throughputFormat.format(agg.publishLatency.getMaxValue() / 1000.0));
throughputFormat.format(agg.publishLatency.getMaxValue() / 1000.0),
dec.format(agg.publishDelayLatency.getMean()),
dec.format(agg.publishDelayLatency.getValueAtPercentile(50)),
dec.format(agg.publishDelayLatency.getValueAtPercentile(95)),
dec.format(agg.publishDelayLatency.getValueAtPercentile(99)),
dec.format(agg.publishDelayLatency.getValueAtPercentile(99.9)),
dec.format(agg.publishDelayLatency.getValueAtPercentile(99.99)),
throughputFormat.format(agg.publishDelayLatency.getMaxValue()));

result.aggregatedPublishLatencyAvg = agg.publishLatency.getMean() / 1000.0;
result.aggregatedPublishLatency50pct = agg.publishLatency.getValueAtPercentile(50) / 1000.0;
Expand All @@ -468,6 +490,15 @@ private TestResult printAndCollectStats(long testDurations, TimeUnit unit) throw
result.aggregatedPublishLatency9999pct = agg.publishLatency.getValueAtPercentile(99.99) / 1000.0;
result.aggregatedPublishLatencyMax = agg.publishLatency.getMaxValue() / 1000.0;

result.aggregatedPublishDelayLatencyAvg = agg.publishDelayLatency.getMean();
result.aggregatedPublishDelayLatency50pct = agg.publishDelayLatency.getValueAtPercentile(50);
result.aggregatedPublishDelayLatency75pct = agg.publishDelayLatency.getValueAtPercentile(75);
result.aggregatedPublishDelayLatency95pct = agg.publishDelayLatency.getValueAtPercentile(95);
result.aggregatedPublishDelayLatency99pct = agg.publishDelayLatency.getValueAtPercentile(99);
result.aggregatedPublishDelayLatency999pct = agg.publishDelayLatency.getValueAtPercentile(99.9);
result.aggregatedPublishDelayLatency9999pct = agg.publishDelayLatency.getValueAtPercentile(99.99);
result.aggregatedPublishDelayLatencyMax = agg.publishDelayLatency.getMaxValue();

result.aggregatedEndToEndLatencyAvg = agg.endToEndLatency.getMean() / 1000.0;
result.aggregatedEndToEndLatency50pct = agg.endToEndLatency.getValueAtPercentile(50) / 1000.0;
result.aggregatedEndToEndLatency75pct = agg.endToEndLatency.getValueAtPercentile(75) / 1000.0;
Expand All @@ -482,6 +513,11 @@ private TestResult printAndCollectStats(long testDurations, TimeUnit unit) throw
value.getValueIteratedTo() / 1000.0);
});

agg.publishDelayLatency.percentiles(100).forEach(value -> {
result.aggregatedPublishDelayLatencyQuantiles.put(value.getPercentile(),
value.getValueIteratedTo());
});

agg.endToEndLatency.percentiles(100).forEach(value -> {
result.aggregatedEndToEndLatencyQuantiles.put(value.getPercentile(),
microsToMillis(value.getValueIteratedTo()));
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.openmessaging.benchmark.utils;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import java.util.concurrent.locks.LockSupport;

/**
* Provides a next operation time for rate limited operation streams.<br>
* The rate limiter is thread safe and can be shared by all threads.
*/
public final class UniformRateLimiter {

private static final AtomicLongFieldUpdater<UniformRateLimiter> V_TIME_UPDATER =
AtomicLongFieldUpdater.newUpdater(UniformRateLimiter.class, "virtualTime");
private static final AtomicLongFieldUpdater<UniformRateLimiter> START_UPDATER =
AtomicLongFieldUpdater.newUpdater(UniformRateLimiter.class, "start");
private static final double ONE_SEC_IN_NS = TimeUnit.SECONDS.toNanos(1);
private volatile long start = Long.MIN_VALUE;
private volatile long virtualTime;
private final double opsPerSec;
private final long intervalNs;

public UniformRateLimiter(final double opsPerSec) {
if (Double.isNaN(opsPerSec) || Double.isInfinite(opsPerSec)) {
throw new IllegalArgumentException("opsPerSec cannot be Nan or Infinite");
}
if (opsPerSec <= 0) {
throw new IllegalArgumentException("opsPerSec must be greater then 0");
}
this.opsPerSec = opsPerSec;
intervalNs = Math.round(ONE_SEC_IN_NS / opsPerSec);
}

public double getOpsPerSec() {
return opsPerSec;
}

public long getIntervalNs() {
return intervalNs;
}

public long acquire() {
final long currOpIndex = V_TIME_UPDATER.getAndIncrement(this);
long start = this.start;
if (start == Long.MIN_VALUE) {
start = System.nanoTime();
if (!START_UPDATER.compareAndSet(this, Long.MIN_VALUE, start)) {
start = this.start;
assert start != Long.MIN_VALUE;
}
}
return start + currOpIndex * intervalNs;
}

public static void uninterruptibleSleepNs(final long intendedTime) {
long sleepNs;
while ((sleepNs = (intendedTime - System.nanoTime())) > 0) {
LockSupport.parkNanos(sleepNs);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,9 @@ public PeriodStats getPeriodStats() {
stats.publishLatency.add(Histogram.decodeFromCompressedByteBuffer(
ByteBuffer.wrap(is.publishLatencyBytes), TimeUnit.SECONDS.toMicros(30)));

stats.publishDelayLatency.add(Histogram.decodeFromCompressedByteBuffer(
ByteBuffer.wrap(is.publishDelayLatencyBytes), TimeUnit.SECONDS.toMicros(30)));

stats.endToEndLatency.add(Histogram.decodeFromCompressedByteBuffer(
ByteBuffer.wrap(is.endToEndLatencyBytes), TimeUnit.HOURS.toMicros(12)));
} catch (ArrayIndexOutOfBoundsException | DataFormatException e) {
Expand All @@ -225,6 +228,15 @@ public CumulativeLatencies getCumulativeLatencies() {
throw new RuntimeException(e);
}

try {
stats.publishDelayLatency.add(Histogram.decodeFromCompressedByteBuffer(
ByteBuffer.wrap(is.publishDelayLatencyBytes), TimeUnit.SECONDS.toMicros(30)));
} catch (Exception e) {
log.error("Failed to decode publish delay latency: {}",
ByteBufUtil.prettyHexDump(Unpooled.wrappedBuffer(is.publishDelayLatencyBytes)));
throw new RuntimeException(e);
}

try {
stats.endToEndLatency.add(Histogram.decodeFromCompressedByteBuffer(
ByteBuffer.wrap(is.endToEndLatencyBytes), TimeUnit.HOURS.toMicros(12)));
Expand Down
Loading

0 comments on commit 64eff25

Please sign in to comment.