- it = samples.listIterator();
Item prev, cur;
cur = it.next();
while (it.hasNext()) {
prev = cur;
cur = it.next();
- rankMin += prev.g;
+ currentRank += prev.g;
- if (rankMin + cur.g + cur.delta > desired
+ if (currentRank + cur.g + cur.delta > desired
+ (allowableError(desired) / 2)) {
return prev.value;
}
}
// edge case of wanting max value
- return sample.getLast().value;
+ return samples.getLast().value;
}
/**
* Specifies the allowable error for this rank, depending on which quantiles
* are being targeted.
- *
+ *
* This is the f(r_i, n) function from the CKMS paper. It's basically how
* wide the range of this rank can be.
- *
- * @param rank
- * the index in the list of samples
+ *
+ * Define the invariant function f(r_i, n) as
+ * (i) f_j(r_i, n) = 2 ε_j r_i / φ_j for φ_j n ≤ r_i ≤ n;
+ * (ii) f_j(r_i, n) = 2 ε_j (n − r_i) / (1 − φ_j) for 0 ≤ r_i ≤ φ_j n
+ *
+ * and take f(r_i, n) = max{ min_j ⌊f_j(r_i, n)⌋, 1 }.
+ * As before we ensure that for all i, g_i + ∆_i ≤ f(r_i, n).
+ *
+ * @param rank the true rank r_i of the item (running sum of g values up to this item), not an index into the sample list
*/
- private double allowableError(int rank) {
- // NOTE: according to CKMS, this should be count, not size, but this
- // leads
- // to error larger than the error bounds. Leaving it like this is
- // essentially a HACK, and blows up memory, but does "work".
- // int size = count;
- int size = sample.size();
- double minError = size + 1;
+ private double allowableError(double rank /* r_i */) {
+ int n = count;
+ double minError = count;
for (Quantile q : quantiles) {
double error;
- if (rank <= q.quantile * size) {
- error = q.u * (size - rank);
+ if (rank <= q.quantile * n) {
+ error = q.u * (n - rank);
} else {
error = q.v * rank;
}
@@ -165,103 +239,155 @@ private double allowableError(int rank) {
minError = error;
}
}
-
- return minError;
+ return Math.max(minError, 1);
}
- private boolean insertBatch() {
- if (bufferCount == 0) {
- return false;
+ /**
+ * To insert a new item, v, we find i such that vi < v ≤ vi+1,
+ * we compute ri and insert the tuple (v,g=1,∆=f(ri,n)−1).
+ *
+ * We also ensure that min and max are kept exactly, so when v < v1,
+ * we insert the tuple (v,g = 1,∆ = 0) before v1. Similarly, when v > vs,
+ * we insert (v,g = 1,∆ = 0) after vs.
+ */
+ private void insertBatch() {
+ if (bufferIdx == 0) {
+ return;
}
+ // Has to be sorted: O(buffer)
+ // Since the buffer is treated as a circular buffer, we sort till the bufferIdx to prevent insertion of duplicate / already inserted values.
+ Arrays.sort(buffer, 0, bufferIdx);
- Arrays.sort(buffer, 0, bufferCount);
-
- // Base case: no samples
+ // Base case: no samples yet
int start = 0;
- if (sample.size() == 0) {
- Item newItem = new Item(buffer[0], 1, 0);
- sample.add(newItem);
+ if (samples.size() == 0) {
+ Item newItem = new Item(buffer[0], 0);
+ samples.add(newItem);
start++;
count++;
}
- ListIterator<Item> it = sample.listIterator();
+ // To insert a new item, v, we find i such that vi < v ≤ vi+1,
+ ListIterator<Item> it = samples.listIterator();
Item item = it.next();
- for (int i = start; i < bufferCount; i++) {
+ // Keep track of the current rank by adding the g of each item. See also discussion in https://issues.apache.org/jira/browse/HBASE-14324
+ // Paper Section 3.1 on true rank: let r_i = Sum_{j=1}^{i−1} g_j
+ int currentRank = item.g;
+
+ for (int i = start; i < bufferIdx; i++) {
+ // item to be inserted
double v = buffer[i];
- while (it.nextIndex() < sample.size() && item.value < v) {
+ // find the item in the samples that is bigger than our v.
+ while (it.hasNext() && item.value < v) {
item = it.next();
+ currentRank += item.g;
}
// If we found that bigger item, back up so we insert ourselves
// before it
if (item.value > v) {
+ currentRank -= item.g;
it.previous();
}
// We use different indexes for the edge comparisons, because of the
- // above
- // if statement that adjusts the iterator
+ // above if statement that adjusts the iterator
int delta;
- if (it.previousIndex() == 0 || it.nextIndex() == sample.size()) {
+ if (it.previousIndex() == 0 || it.nextIndex() == samples.size()) {
delta = 0;
- }
- else {
- delta = ((int) Math.floor(allowableError(it.nextIndex()))) - 1;
+ } else {
+ delta = ((int) Math.floor(allowableError(currentRank))) - 1;
}
- Item newItem = new Item(v, 1, delta);
+ Item newItem = new Item(v, delta);
it.add(newItem);
count++;
item = newItem;
}
- bufferCount = 0;
- return true;
+ // reset buffered items to 0.
+ bufferIdx = 0;
}
/**
* Try to remove extraneous items from the set of sampled items. This checks
* if an item is unnecessary based on the desired error bounds, and merges
* it with the adjacent item if it is.
+ *
+ * Compress. Periodically, the algorithm scans the data
+ * structure and merges adjacent nodes when this does not
+ * violate the invariant. That is, remove nodes (vi, gi, ∆i)
+ * and (vi+1 , gi+1 , ∆i+1 ), and replace with (vi+1 , (gi +
+ * gi+1 ), ∆i+1 ) provided that (gi + gi+1 + ∆i+1 ) ≤ f (ri , n).
+ * This also maintains the semantics of g and ∆ being the
+ * difference in rank between v_i and v_{i-1} , and the difference
+ * between the highest and lowest possible ranks of vi, respectively.
*/
private void compress() {
- if (sample.size() < 2) {
+ // If there are 0,1, or 2 samples then there's nothing to compress.
+ if (samples.size() < 3) {
return;
}
- ListIterator<Item> it = sample.listIterator();
- int removed = 0;
+ ListIterator<Item> it = samples.listIterator();
- Item prev = null;
+ Item prev;
Item next = it.next();
+ // Counter for the rank in the stream of all observed values.
+ // Paper Section 3.1 on true rank: let r_i = Sum_{j=1}^{i−1} g_j
+ int currentRank = next.g;
+
while (it.hasNext()) {
prev = next;
next = it.next();
-
- if (prev.g + next.g + next.delta <= allowableError(it.previousIndex())) {
+ currentRank += next.g;
+ if (prev.g + next.g + next.delta <= allowableError(currentRank)) {
next.g += prev.g;
// Remove prev. it.remove() kills the last thing returned.
- it.previous();
- it.previous();
- it.remove();
- // it.next() is now equal to next, skip it back forward again
- it.next();
- removed++;
+ it.previous(); // brings pointer back to 'next'
+ it.previous(); // brings pointer back to 'prev'
+ it.remove(); // remove prev
+ // it.next() is now equal to next,
+ it.next(); // set pointer to 'next'
}
}
}
- private class Item {
- public final double value;
- public int g;
- public final int delta;
-
- public Item(double value, int lower_delta, int delta) {
+ /**
+ * As in GK, the data structure at time n, S(n), consists
+ * of a sequence of s tuples ⟨ti = (vi, gi, ∆i)⟩, where each vi
+ * is a sampled item from the data stream and two additional
+ * values are kept: (1) g_i is the difference between the lowest
+ * possible rank of item i and the lowest possible rank of item
+ * i − 1; and (2) ∆_i is the difference between the greatest
+ * possible rank of item i and the lowest possible rank of item
+ * i.
+ */
+ private static class Item {
+ /**
+ * vi
+ * is a sampled item from the data stream and two additional
+ * values are kept
+ */
+ final double value;
+ /**
+ * g_i is the difference between the lowest
+ * possible rank of item i and the lowest possible rank of item
+ * i − 1.
+ * note: Always starts with 1, changes when merging Items.
+ */
+ int g = 1;
+ /**
+ * ∆i is the difference between the greatest
+ * possible rank of item i and the lowest possible rank of item
+ * i.
+ */
+ final int delta;
+
+ Item(double value, int delta) {
this.value = value;
- this.g = lower_delta;
this.delta = delta;
}
@@ -271,22 +397,54 @@ public String toString() {
}
}
- public static class Quantile {
- public final double quantile;
- public final double error;
- public final double u;
- public final double v;
+ /**
+ * A targeted quantile as per Definition 5 of the CKMS paper: a pair (φ_j, ε_j) of a quantile to track and its allowed rank error.
+ */
+ static class Quantile {
+ /**
+ * 0 < φ < 1
+ */
+ final double quantile;
+ /**
+ * Allowed error 0 < ε < 1
+ */
+ final double epsilon;
+ /**
+ * Helper value to calculate the targeted quantiles invariant as per Definition 5 (ii)
+ */
+ final double u;
+ /**
+ * Helper value to calculate the targeted quantiles invariant as per Definition 5 (i)
+ */
+ final double v;
+
+ /**
+ * Targeted quantile: T = {(φ_j , ε_j )}
+ * Rather than requesting the same ε for all quantiles (the uniform case)
+ * or ε scaled by φ (the biased case), one might specify an arbitrary set
+ * of quantiles and the desired errors of ε for each in the form (φj , εj ).
+ * For example, input to the targeted quantiles problem might be {(0.5, 0.1), (0.2, 0.05), (0.9, 0.01)},
+ * meaning that the median should be returned with 10% error, the 20th percentile with 5% error,
+ * and the 90th percentile with 1%.
+ *
+ * @param quantile the quantile between 0 and 1
+ * @param epsilon the desired error for this quantile, between 0 and 1.
+ */
+ Quantile(double quantile, double epsilon) {
+ if (quantile < 0 || quantile > 1.0) throw new IllegalArgumentException("Quantile must be between 0 and 1");
+ if (epsilon < 0 || epsilon > 1.0) throw new IllegalArgumentException("Epsilon must be between 0 and 1");
- public Quantile(double quantile, double error) {
this.quantile = quantile;
- this.error = error;
- u = 2.0 * error / (1.0 - quantile);
- v = 2.0 * error / quantile;
+ this.epsilon = epsilon;
+ // f_j(r_i,n) = 2ε_j(n−r_i) / (1−φ_j) for 0 ≤ r_i ≤ φ_j n
+ u = 2.0 * epsilon / (1.0 - quantile);
+ // f_j(r_i,n) = 2ε_j r_i / φ_j for φ_j n ≤ r_i ≤ n;
+ v = 2.0 * epsilon / quantile;
}
@Override
public String toString() {
- return String.format("Q{q=%.3f, eps=%.3f}", quantile, error);
+ return String.format("Q{q=%.3f, eps=%.3f}", quantile, epsilon);
}
}
diff --git a/simpleclient/src/test/java/io/prometheus/client/CKMSQuantilesTest.java b/simpleclient/src/test/java/io/prometheus/client/CKMSQuantilesTest.java
new file mode 100644
index 000000000..63a281007
--- /dev/null
+++ b/simpleclient/src/test/java/io/prometheus/client/CKMSQuantilesTest.java
@@ -0,0 +1,196 @@
+package io.prometheus.client;
+
+import io.prometheus.client.CKMSQuantiles.Quantile;
+import org.apache.commons.math3.distribution.NormalDistribution;
+import org.apache.commons.math3.random.JDKRandomGenerator;
+import org.apache.commons.math3.random.RandomGenerator;
+import org.junit.Test;
+
+import java.util.*;
+
+import static org.junit.Assert.*;
+
+public class CKMSQuantilesTest {
+
+ @Test
+ public void testGetOnEmptyValues() {
+ List<Quantile> quantiles = new ArrayList<Quantile>();
+ quantiles.add(new Quantile(0.50, 0.01));
+ quantiles.add(new Quantile(0.90, 0.01));
+ quantiles.add(new Quantile(0.95, 0.01));
+ quantiles.add(new Quantile(0.99, 0.01));
+
+ CKMSQuantiles ckms = new CKMSQuantiles(
+ quantiles.toArray(new Quantile[]{}));
+ assertEquals(Double.NaN, ckms.get(0), 0);
+ }
+
+ @Test
+ public void testGetWhenNoQuantilesAreDefined() {
+ CKMSQuantiles ckms = new CKMSQuantiles(new Quantile[]{});
+ assertEquals(Double.NaN, ckms.get(0), 0);
+ }
+
+ @Test
+ public void testInsertWhenNoQuantilesAreDefined() {
+ CKMSQuantiles ckms = new CKMSQuantiles(new Quantile[]{});
+ ckms.insert(1.0);
+ ckms.insert(2.0);
+ ckms.insert(3.0);
+ assertEquals(1.0, ckms.get(0), 0);
+ assertEquals(2.0, ckms.get(0.5), 0);
+ assertEquals(3.0, ckms.get(1), 0);
+ }
+
+ @Test
+ public void testCompressWhenBufferSize500Reached() {
+ CKMSQuantiles ckms = new CKMSQuantiles(new Quantile[]{});
+ List<Double> input = makeSequence(1, 499);
+
+ for (double v : input) {
+ ckms.insert(v);
+ }
+ assertEquals("No compress should be triggered", 0, ckms.samples.size());
+
+ ckms.insert(500);
+ assertEquals(500, ckms.samples.size());
+ }
+
+ @Test
+ public void testGet() {
+ List<Quantile> quantiles = new ArrayList<Quantile>();
+ quantiles.add(new Quantile(0.50, 0.01));
+ quantiles.add(new Quantile(0.90, 0.01));
+ quantiles.add(new Quantile(0.95, 0.01));
+ quantiles.add(new Quantile(0.99, 0.01));
+
+ List<Double> input = makeSequence(1, 100);
+ CKMSQuantiles ckms = new CKMSQuantiles(
+ quantiles.toArray(new Quantile[]{}));
+ for (double v : input) {
+ ckms.insert(v);
+ }
+ assertEquals(10.0, ckms.get(0.1), 1);
+ assertEquals(50.0, ckms.get(0.5), 1);
+ assertEquals(90.0, ckms.get(0.9), 1);
+ assertEquals(95.0, ckms.get(0.95), 1);
+ assertEquals(99.0, ckms.get(0.99), 1);
+ }
+
+ @Test
+ public void testGetWithAMillionElements() {
+ List<Quantile> quantiles = new ArrayList<Quantile>();
+ quantiles.add(new Quantile(0.0, 0.01));
+ quantiles.add(new Quantile(0.10, 0.01));
+ quantiles.add(new Quantile(0.90, 0.001));
+ quantiles.add(new Quantile(0.95, 0.02));
+ quantiles.add(new Quantile(0.99, 0.001));
+
+ final int elemCount = 1000000;
+ // Use Double[] (not double[]): Arrays.asList on a primitive array yields a
+ // single-element List<double[]>, so Collections.shuffle would be a no-op.
+ Double[] shuffle = new Double[elemCount];
+ for (int i = 0; i < shuffle.length; i++) {
+ shuffle[i] = (double) (i + 1);
+ }
+ Random rand = new Random(0);
+
+ Collections.shuffle(Arrays.asList(shuffle), rand);
+
+ CKMSQuantiles ckms = new CKMSQuantiles(
+ quantiles.toArray(new Quantile[]{}));
+
+ for (double v : shuffle) {
+ ckms.insert(v);
+ }
+ // given the linear distribution, we set the delta equal to the εn value for this quantile
+ assertEquals(0.1 * elemCount, ckms.get(0.1), 0.01 * elemCount);
+ assertEquals(0.9 * elemCount, ckms.get(0.9), 0.001 * elemCount);
+ assertEquals(0.95 * elemCount, ckms.get(0.95), 0.02 * elemCount);
+ assertEquals(0.99 * elemCount, ckms.get(0.99), 0.001 * elemCount);
+
+ assertTrue("sample size should be way below 1_000_000", ckms.samples.size() < 1000);
+ }
+
+
+ @Test
+ public void testGetGaussian() {
+ RandomGenerator rand = new JDKRandomGenerator();
+ rand.setSeed(0);
+
+ double mean = 0.0;
+ double stddev = 1.0;
+ NormalDistribution normalDistribution = new NormalDistribution(rand, mean, stddev, NormalDistribution.DEFAULT_INVERSE_ABSOLUTE_ACCURACY);
+
+ List<Quantile> quantiles = new ArrayList<Quantile>();
+ quantiles.add(new Quantile(0.10, 0.001));
+ quantiles.add(new Quantile(0.50, 0.01));
+ quantiles.add(new Quantile(0.90, 0.001));
+ quantiles.add(new Quantile(0.95, 0.001));
+ quantiles.add(new Quantile(0.99, 0.001));
+
+ CKMSQuantiles ckms = new CKMSQuantiles(
+ quantiles.toArray(new Quantile[]{}));
+
+ final int elemCount = 1000000;
+ double[] shuffle = normalDistribution.sample(elemCount);
+
+ // insert a million samples
+ for (double v : shuffle) {
+ ckms.insert(v);
+ }
+
+ // give the actual values for the quantiles we test
+ double p10 = normalDistribution.inverseCumulativeProbability(0.1);
+ double p90 = normalDistribution.inverseCumulativeProbability(0.9);
+ double p95 = normalDistribution.inverseCumulativeProbability(0.95);
+ double p99 = normalDistribution.inverseCumulativeProbability(0.99);
+
+ //ε-approximate quantiles relaxes the requirement
+ //to finding an item with rank between (φ−ε)n and (φ+ε)n.
+ assertEquals(p10, ckms.get(0.1), errorBoundsNormalDistribution(0.1, 0.001, normalDistribution));
+ assertEquals(mean, ckms.get(0.5), errorBoundsNormalDistribution(0.5, 0.01, normalDistribution));
+ assertEquals(p90, ckms.get(0.9), errorBoundsNormalDistribution(0.9, 0.001, normalDistribution));
+ assertEquals(p95, ckms.get(0.95), errorBoundsNormalDistribution(0.95, 0.001, normalDistribution));
+ assertEquals(p99, ckms.get(0.99), errorBoundsNormalDistribution(0.99, 0.001, normalDistribution));
+
+ assertTrue("sample size should be below 1000", ckms.samples.size() < 1000);
+ }
+
+ @Test
+ public void checkBounds() {
+ try {
+ new Quantile(-1, 0);
+ } catch (IllegalArgumentException e) {
+ assertEquals("Quantile must be between 0 and 1", e.getMessage());
+ } catch (Exception e) {
+ fail("Wrong exception thrown" + e);
+ }
+
+ try {
+ new Quantile(0.95, 2);
+ } catch (IllegalArgumentException e) {
+ assertEquals("Epsilon must be between 0 and 1", e.getMessage());
+ } catch (Exception e) {
+ fail("Wrong exception thrown" + e);
+ }
+ }
+
+ double errorBoundsNormalDistribution(double p, double epsilon, NormalDistribution nd) {
+ //(φ+ε)n
+ double upperBound = nd.inverseCumulativeProbability(p + epsilon);
+ //(φ−ε)n
+ double lowerBound = nd.inverseCumulativeProbability(p - epsilon);
+ // subtract and divide by 2, assuming that the increase is linear in this small epsilon.
+ return Math.abs(upperBound - lowerBound) / 2;
+ }
+
+ /**
+ * In Java 8 we could use IntStream
+ */
+ List<Double> makeSequence(int begin, int end) {
+ List<Double> ret = new ArrayList<Double>(end - begin + 1);
+ for (int i = begin; i <= end; i++) {
+ ret.add((double) i);
+ }
+ return ret;
+ }
+}
diff --git a/simpleclient_common/src/test/java/io/prometheus/client/exporter/common/ExemplarTest.java b/simpleclient_common/src/test/java/io/prometheus/client/exporter/common/ExemplarTest.java
index 469446cda..2ab2821f9 100644
--- a/simpleclient_common/src/test/java/io/prometheus/client/exporter/common/ExemplarTest.java
+++ b/simpleclient_common/src/test/java/io/prometheus/client/exporter/common/ExemplarTest.java
@@ -310,15 +310,15 @@ public void testSummaryNoLabels() throws IOException {
.help("help")
.quantile(0.5, 0.01)
.register(registry);
- for (int i=1; i<=11; i++) { // median is 5
+ for (int i=1; i<=11; i++) { // median is 6
noLabelsDefaultExemplar.observe(i);
}
// Summaries don't have Exemplars according to the OpenMetrics spec.
- assertOpenMetrics100Format("no_labels{quantile=\"0.5\"} 5.0\n");
+ assertOpenMetrics100Format("no_labels{quantile=\"0.5\"} 6.0\n");
assertOpenMetrics100Format("no_labels_count 11.0\n");
assertOpenMetrics100Format("no_labels_sum 66.0\n");
- assert004Format("no_labels{quantile=\"0.5\",} 5.0\n");
+ assert004Format("no_labels{quantile=\"0.5\",} 6.0\n");
assert004Format("no_labels_count 11.0\n");
assert004Format("no_labels_sum 66.0\n");
}
@@ -331,15 +331,15 @@ public void testSummaryLabels() throws IOException {
.labelNames("label")
.quantile(0.5, 0.01)
.register(registry);
- for (int i=1; i<=11; i++) { // median is 5
+ for (int i=1; i<=11; i++) { // median is 6
labelsNoExemplar.labels("test").observe(i);
}
// Summaries don't have Exemplars according to the OpenMetrics spec.
- assertOpenMetrics100Format("labels{label=\"test\",quantile=\"0.5\"} 5.0\n");
+ assertOpenMetrics100Format("labels{label=\"test\",quantile=\"0.5\"} 6.0\n");
assertOpenMetrics100Format("labels_count{label=\"test\"} 11.0\n");
assertOpenMetrics100Format("labels_sum{label=\"test\"} 66.0\n");
- assert004Format("labels{label=\"test\",quantile=\"0.5\",} 5.0\n");
+ assert004Format("labels{label=\"test\",quantile=\"0.5\",} 6.0\n");
assert004Format("labels_count{label=\"test\",} 11.0\n");
assert004Format("labels_sum{label=\"test\",} 66.0\n");
}