diff --git a/relay-common/src/time.rs b/relay-common/src/time.rs
index 016a33caaf..3a170fd007 100644
--- a/relay-common/src/time.rs
+++ b/relay-common/src/time.rs
@@ -61,20 +61,6 @@ pub fn chrono_to_positive_millis(duration: chrono::Duration) -> f64 {
     duration_to_millis(duration.to_std().unwrap_or_default())
 }
 
-/// The conversion result of [`UnixTimestamp::to_instant`].
-///
-/// If the time is outside of what can be represented in an [`Instant`], this is `Past` or
-/// `Future`.
-#[derive(Clone, Copy, Debug)]
-pub enum MonotonicResult {
-    /// A time before the earliest representable `Instant`.
-    Past,
-    /// A representable `Instant`.
-    Instant(Instant),
-    /// A time after the latest representable `Instant`.
-    Future,
-}
-
 /// A unix timestamp (full seconds elapsed since 1970-01-01 00:00 UTC).
 #[derive(Clone, Copy, Eq, Hash, Ord, PartialEq, PartialOrd)]
 pub struct UnixTimestamp(u64);
@@ -133,41 +119,6 @@ impl UnixTimestamp {
         NaiveDateTime::from_timestamp_opt(self.0 as i64, 0)
             .map(|n| DateTime::from_naive_utc_and_offset(n, Utc))
     }
-
-    /// Converts the UNIX timestamp into an `Instant` based on the current system timestamp.
-    ///
-    /// Returns [`MonotonicResult::Instant`] if the timestamp can be represented. Otherwise, returns
-    /// [`MonotonicResult::Past`] or [`MonotonicResult::Future`].
-    ///
-    /// Note that the system time is subject to skew, so subsequent calls to `to_instant` may return
-    /// different values.
-    ///
-    /// # Example
-    ///
-    /// ```
-    /// use std::time::{Duration, Instant};
-    /// use relay_common::time::{MonotonicResult, UnixTimestamp};
-    ///
-    /// let timestamp = UnixTimestamp::now();
-    /// if let MonotonicResult::Instant(instant) = timestamp.to_instant() {
-    ///     assert!((Instant::now() - instant) < Duration::from_millis(1));
-    /// }
-    /// ```
-    pub fn to_instant(self) -> MonotonicResult {
-        let now = Self::now();
-
-        if self > now {
-            match Instant::now().checked_add(self - now) {
-                Some(instant) => MonotonicResult::Instant(instant),
-                None => MonotonicResult::Future,
-            }
-        } else {
-            match Instant::now().checked_sub(now - self) {
-                Some(instant) => MonotonicResult::Instant(instant),
-                None => MonotonicResult::Past,
-            }
-        }
-    }
 }
 
 impl fmt::Debug for UnixTimestamp {
@@ -182,6 +133,15 @@ impl fmt::Display for UnixTimestamp {
     }
 }
 
+/// Adds _whole_ seconds of the given duration to the timestamp.
+impl std::ops::Add<Duration> for UnixTimestamp {
+    type Output = Self;
+
+    fn add(self, rhs: Duration) -> Self::Output {
+        Self(self.0.saturating_add(rhs.as_secs()))
+    }
+}
+
 impl std::ops::Sub for UnixTimestamp {
     type Output = Duration;
 
diff --git a/relay-metrics/src/aggregator.rs b/relay-metrics/src/aggregator.rs
index bfb65d3349..9939c15324 100644
--- a/relay-metrics/src/aggregator.rs
+++ b/relay-metrics/src/aggregator.rs
@@ -10,7 +10,7 @@ use std::{fmt, mem};
 
 use fnv::FnvHasher;
 use relay_base_schema::project::ProjectKey;
-use relay_common::time::{MonotonicResult, UnixTimestamp};
+use relay_common::time::UnixTimestamp;
 use serde::{Deserialize, Serialize};
 use thiserror::Error;
 use tokio::time::Instant;
@@ -117,6 +117,9 @@ pub enum ShiftKey {
     ///
     /// Only for use in processing Relays.
     Bucket,
+
+    /// Do not apply a shift. This should be set when `http.global_metrics` is used.
+    None,
 }
 
 /// Parameters used by the [`Aggregator`].
@@ -194,31 +197,35 @@ impl AggregatorConfig {
     /// Recent buckets are flushed after a grace period of `initial_delay`. Backdated buckets, that
     /// is, buckets that lie in the past, are flushed after the shorter `debounce_delay`.
     fn get_flush_time(&self, bucket_key: &BucketKey) -> Instant {
-        let now = Instant::now();
-        let mut flush = None;
+        let initial_flush = bucket_key.timestamp + self.bucket_interval() + self.initial_delay();
 
-        if let MonotonicResult::Instant(instant) = bucket_key.timestamp.to_instant() {
-            let instant = Instant::from_std(instant);
-            let bucket_end = instant + self.bucket_interval();
-            let initial_flush = bucket_end + self.initial_delay();
-            // If the initial flush is still pending, use that.
-            if initial_flush > now {
-                flush = Some(initial_flush + self.flush_time_shift(bucket_key));
-            }
-        }
+        let now = UnixTimestamp::now();
+        let backdated = initial_flush <= now;
 
-        let delay = UnixTimestamp::now().as_secs() as i64 - bucket_key.timestamp.as_secs() as i64;
+        let delay = now.as_secs() as i64 - bucket_key.timestamp.as_secs() as i64;
         relay_statsd::metric!(
             histogram(MetricHistograms::BucketsDelay) = delay as f64,
-            backdated = if flush.is_none() { "true" } else { "false" },
+            backdated = if backdated { "true" } else { "false" },
         );
 
-        // If the initial flush time has passed or cannot be represented, debounce future flushes
-        // with the `debounce_delay` starting now.
-        match flush {
-            Some(initial_flush) => initial_flush,
-            None => now + self.debounce_delay(),
-        }
+        let flush_timestamp = if backdated {
+            // If the initial flush time has passed, debounce future flushes with the
+            // `debounce_delay` starting now. However, align the current timestamp with
+            // the bucket interval for proper batching.
+            let floor = (now.as_secs() / self.bucket_interval) * self.bucket_interval;
+            UnixTimestamp::from_secs(floor) + self.bucket_interval() + self.debounce_delay()
+        } else {
+            // If the initial flush is still pending, use that.
+            initial_flush
+        };
+
+        let instant = if flush_timestamp > now {
+            Instant::now().checked_add(flush_timestamp - now)
+        } else {
+            Instant::now().checked_sub(now - flush_timestamp)
+        };
+
+        instant.unwrap_or_else(Instant::now) + self.flush_time_shift(bucket_key)
     }
 
     /// The delay to debounce backdated flushes.
@@ -247,9 +254,10 @@ impl AggregatorConfig {
                 hasher.finish()
             }
             ShiftKey::Bucket => bucket.hash64(),
+            ShiftKey::None => return Duration::ZERO,
         };
 
         let shift_millis = hash_value % (self.bucket_interval * 1000);
         Duration::from_millis(shift_millis)
     }
 
@@ -257,22 +265,11 @@
     ///
     /// We select the output bucket which overlaps with the center of the incoming bucket.
-    /// Fails if timestamp is too old or too far into the future.
-    fn get_bucket_timestamp(
-        &self,
-        timestamp: UnixTimestamp,
-        bucket_width: u64,
-    ) -> Result<UnixTimestamp, AggregateMetricsError> {
+    fn get_bucket_timestamp(&self, timestamp: UnixTimestamp, bucket_width: u64) -> UnixTimestamp {
         // Find middle of the input bucket to select a target
         let ts = timestamp.as_secs().saturating_add(bucket_width / 2);
         // Align target_timestamp to output bucket width
         let ts = (ts / self.bucket_interval) * self.bucket_interval;
-        let output_timestamp = UnixTimestamp::from_secs(ts);
-
-        if !self.timestamp_range().contains(&output_timestamp) {
-            return Err(AggregateMetricsErrorKind::InvalidTimestamp(timestamp).into());
-        }
-
-        Ok(output_timestamp)
+        UnixTimestamp::from_secs(ts)
     }
 
     /// Returns the valid range for metrics timestamps.
@@ -696,26 +694,27 @@ impl Aggregator {
         key
     }
 
-    // Wrapper for [`AggregatorConfig::get_bucket_timestamp`].
-    // Logs a statsd metric for invalid timestamps.
+    /// Wrapper for [`AggregatorConfig::get_bucket_timestamp`].
+    ///
+    /// Logs a statsd metric for invalid timestamps.
     fn get_bucket_timestamp(
         &self,
         timestamp: UnixTimestamp,
         bucket_width: u64,
     ) -> Result<UnixTimestamp, AggregateMetricsError> {
-        let res = self.config.get_bucket_timestamp(timestamp, bucket_width);
-        if let Err(AggregateMetricsError {
-            kind: AggregateMetricsErrorKind::InvalidTimestamp(ts),
-        }) = res
-        {
-            let delta = (ts.as_secs() as i64) - (UnixTimestamp::now().as_secs() as i64);
+        let bucket_ts = self.config.get_bucket_timestamp(timestamp, bucket_width);
+
+        if !self.config.timestamp_range().contains(&bucket_ts) {
+            let delta = (bucket_ts.as_secs() as i64) - (UnixTimestamp::now().as_secs() as i64);
             relay_statsd::metric!(
                 histogram(MetricHistograms::InvalidBucketTimestamp) = delta as f64,
                 aggregator = &self.name,
             );
+
+            return Err(AggregateMetricsErrorKind::InvalidTimestamp(timestamp).into());
         }
-        res
+
+        Ok(bucket_ts)
     }
 
     /// Merge a preaggregated bucket into this aggregator.
@@ -863,7 +862,6 @@ impl fmt::Debug for Aggregator {
 
 #[cfg(test)]
 mod tests {
-    use similar_asserts::assert_eq;
 
     use super::*;
 
@@ -880,7 +878,7 @@ mod tests {
             max_tag_key_length: 200,
             max_tag_value_length: 200,
             max_project_key_bucket_bytes: None,
-            ..Default::default()
+            shift_key: ShiftKey::default(),
         }
     }
 
@@ -1181,7 +1179,7 @@ mod tests {
             assert_eq!(total_cost, current_cost + expected_added_cost);
         }
 
-        aggregator.pop_flush_buckets(false);
+        aggregator.pop_flush_buckets(true);
         assert_eq!(aggregator.cost_tracker.total_cost, 0);
     }
 
@@ -1194,8 +1192,10 @@ mod tests {
             ..Default::default()
         };
 
+        let aggregator = Aggregator::new(config);
+
         assert!(matches!(
-            config
+            aggregator
                 .get_bucket_timestamp(UnixTimestamp::from_secs(u64::MAX), 2)
                 .unwrap_err()
                 .kind,
@@ -1215,9 +1215,7 @@ mod tests {
         let now = UnixTimestamp::now().as_secs();
         let rounded_now = UnixTimestamp::from_secs(now / 10 * 10);
         assert_eq!(
-            config
-                .get_bucket_timestamp(UnixTimestamp::from_secs(now), 0)
-                .unwrap(),
+            config.get_bucket_timestamp(UnixTimestamp::from_secs(now), 0),
             rounded_now
         );
     }
@@ -1236,7 +1234,6 @@ mod tests {
         assert_eq!(
             config
                 .get_bucket_timestamp(UnixTimestamp::from_secs(now), 20)
-                .unwrap()
                 .as_secs(),
             rounded_now + 10
         );
@@ -1256,7 +1253,6 @@ mod tests {
         assert_eq!(
             config
                 .get_bucket_timestamp(UnixTimestamp::from_secs(now), 23)
-                .unwrap()
                 .as_secs(),
             rounded_now + 10
         );
diff --git a/relay-metrics/src/aggregatorservice.rs b/relay-metrics/src/aggregatorservice.rs
index c041a9f22a..09747eb0cb 100644
--- a/relay-metrics/src/aggregatorservice.rs
+++ b/relay-metrics/src/aggregatorservice.rs
@@ -392,7 +392,6 @@ impl MergeBuckets {
 
 #[cfg(test)]
 mod tests {
-    use std::collections::BTreeMap;
     use std::sync::{Arc, RwLock};
 
@@ -498,40 +497,4 @@ mod tests {
         // receiver must have 1 bucket flushed
         assert_eq!(receiver.bucket_count(), 1);
     }
-
-    #[tokio::test]
-    async fn test_merge_back() {
-        relay_test::setup();
-        tokio::time::pause();
-
-        // Create a receiver which accepts nothing:
-        let receiver = TestReceiver {
-            reject_all: true,
-            ..TestReceiver::default()
-        };
-        let recipient = receiver.clone().start().recipient();
-
-        let config = AggregatorServiceConfig {
-            bucket_interval: 1,
-            initial_delay: 0,
-            debounce_delay: 0,
-            ..Default::default()
-        };
-        let aggregator = AggregatorService::new(config, Some(recipient)).start();
-
-        let mut bucket = some_bucket();
-        bucket.timestamp = UnixTimestamp::now();
-
-        aggregator.send(MergeBuckets {
-            project_key: ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(),
-            buckets: vec![bucket],
-        });
-
-        assert_eq!(receiver.bucket_count(), 0);
-
-        tokio::time::sleep(Duration::from_millis(1100)).await;
-        let bucket_count = aggregator.send(BucketCountInquiry).await.unwrap();
-        assert_eq!(bucket_count, 1);
-        assert_eq!(receiver.bucket_count(), 0);
-    }
 }
diff --git a/tests/integration/test_metrics.py b/tests/integration/test_metrics.py
index 0240ef646d..8597951397 100644
--- a/tests/integration/test_metrics.py
+++ b/tests/integration/test_metrics.py
@@ -11,7 +11,12 @@ from .test_envelope import generate_transaction_item
 
 TEST_CONFIG = {
-    "aggregator": {"bucket_interval": 1, "initial_delay": 0, "debounce_delay": 0}
+    "aggregator": {
+        "bucket_interval": 1,
+        "initial_delay": 0,
+        "debounce_delay": 0,
+        "shift_key": "none",
+    }
 }
 
@@ -138,6 +143,7 @@ def test_metrics_partition_key(mini_sentry, relay, metrics_partitions, expected_
             "max_secs_in_past": forever,
             "max_secs_in_future": forever,
             "flush_partitions": metrics_partitions,
+            "shift_key": "none",
         },
     }
     relay = relay(mini_sentry, options=relay_config)
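
Notes on the semantics above (illustrative sketches, not part of the patch). First, the new `Add<Duration>` impl on `UnixTimestamp` adds only the _whole_ seconds of the duration and saturates on overflow. The values below are made up for illustration; `from_secs` and `as_secs` are the existing accessors used elsewhere in this patch:

```rust
use std::time::Duration;
use relay_common::time::UnixTimestamp;

let ts = UnixTimestamp::from_secs(100);

// Only whole seconds are added; the 900ms remainder is dropped.
assert_eq!((ts + Duration::from_millis(1900)).as_secs(), 101);

// Addition saturates at u64::MAX instead of overflowing.
assert_eq!((ts + Duration::from_secs(u64::MAX)).as_secs(), u64::MAX);
```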
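Second, the behavioral change in `get_flush_time`: backdated buckets no longer flush `debounce_delay` after an arbitrary `Instant::now()`, but at a flush time derived from the current unix time floored to the bucket interval. A worked example of that arithmetic, with hypothetical values (`bucket_interval = 10`, `debounce_delay = 2`, both in seconds):

```rust
let bucket_interval: u64 = 10; // hypothetical config values
let debounce_delay: u64 = 2;

let now: u64 = 1_699_999_997; // pretend current unix time

// Floor the current time to the bucket interval...
let floor = (now / bucket_interval) * bucket_interval; // 1_699_999_990

// ...then flush one interval plus the debounce delay later.
let flush = floor + bucket_interval + debounce_delay;
assert_eq!(flush, 1_700_000_002);

// Every Relay that computes this within the same 10s window lands on the
// same flush slot, which is what enables batching across instances.
```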
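Relatedly, `ShiftKey::None` exists because `flush_time_shift` would otherwise smear this aligned flush time by a hash-based offset within the interval. A sketch of the shift that `None` bypasses; the hash value here is a made-up stand-in for the project or bucket hash:

```rust
use std::time::Duration;

let bucket_interval: u64 = 10; // seconds, hypothetical
let hash_value: u64 = 0xDEAD_BEEF; // stand-in for the project/bucket hash

// The shift is a millisecond offset somewhere within one flush interval.
let shift_millis = hash_value % (bucket_interval * 1000);
assert_eq!(shift_millis, 8_559);

let shift = Duration::from_millis(shift_millis);
assert!(shift < Duration::from_secs(bucket_interval));
```

With `http.global_metrics`, buckets from all Relays should flush on the same global grid, so the shift is disabled, as the integration tests above do via `"shift_key": "none"`.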