From 787eef37843e3ea0972d064eb567d0639a8aba5f Mon Sep 17 00:00:00 2001 From: Jens Date: Tue, 18 Jan 2022 22:24:29 +0100 Subject: [PATCH] Improve CKMSQuantiles and address memory leak MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit CKMSQuantiles is copied from an implementation of 2012, where it states that a ‘HACK’ was done, admitting a space leak. This leak has been noticed several times (#422, #550, #654). By correctly applying the algorithm from the paper we fix the leak. I have added unit-tests to show that the behaviour is correct. I have also added a Benchmark in the benchmark module showing the difference with the old and current implementation. According to my benchmarks, in the new implementation a `get` on a quantile that has ‘seen’ 1 million elements is 440 times faster. Inserting 1 million elements is 3.5 times faster. While going through the CKMS paper and the Java implementation I have added remarks and snippets from the paper, to clarify why certain choices are made. 
Signed-off-by: Jens Fix assertion in HistogramTest Median of 1..11 = 6 Signed-off-by: Jens Address PR remarks Signed-off-by: Jens --- benchmarks/pom.xml | 4 +- .../client/CKMSQuantileBenchmark.java | 108 ++++++ simpleclient/pom.xml | 6 + .../io/prometheus/client/CKMSQuantiles.java | 358 +++++++++++++----- .../prometheus/client/CKMSQuantilesTest.java | 196 ++++++++++ .../client/exporter/common/ExemplarTest.java | 12 +- 6 files changed, 576 insertions(+), 108 deletions(-) create mode 100644 benchmarks/src/main/java/io/prometheus/client/CKMSQuantileBenchmark.java create mode 100644 simpleclient/src/test/java/io/prometheus/client/CKMSQuantilesTest.java diff --git a/benchmarks/pom.xml b/benchmarks/pom.xml index e027baf8d..4194ddf4e 100644 --- a/benchmarks/pom.xml +++ b/benchmarks/pom.xml @@ -27,12 +27,12 @@ org.openjdk.jmh jmh-core - 1.3.2 + 1.34 org.openjdk.jmh jmh-generator-annprocess - 1.3.2 + 1.34 javax.annotation diff --git a/benchmarks/src/main/java/io/prometheus/client/CKMSQuantileBenchmark.java b/benchmarks/src/main/java/io/prometheus/client/CKMSQuantileBenchmark.java new file mode 100644 index 000000000..0f36212b5 --- /dev/null +++ b/benchmarks/src/main/java/io/prometheus/client/CKMSQuantileBenchmark.java @@ -0,0 +1,108 @@ +package io.prometheus.client; + +import io.prometheus.client.CKMSQuantiles.Quantile; +import org.openjdk.jmh.annotations.*; +import org.openjdk.jmh.infra.Blackhole; +import org.openjdk.jmh.runner.Runner; +import org.openjdk.jmh.runner.RunnerException; +import org.openjdk.jmh.runner.options.Options; +import org.openjdk.jmh.runner.options.OptionsBuilder; + +import java.util.*; +import java.util.concurrent.TimeUnit; + +public class CKMSQuantileBenchmark { + + @State(Scope.Benchmark) + public static class EmptyBenchmarkState { + @Param({"10000", "100000", "1000000"}) + public int value; + + CKMSQuantiles ckmsQuantiles; + + List quantiles; + Random rand = new Random(0); + + long[] shuffle; + + @Setup(Level.Trial) + public void setup() { + 
quantiles = new ArrayList(); + quantiles.add(new Quantile(0.50, 0.050)); + quantiles.add(new Quantile(0.90, 0.010)); + quantiles.add(new Quantile(0.95, 0.005)); + quantiles.add(new Quantile(0.99, 0.001)); + + + shuffle = new long[value]; + for (int i = 0; i < shuffle.length; i++) { + shuffle[i] = i; + } + Collections.shuffle(Arrays.asList(shuffle), rand); + + } + } + + @Benchmark + @BenchmarkMode({Mode.AverageTime}) + @OutputTimeUnit(TimeUnit.MILLISECONDS) + public void ckmsQuantileInsertBenchmark(Blackhole blackhole, EmptyBenchmarkState state) { + CKMSQuantiles q = new CKMSQuantiles(state.quantiles.toArray(new Quantile[]{})); + for (long l : state.shuffle) { + q.insert(l); + } + } + + @State(Scope.Benchmark) + public static class PrefilledBenchmarkState { + public int value = 1000000; + + CKMSQuantiles ckmsQuantiles; + + List quantiles; + Random rand = new Random(0); + + long[] shuffle; + + @Setup(Level.Trial) + public void setup() { + quantiles = new ArrayList(); + quantiles.add(new Quantile(0.50, 0.050)); + quantiles.add(new Quantile(0.90, 0.010)); + quantiles.add(new Quantile(0.95, 0.005)); + quantiles.add(new Quantile(0.99, 0.001)); + + + shuffle = new long[value]; + for (int i = 0; i < shuffle.length; i++) { + shuffle[i] = i; + } + Collections.shuffle(Arrays.asList(shuffle), rand); + ckmsQuantiles = new CKMSQuantiles(quantiles.toArray(new Quantile[]{})); + for (long l : shuffle) { + ckmsQuantiles.insert(l); + } + + } + } + + @Benchmark + @BenchmarkMode({Mode.AverageTime}) + @OutputTimeUnit(TimeUnit.NANOSECONDS) + public void ckmsQuantileGetBenchmark(Blackhole blackhole, PrefilledBenchmarkState state) { + blackhole.consume(state.ckmsQuantiles.get(0.95)); + } + + public static void main(String[] args) throws RunnerException { + + Options opt = new OptionsBuilder() + .include(CKMSQuantileBenchmark.class.getSimpleName()) + .warmupIterations(5) + .measurementIterations(4) + .threads(1) + .forks(1) + .build(); + + new Runner(opt).run(); + } +} diff --git 
a/simpleclient/pom.xml b/simpleclient/pom.xml index d1a751f5e..b2d8f9ac7 100644 --- a/simpleclient/pom.xml +++ b/simpleclient/pom.xml @@ -54,5 +54,11 @@ 4.13.2 test + + org.apache.commons + commons-math3 + 3.2 + test + diff --git a/simpleclient/src/main/java/io/prometheus/client/CKMSQuantiles.java b/simpleclient/src/main/java/io/prometheus/client/CKMSQuantiles.java index 1ffb65382..7fe6cff3b 100644 --- a/simpleclient/src/main/java/io/prometheus/client/CKMSQuantiles.java +++ b/simpleclient/src/main/java/io/prometheus/client/CKMSQuantiles.java @@ -2,6 +2,7 @@ // Copied from https://raw.githubusercontent.com/Netflix/ocelli/master/ocelli-core/src/main/java/netflix/ocelli/stats/CKMSQuantiles.java // Revision d0357b8bf5c17a173ce94d6b26823775b3f999f6 from Jan 21, 2015. +// Which was copied from https://github.com/umbrant/QuantileEstimation/blob/34ea6889570827a79b3f8294812d9390af23b734/src/main/java/com/umbrant/quantile/QuantileEstimationCKMS.java // // This is the original code except for the following modifications: // @@ -35,129 +36,202 @@ * Implementation of the Cormode, Korn, Muthukrishnan, and Srivastava algorithm * for streaming calculation of targeted high-percentile epsilon-approximate * quantiles. - * + *

* This is a generalization of the earlier work by Greenwald and Khanna (GK), * which essentially allows different error bounds on the targeted quantiles, * which allows for far more efficient calculation of high-percentiles. - * - * + *

+ *

* See: Cormode, Korn, Muthukrishnan, and Srivastava * "Effective Computation of Biased Quantiles over Data Streams" in ICDE 2005 - * + *

* Greenwald and Khanna, * "Space-efficient online computation of quantile summaries" in SIGMOD 2001 - * + *

+ *

+ * The Approximate Quantiles Algorithm looks like this (Figure 1) + *

+ * Main()
+ *    foreach item v do
+ *      Insert(v);
+ *      if (Compress Condition()) then
+ *          Compress();
+ *
+ * Insert(v):
+ *    r_0:=0;
+ *    for i:=1 to s do
+ *        ri := ri−1 + gi−1;
+ *        if (v < vi) break;
+ *        add (v,1,f(ri,n)−1) to S before vi;
+ *    n++;
+ *
+ * Compress():
+ *    for i := (s−1) downto 1 do
+ *        if (gi + gi+1 + ∆i+1 ≤ f(ri, n)) then
+ *            merge ti and ti+1;
+ *
+ * Output(φ):
+ *    r0:=0;
+ *    for i := 1 to s do
+ *        ri:=ri−1+gi−1;
+ *        if(ri +gi +∆i >φn+f(φn,n)/2)
+ *            print(vi−1); break;
+ * 
*/ -class CKMSQuantiles { +final class CKMSQuantiles { /** * Total number of items in stream. + * Increases on every insertBatch(). */ private int count = 0; /** - * Used for tracking incremental compression. + * Current list of sampled items, maintained in sorted order with error + * bounds. + *

+ * Note: Any algorithm that guarantees to find biased + * quantiles φ with error at most φεn in rank must store + * Ω(1 min{klog1/φ,log(εn)}) items. + *

+ */ + final LinkedList samples; + + /** + * Used for compress condition. + * This is a different index than the bufferCount, + * because flushing is also done on `quantile.get()`, + * but we need to compress, regardless of reaching the bufferCount, to limit space usage. */ private int compressIdx = 0; /** - * Current list of sampled items, maintained in sorted order with error - * bounds. + * The amount of values observed when to compress. */ - protected LinkedList sample; + private final int insertThreshold; /** * Buffers incoming items to be inserted in batch. */ - private double[] buffer = new double[500]; + private final double[] buffer; - private int bufferCount = 0; + /** + * Tracks the current index of the buffer. Increases on insert(). + */ + private int bufferIdx = 0; /** * Array of Quantiles that we care about, along with desired error. */ - private final Quantile quantiles[]; + private final Quantile[] quantiles; - public CKMSQuantiles(Quantile[] quantiles) { - this.quantiles = quantiles; - this.sample = new LinkedList(); + /** + * Set up the CKMS Quantiles. Can have 0 or more targeted quantiles defined. + * @param quantiles The targeted quantiles, can be empty. + */ + CKMSQuantiles(Quantile[] quantiles) { + // hard-coded epsilon of 0.1% to determine the batch size, and default epsilon in case of empty quantiles + double pointOnePercent = 0.001; + if (quantiles.length == 0) { // we need at least one for this algorithm to work + this.quantiles = new Quantile[1]; + this.quantiles[0] = new Quantile(0.5, pointOnePercent / 2); + } else { + this.quantiles = quantiles; + } + + // section 5.1 Methods - Batch. + // This is hardcoded to 500, which corresponds to an epsilon of 0.1%. + this.insertThreshold = 500; + + // create a buffer with size equal to threshold + this.buffer = new double[insertThreshold]; + + // Initialize empty items + this.samples = new LinkedList(); } /** * Add a new value from the stream. 
- * - * @param value + * + * @param value the observed value */ public void insert(double value) { - buffer[bufferCount] = value; - bufferCount++; + buffer[bufferIdx] = value; + bufferIdx++; + + if (bufferIdx == buffer.length) { + insertBatch(); // this is the batch insert variation + } - if (bufferCount == buffer.length) { - insertBatch(); + // The Compress_Condition() + compressIdx = (compressIdx + 1) % insertThreshold; + if (compressIdx == 0) { compress(); } } /** * Get the estimated value at the specified quantile. - * - * @param q - * Queried quantile, e.g. 0.50 or 0.99. + * + * @param q Queried quantile, e.g. 0.50 or 0.99. * @return Estimated value at that quantile. */ public double get(double q) { - // clear the buffer + // clear the buffer. in case of low value insertions, the samples can become stale. + // On every get, make sure we get the latest values in. insertBatch(); - compress(); - if (sample.size() == 0) { + if (samples.size() == 0) { return Double.NaN; } - int rankMin = 0; - int desired = (int) (q * count); + // Straightforward implementation of Output(q). + // Paper Section 3.1 on true rank: let r_i = Sum_{j=1}^{i−1} g_j + int currentRank = 0; + double desired = q * count; - ListIterator it = sample.listIterator(); + ListIterator it = samples.listIterator(); Item prev, cur; cur = it.next(); while (it.hasNext()) { prev = cur; cur = it.next(); - rankMin += prev.g; + currentRank += prev.g; - if (rankMin + cur.g + cur.delta > desired + if (currentRank + cur.g + cur.delta > desired + (allowableError(desired) / 2)) { return prev.value; } } // edge case of wanting max value - return sample.getLast().value; + return samples.getLast().value; } /** * Specifies the allowable error for this rank, depending on which quantiles * are being targeted. - * + *

* This is the f(r_i, n) function from the CKMS paper. It's basically how * wide the range of this rank can be. - * - * @param rank - * the index in the list of samples + *

+ * Define invariant function f (ri , n) as + * (i) f_j(r_i,n) = 2ε_j r_i / φ_j for φ_j n ≤ r_i ≤ n; + * (ii) f_j(r_i,n) = 2ε_j(n−r_i) / (1−φ_j) for 0 ≤ r_i ≤ φ_j n + *

+ * and take f(ri,n) = max{min_j ⌊f_j(r_i,n)⌋,1}. + * As before we ensure that for all i, g_i + ∆_i ≤ f(r_i, n). + * + * @param rank the index in the list of samples */ - private double allowableError(int rank) { - // NOTE: according to CKMS, this should be count, not size, but this - // leads - // to error larger than the error bounds. Leaving it like this is - // essentially a HACK, and blows up memory, but does "work". - // int size = count; - int size = sample.size(); - double minError = size + 1; + private double allowableError(double rank /* r_i */) { + int n = count; + double minError = count; for (Quantile q : quantiles) { double error; - if (rank <= q.quantile * size) { - error = q.u * (size - rank); + if (rank <= q.quantile * n) { + error = q.u * (n - rank); } else { error = q.v * rank; } @@ -165,103 +239,155 @@ private double allowableError(int rank) { minError = error; } } - - return minError; + return Math.max(minError, 1); } - private boolean insertBatch() { - if (bufferCount == 0) { - return false; + /** + * To insert a new item, v, we find i such that vi < v ≤ vi+1, + * we compute ri and insert the tuple (v,g=1,∆=f(ri,n)−1). + * + * We also ensure that min and max are kept exactly, so when v < v1, + * we insert the tuple (v,g = 1,∆ = 0) before v1. Similarly, when v > vs, + * we insert (v,g = 1,∆ = 0) after vs. + */ + private void insertBatch() { + if (bufferIdx == 0) { + return; } + // Has to be sorted: O(buffer) + // Since the buffer is treated as a circular buffer, we sort till the bufferIdx to prevent insertion of duplicate / already inserted values. 
+ Arrays.sort(buffer, 0, bufferIdx); - Arrays.sort(buffer, 0, bufferCount); - - // Base case: no samples + // Base case: no samples yet int start = 0; - if (sample.size() == 0) { - Item newItem = new Item(buffer[0], 1, 0); - sample.add(newItem); + if (samples.size() == 0) { + Item newItem = new Item(buffer[0], 0); + samples.add(newItem); start++; count++; } - ListIterator it = sample.listIterator(); + // To insert a new item, v, we find i such that vi < v ≤ vi+1, + ListIterator it = samples.listIterator(); Item item = it.next(); - for (int i = start; i < bufferCount; i++) { + // Keep track of the current rank by adding the g of each item. See also discussion in https://issues.apache.org/jira/browse/HBASE-14324 + // Paper Section 3.1 on true rank: let r_i = Sum_{j=1}^{i−1} g_j + int currentRank = item.g; + + for (int i = start; i < bufferIdx; i++) { + // item to be inserted double v = buffer[i]; - while (it.nextIndex() < sample.size() && item.value < v) { + // find the item in the samples that is bigger than our v. + while (it.hasNext() && item.value < v) { item = it.next(); + currentRank += item.g; } // If we found that bigger item, back up so we insert ourselves // before it if (item.value > v) { + currentRank -= item.g; it.previous(); } // We use different indexes for the edge comparisons, because of the - // above - // if statement that adjusts the iterator + // above if statement that adjusts the iterator int delta; - if (it.previousIndex() == 0 || it.nextIndex() == sample.size()) { + if (it.previousIndex() == 0 || it.nextIndex() == samples.size()) { delta = 0; - } - else { - delta = ((int) Math.floor(allowableError(it.nextIndex()))) - 1; + } else { + delta = ((int) Math.floor(allowableError(currentRank))) - 1; } - Item newItem = new Item(v, 1, delta); + Item newItem = new Item(v, delta); it.add(newItem); count++; item = newItem; } - bufferCount = 0; - return true; + // reset buffered items to 0. 
+ bufferIdx = 0; } /** * Try to remove extraneous items from the set of sampled items. This checks * if an item is unnecessary based on the desired error bounds, and merges * it with the adjacent item if it is. + *

+ * Compress. Periodically, the algorithm scans the data + * structure and merges adjacent nodes when this does not + * violate the invariant. That is, remove nodes (vi, gi, ∆i) + * and (vi+1 , gi+1 , ∆i+1 ), and replace with (vi+1 , (gi + + * gi+1 ), ∆i+1 ) provided that (gi + gi+1 + ∆i+1 ) ≤ f (ri , n). + * This also maintains the semantics of g and ∆ being the + * difference in rank between v_i and v_{i-1} , and the difference + * between the highest and lowest possible ranks of vi, respectively. */ private void compress() { - if (sample.size() < 2) { + // If there are 0,1, or 2 samples then there's nothing to compress. + if (samples.size() < 3) { return; } - ListIterator it = sample.listIterator(); - int removed = 0; + ListIterator it = samples.listIterator(); - Item prev = null; + Item prev; Item next = it.next(); + // Counter for the rank in the stream of all observed values. + // Paper Section 3.1 on true rank: let r_i = Sum_{j=1}^{i−1} g_j + int currentRank = next.g; + while (it.hasNext()) { prev = next; next = it.next(); - - if (prev.g + next.g + next.delta <= allowableError(it.previousIndex())) { + currentRank += next.g; + if (prev.g + next.g + next.delta <= allowableError(currentRank)) { next.g += prev.g; // Remove prev. it.remove() kills the last thing returned. 
- it.previous(); - it.previous(); - it.remove(); - // it.next() is now equal to next, skip it back forward again - it.next(); - removed++; + it.previous(); // brings pointer back to 'next' + it.previous(); // brings pointer back to 'prev' + it.remove(); // remove prev + // it.next() is now equal to next, + it.next(); // set pointer to 'next' } } } - private class Item { - public final double value; - public int g; - public final int delta; - - public Item(double value, int lower_delta, int delta) { + /** + * As in GK, the data structure at time n, S(n), consists + * of a sequence of s tuples ⟨ti = (vi, gi, ∆i)⟩, where each vi + * is a sampled item from the data stream and two additional + * values are kept: (1) g_i is the difference between the lowest + * possible rank of item i and the lowest possible rank of item + * i − 1; and (2) ∆_i is the difference between the greatest + * possible rank of item i and the lowest possible rank of item + * i. + */ + private static class Item { + /** + * vi + * is a sampled item from the data stream and two additional + * values are kept + */ + final double value; + /** + * g_i is the difference between the lowest + * possible rank of item i and the lowest possible rank of item + * i − 1. + * note: Always starts with 1, changes when merging Items. + */ + int g = 1; + /** + * ∆i is the difference between the greatest + * possible rank of item i and the lowest possible rank of item + * i. 
+ */ + final int delta; + + Item(double value, int delta) { this.value = value; - this.g = lower_delta; this.delta = delta; } @@ -271,22 +397,54 @@ public String toString() { } } - public static class Quantile { - public final double quantile; - public final double error; - public final double u; - public final double v; + /** + * + */ + static class Quantile { + /** + * 0 < φ < 1 + */ + final double quantile; + /** + * Allowed error 0 < ε < 1 + */ + final double epsilon; + /** + * Helper value to calculate the targeted quantiles invariant as per Definition 5 (ii) + */ + final double u; + /** + * Helper value to calculate the targeted quantiles invariant as per Definition 5 (i) + */ + final double v; + + /** + * Targeted quantile: T = {(φ_j , ε_j )} + * Rather than requesting the same ε for all quantiles (the uniform case) + * or ε scaled by φ (the biased case), one might specify an arbitrary set + * of quantiles and the desired errors of ε for each in the form (φj , εj ). + * For example, input to the targeted quantiles problem might be {(0.5, 0.1), (0.2, 0.05), (0.9, 0.01)}, + * meaning that the median should be returned with 10% error, the 20th percentile with 5% error, + * and the 90th percentile with 1%. + * + * @param quantile the quantile between 0 and 1 + * @param epsilon the desired error for this quantile, between 0 and 1. 
+ */ + Quantile(double quantile, double epsilon) { + if (quantile < 0 || quantile > 1.0) throw new IllegalArgumentException("Quantile must be between 0 and 1"); + if (epsilon < 0 || epsilon > 1.0) throw new IllegalArgumentException("Epsilon must be between 0 and 1"); - public Quantile(double quantile, double error) { this.quantile = quantile; - this.error = error; - u = 2.0 * error / (1.0 - quantile); - v = 2.0 * error / quantile; + this.epsilon = epsilon; + // f_j(r_i,n) = 2ε_j(n−r_i) / (1−φ_j) for 0 ≤ r_i ≤ φ_j n + u = 2.0 * epsilon / (1.0 - quantile); + // f_j(r_i,n) = 2ε_j r_i / φ_j for φ_j n ≤ r_i ≤ n; + v = 2.0 * epsilon / quantile; } @Override public String toString() { - return String.format("Q{q=%.3f, eps=%.3f}", quantile, error); + return String.format("Q{q=%.3f, eps=%.3f}", quantile, epsilon); } } diff --git a/simpleclient/src/test/java/io/prometheus/client/CKMSQuantilesTest.java b/simpleclient/src/test/java/io/prometheus/client/CKMSQuantilesTest.java new file mode 100644 index 000000000..63a281007 --- /dev/null +++ b/simpleclient/src/test/java/io/prometheus/client/CKMSQuantilesTest.java @@ -0,0 +1,196 @@ +package io.prometheus.client; + +import io.prometheus.client.CKMSQuantiles.Quantile; +import org.apache.commons.math3.distribution.NormalDistribution; +import org.apache.commons.math3.random.JDKRandomGenerator; +import org.apache.commons.math3.random.RandomGenerator; +import org.junit.Test; + +import java.util.*; + +import static org.junit.Assert.*; + +public class CKMSQuantilesTest { + + @Test + public void testGetOnEmptyValues() { + List quantiles = new ArrayList(); + quantiles.add(new Quantile(0.50, 0.01)); + quantiles.add(new Quantile(0.90, 0.01)); + quantiles.add(new Quantile(0.95, 0.01)); + quantiles.add(new Quantile(0.99, 0.01)); + + CKMSQuantiles ckms = new CKMSQuantiles( + quantiles.toArray(new Quantile[]{})); + assertEquals(Double.NaN, ckms.get(0), 0); + } + + @Test + public void testGetWhenNoQuantilesAreDefined() { + CKMSQuantiles ckms = new 
CKMSQuantiles(new Quantile[]{}); + assertEquals(Double.NaN, ckms.get(0), 0); + } + + @Test + public void testInsertWhenNoQuantilesAreDefined() { + CKMSQuantiles ckms = new CKMSQuantiles(new Quantile[]{}); + ckms.insert(1.0); + ckms.insert(2.0); + ckms.insert(3.0); + assertEquals(1.0, ckms.get(0), 0); + assertEquals(2.0, ckms.get(0.5), 0); + assertEquals(3.0, ckms.get(1), 0); + } + + @Test + public void testCompressWhenBufferSize500Reached() { + CKMSQuantiles ckms = new CKMSQuantiles(new Quantile[]{}); + List input = makeSequence(1, 499); + + for (double v : input) { + ckms.insert(v); + } + assertEquals("No compress should be triggered", 0, ckms.samples.size()); + + ckms.insert(500); + assertEquals(500, ckms.samples.size()); + } + + @Test + public void testGet() { + List quantiles = new ArrayList(); + quantiles.add(new Quantile(0.50, 0.01)); + quantiles.add(new Quantile(0.90, 0.01)); + quantiles.add(new Quantile(0.95, 0.01)); + quantiles.add(new Quantile(0.99, 0.01)); + + List input = makeSequence(1, 100); + CKMSQuantiles ckms = new CKMSQuantiles( + quantiles.toArray(new Quantile[]{})); + for (double v : input) { + ckms.insert(v); + } + assertEquals(10.0, ckms.get(0.1), 1); + assertEquals(50.0, ckms.get(0.5), 1); + assertEquals(90.0, ckms.get(0.9), 1); + assertEquals(95.0, ckms.get(0.95), 1); + assertEquals(99.0, ckms.get(0.99), 1); + } + + @Test + public void testGetWithAMillionElements() { + List quantiles = new ArrayList(); + quantiles.add(new Quantile(0.0, 0.01)); + quantiles.add(new Quantile(0.10, 0.01)); + quantiles.add(new Quantile(0.90, 0.001)); + quantiles.add(new Quantile(0.95, 0.02)); + quantiles.add(new Quantile(0.99, 0.001)); + + final int elemCount = 1000000; + double[] shuffle = new double[elemCount]; + for (int i = 0; i < shuffle.length; i++) { + shuffle[i] = i + 1; + } + Random rand = new Random(0); + + Collections.shuffle(Arrays.asList(shuffle), rand); + + CKMSQuantiles ckms = new CKMSQuantiles( + quantiles.toArray(new Quantile[]{})); + + for 
(double v : shuffle) { + ckms.insert(v); + } + // given the linear distribution, we set the delta equal to the εn value for this quantile + assertEquals(0.1 * elemCount, ckms.get(0.1), 0.01 * elemCount); + assertEquals(0.9 * elemCount, ckms.get(0.9), 0.001 * elemCount); + assertEquals(0.95 * elemCount, ckms.get(0.95), 0.02 * elemCount); + assertEquals(0.99 * elemCount, ckms.get(0.99), 0.001 * elemCount); + + assertTrue("sample size should be way below 1_000_000", ckms.samples.size() < 1000); + } + + + @Test + public void testGetGaussian() { + RandomGenerator rand = new JDKRandomGenerator(); + rand.setSeed(0); + + double mean = 0.0; + double stddev = 1.0; + NormalDistribution normalDistribution = new NormalDistribution(rand, mean, stddev, NormalDistribution.DEFAULT_INVERSE_ABSOLUTE_ACCURACY); + + List quantiles = new ArrayList(); + quantiles.add(new Quantile(0.10, 0.001)); + quantiles.add(new Quantile(0.50, 0.01)); + quantiles.add(new Quantile(0.90, 0.001)); + quantiles.add(new Quantile(0.95, 0.001)); + quantiles.add(new Quantile(0.99, 0.001)); + + CKMSQuantiles ckms = new CKMSQuantiles( + quantiles.toArray(new Quantile[]{})); + + final int elemCount = 1000000; + double[] shuffle = normalDistribution.sample(elemCount); + + // insert a million samples + for (double v : shuffle) { + ckms.insert(v); + } + + // give the actual values for the quantiles we test + double p10 = normalDistribution.inverseCumulativeProbability(0.1); + double p90 = normalDistribution.inverseCumulativeProbability(0.9); + double p95 = normalDistribution.inverseCumulativeProbability(0.95); + double p99 = normalDistribution.inverseCumulativeProbability(0.99); + + //ε-approximate quantiles relaxes the requirement + //to finding an item with rank between (φ−ε)n and (φ+ε)n. 
+ assertEquals(p10, ckms.get(0.1), errorBoundsNormalDistribution(0.1, 0.001, normalDistribution)); + assertEquals(mean, ckms.get(0.5), errorBoundsNormalDistribution(0.5, 0.01, normalDistribution)); + assertEquals(p90, ckms.get(0.9), errorBoundsNormalDistribution(0.9, 0.001, normalDistribution)); + assertEquals(p95, ckms.get(0.95), errorBoundsNormalDistribution(0.95, 0.001, normalDistribution)); + assertEquals(p99, ckms.get(0.99), errorBoundsNormalDistribution(0.99, 0.001, normalDistribution)); + + assertTrue("sample size should be below 1000", ckms.samples.size() < 1000); + } + + @Test + public void checkBounds() { + try { + new Quantile(-1, 0); + } catch (IllegalArgumentException e) { + assertEquals("Quantile must be between 0 and 1", e.getMessage()); + } catch (Exception e) { + fail("Wrong exception thrown" + e); + } + + try { + new Quantile(0.95, 2); + } catch (IllegalArgumentException e) { + assertEquals("Epsilon must be between 0 and 1", e.getMessage()); + } catch (Exception e) { + fail("Wrong exception thrown" + e); + } + } + + double errorBoundsNormalDistribution(double p, double epsilon, NormalDistribution nd) { + //(φ+ε)n + double upperBound = nd.inverseCumulativeProbability(p + epsilon); + //(φ−ε)n + double lowerBound = nd.inverseCumulativeProbability(p - epsilon); + // subtract and divide by 2, assuming that the increase is linear in this small epsilon. 
+ return Math.abs(upperBound - lowerBound) / 2; + } + + /** + * In Java 8 we could use IntStream + */ + List makeSequence(int begin, int end) { + List ret = new ArrayList(end - begin + 1); + for (int i = begin; i <= end; i++) { + ret.add((double) i); + } + return ret; + } +} diff --git a/simpleclient_common/src/test/java/io/prometheus/client/exporter/common/ExemplarTest.java b/simpleclient_common/src/test/java/io/prometheus/client/exporter/common/ExemplarTest.java index 469446cda..2ab2821f9 100644 --- a/simpleclient_common/src/test/java/io/prometheus/client/exporter/common/ExemplarTest.java +++ b/simpleclient_common/src/test/java/io/prometheus/client/exporter/common/ExemplarTest.java @@ -310,15 +310,15 @@ public void testSummaryNoLabels() throws IOException { .help("help") .quantile(0.5, 0.01) .register(registry); - for (int i=1; i<=11; i++) { // median is 5 + for (int i=1; i<=11; i++) { // median is 6 noLabelsDefaultExemplar.observe(i); } // Summaries don't have Exemplars according to the OpenMetrics spec. - assertOpenMetrics100Format("no_labels{quantile=\"0.5\"} 5.0\n"); + assertOpenMetrics100Format("no_labels{quantile=\"0.5\"} 6.0\n"); assertOpenMetrics100Format("no_labels_count 11.0\n"); assertOpenMetrics100Format("no_labels_sum 66.0\n"); - assert004Format("no_labels{quantile=\"0.5\",} 5.0\n"); + assert004Format("no_labels{quantile=\"0.5\",} 6.0\n"); assert004Format("no_labels_count 11.0\n"); assert004Format("no_labels_sum 66.0\n"); } @@ -331,15 +331,15 @@ public void testSummaryLabels() throws IOException { .labelNames("label") .quantile(0.5, 0.01) .register(registry); - for (int i=1; i<=11; i++) { // median is 5 + for (int i=1; i<=11; i++) { // median is 6 labelsNoExemplar.labels("test").observe(i); } // Summaries don't have Exemplars according to the OpenMetrics spec. 
- assertOpenMetrics100Format("labels{label=\"test\",quantile=\"0.5\"} 5.0\n"); + assertOpenMetrics100Format("labels{label=\"test\",quantile=\"0.5\"} 6.0\n"); assertOpenMetrics100Format("labels_count{label=\"test\"} 11.0\n"); assertOpenMetrics100Format("labels_sum{label=\"test\"} 66.0\n"); - assert004Format("labels{label=\"test\",quantile=\"0.5\",} 5.0\n"); + assert004Format("labels{label=\"test\",quantile=\"0.5\",} 6.0\n"); assert004Format("labels_count{label=\"test\",} 11.0\n"); assert004Format("labels_sum{label=\"test\",} 66.0\n"); }