-
Notifications
You must be signed in to change notification settings - Fork 580
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
finagle-core: Ability to configure StatsFilter with a HistogramCounte…
…rFactory to track request burstiness Problem While we have a counter for the number of requests, we're limited by the frequency of metrics collection to see how these requests are spread out; that is, how "bursty" they are. Solution Introduce a HistogramCounter, created via a HistogramCounterFactory, that can be used to track request burstiness. The factory is configured on a client/server, and, if configured, is used in StatsFilter to track request burstiness. Differential Revision: https://phabricator.twitter.biz/D1180751
- Loading branch information
Showing
1 changed file
with
106 additions
and
0 deletions.
There are no files selected for viewing
106 changes: 106 additions & 0 deletions
106
util-stats/src/main/scala/com/twitter/finagle/stats/HistogramCounter.scala
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,106 @@ | ||
package com.twitter.finagle.stats | ||
|
||
import com.twitter.conversions.DurationOps._ | ||
import com.twitter.util.Closable | ||
import com.twitter.util.Duration | ||
import com.twitter.util.Future | ||
import com.twitter.util.Time | ||
import com.twitter.util.Timer | ||
import java.util.concurrent.ConcurrentHashMap | ||
import java.util.concurrent.atomic.LongAdder | ||
import scala.collection.JavaConverters._ | ||
|
||
private[twitter] sealed abstract class StatsFrequency(val frequency: Duration) { | ||
def suffix: String | ||
} | ||
|
||
private[twitter] object StatsFrequency { | ||
case object HundredMilliSecondly extends StatsFrequency(100.millis) { | ||
override def suffix = "hundredMilliSecondly" | ||
} | ||
} | ||
|
||
/** | ||
* Class for creating [[HistogramCounter]]s. It is expected that there be one [[HistogramCounterFactory]] | ||
* per process -- otherwise we will schedule multiple timer tasks for aggregating the counter into | ||
* a stat, and there can be multiple aggregations for a single stat which may produce unexpected | ||
* results. | ||
*/ | ||
private[twitter] class HistogramCounterFactory(timer: Timer, nowMs: () => Long) extends Closable { | ||
|
||
@volatile private[this] var closed = false | ||
|
||
private[this] val frequencyToStats: Map[ | ||
StatsFrequency, | ||
ConcurrentHashMap[Stat, HistogramCounter] | ||
] = Map( | ||
StatsFrequency.HundredMilliSecondly -> new ConcurrentHashMap[Stat, HistogramCounter] | ||
) | ||
|
||
frequencyToStats.map { | ||
case (statsFrequency, statToCounter) => | ||
timer.doLater(statsFrequency.frequency)(recordStatsForCounters(statsFrequency, statToCounter)) | ||
} | ||
|
||
def apply( | ||
name: Seq[String], | ||
frequency: StatsFrequency, | ||
statsReceiver: StatsReceiver | ||
): HistogramCounter = { | ||
val stat = statsReceiver.stat(normalizeName(name) :+ frequency.suffix: _*) | ||
val histogramCounter = new HistogramCounter(stat, nowMs, frequency.frequency.inMillis) | ||
val existing = frequencyToStats(frequency).putIfAbsent(stat, histogramCounter) | ||
if (existing == null) { | ||
histogramCounter | ||
} else { | ||
existing | ||
} | ||
} | ||
|
||
override def close(deadline: Time): Future[Unit] = { | ||
closed = true | ||
Future.Done | ||
} | ||
|
||
private[this] def recordStatsForCounters( | ||
statsFrequency: StatsFrequency, | ||
statToCounter: ConcurrentHashMap[Stat, HistogramCounter] | ||
): Unit = { | ||
statToCounter.values().asScala.foreach { counter => | ||
counter.recordAndReset() | ||
} | ||
if (!closed) { | ||
timer.doLater(statsFrequency.frequency)(recordStatsForCounters(statsFrequency, statToCounter)) | ||
} | ||
} | ||
|
||
private[this] def normalizeName(name: Seq[String]): Seq[String] = { | ||
if (name.forall(!_.contains("/"))) { | ||
name | ||
} else { | ||
name.map(_.split("/")).flatten | ||
} | ||
} | ||
} | ||
|
||
private[stats] class HistogramCounter(stat: Stat, nowMs: () => Long, windowSizeMs: Long) { | ||
private[this] val counter: LongAdder = new LongAdder | ||
@volatile private[this] var lastRecordAndResetMs = nowMs() | ||
|
||
private[stats] def recordAndReset(): Unit = { | ||
val count = counter.sumThenReset() | ||
val now = nowMs() | ||
val elapsed = Math.max(0, now - lastRecordAndResetMs) | ||
val elapsedWindows = elapsed.toFloat / windowSizeMs | ||
stat.add(count / elapsedWindows) | ||
lastRecordAndResetMs = now | ||
} | ||
|
||
def incr(delta: Long): Unit = { | ||
counter.add(delta) | ||
} | ||
|
||
def incr(): Unit = { | ||
counter.increment() | ||
} | ||
} |