Skip to content

Commit

Permalink
Algorithm support for periodic sources.
Browse files Browse the repository at this point in the history
  • Loading branch information
davidv1992 committed Dec 13, 2024
1 parent a611ad2 commit 3ccb6ae
Show file tree
Hide file tree
Showing 7 changed files with 688 additions and 108 deletions.
2 changes: 2 additions & 0 deletions ntp-proto/src/algorithm/kalman/combiner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ mod tests {
},
wander: 0.0,
delay: 0.0,
period: None,
source_uncertainty: NtpDuration::from_seconds(source_uncertainty),
source_delay: NtpDuration::from_seconds(0.01),
leap_indicator: NtpLeapIndicator::NoWarning,
Expand Down Expand Up @@ -227,6 +228,7 @@ mod tests {
},
wander: 0.0,
delay: 0.0,
period: None,
source_uncertainty: NtpDuration::from_seconds(0.0),
source_delay: NtpDuration::from_seconds(0.0),
leap_indicator: leap,
Expand Down
49 changes: 43 additions & 6 deletions ntp-proto/src/algorithm/kalman/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ struct SourceSnapshot<Index: Copy> {
state: KalmanState,
wander: f64,
delay: f64,
period: Option<f64>,

source_uncertainty: NtpDuration,
source_delay: NtpDuration,
Expand Down Expand Up @@ -113,7 +114,10 @@ impl<C: NtpClock, SourceId: Hash + Eq + Copy + Debug> KalmanClockController<C, S
}
for (_, (state, _)) in self.sources.iter_mut() {
if let Some(ref mut snapshot) = state {
snapshot.state = snapshot.state.progress_time(time, snapshot.wander)
snapshot.state =
snapshot
.state
.progress_time(time, snapshot.wander, snapshot.period)
}
}

Expand Down Expand Up @@ -262,7 +266,7 @@ impl<C: NtpClock, SourceId: Hash + Eq + Copy + Debug> KalmanClockController<C, S
.expect("Cannot adjust clock");
for (state, _) in self.sources.values_mut() {
if let Some(ref mut state) = state {
state.state = state.state.process_offset_steering(change);
state.state = state.state.process_offset_steering(change, state.period);
}
}
info!("Jumped offset by {}ms", change * 1e3);
Expand Down Expand Up @@ -315,10 +319,12 @@ impl<C: NtpClock, SourceId: Hash + Eq + Copy + Debug> KalmanClockController<C, S
.expect("Cannot adjust clock");
for (state, _) in self.sources.values_mut() {
if let Some(ref mut state) = state {
state.state =
state
.state
.process_frequency_steering(freq_update, actual_change, state.wander)
state.state = state.state.process_frequency_steering(
freq_update,
actual_change,
state.wander,
state.period,
)
}
}
debug!(
Expand Down Expand Up @@ -382,6 +388,7 @@ impl<C: NtpClock, SourceId: Hash + Eq + Copy + Debug + Send + 'static> TimeSyncC
KalmanSourceController::new(
id,
self.algo_config,
None,
self.source_defaults_config,
AveragingBuffer::default(),
)
Expand All @@ -391,11 +398,13 @@ impl<C: NtpClock, SourceId: Hash + Eq + Copy + Debug + Send + 'static> TimeSyncC
&mut self,
id: SourceId,
measurement_noise_estimate: f64,
period: Option<f64>,
) -> Self::OneWaySourceController {
self.sources.insert(id, (None, false));
KalmanSourceController::new(
id,
self.algo_config,
period,
self.source_defaults_config,
measurement_noise_estimate,
)
Expand Down Expand Up @@ -638,6 +647,29 @@ mod tests {
},
wander: 0.0,
delay: 0.0,
period: None,
source_uncertainty: NtpDuration::ZERO,
source_delay: NtpDuration::ZERO,
leap_indicator: NtpLeapIndicator::NoWarning,
last_update: NtpTimestamp::from_fixed_int(0),
}),
true,
),
);

algo.sources.insert(
1,
(
Some(SourceSnapshot {
index: 0,
state: KalmanState {
state: Vector::new_vector([0.0, 0.0]),
uncertainty: Matrix::new([[1e-18, 0.0], [0.0, 1e-18]]),
time: NtpTimestamp::from_fixed_int(0),
},
wander: 0.0,
delay: 0.0,
period: Some(3.0),
source_uncertainty: NtpDuration::ZERO,
source_delay: NtpDuration::ZERO,
leap_indicator: NtpLeapIndicator::NoWarning,
Expand All @@ -652,6 +684,10 @@ mod tests {
algo.sources.get(&0).unwrap().0.unwrap().state.offset(),
-100.0
);
assert_eq!(
algo.sources.get(&1).unwrap().0.unwrap().state.offset(),
-1.0
);
assert_eq!(
algo.sources.get(&0).unwrap().0.unwrap().state.time,
NtpTimestamp::from_seconds_nanos_since_ntp_era(100, 0)
Expand Down Expand Up @@ -686,6 +722,7 @@ mod tests {
},
wander: 0.0,
delay: 0.0,
period: None,
source_uncertainty: NtpDuration::ZERO,
source_delay: NtpDuration::ZERO,
leap_indicator: NtpLeapIndicator::NoWarning,
Expand Down
106 changes: 79 additions & 27 deletions ntp-proto/src/algorithm/kalman/select.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,11 @@ pub(super) fn select<Index: Copy>(
let mut bounds: Vec<(f64, BoundType)> = Vec::with_capacity(2 * candidates.len());

for snapshot in candidates.iter() {
if snapshot.period.is_some() {
// Do not let periodic sources be part of the vote for correct time
continue;
}

let radius = snapshot.offset_uncertainty() * algo_config.range_statistical_weight
+ snapshot.delay * algo_config.range_delay_weight;
if radius > algo_config.maximum_source_uncertainty
Expand All @@ -38,30 +43,44 @@ pub(super) fn select<Index: Copy>(

bounds.sort_by(|a, b| a.0.total_cmp(&b.0));

let mut max: usize = 0;
let mut maxt: f64 = 0.0;
let mut maxlow: usize = 0;
let mut maxhigh: usize = 0;
let mut maxtlow: f64 = 0.0;
let mut maxthigh: f64 = 0.0;
let mut cur: usize = 0;

for (time, boundtype) in bounds.iter() {
match boundtype {
BoundType::Start => cur += 1,
BoundType::End => cur -= 1,
}
if cur > max {
max = cur;
maxt = *time;
BoundType::Start => {
cur += 1;
if cur > maxlow {
maxlow = cur;
maxtlow = *time;
}
}
BoundType::End => {
if cur > maxhigh {
maxhigh = cur;
maxthigh = *time;
}
cur -= 1;
}
}
}

// Catch programming errors. If this ever fails there is high risk of missteering, better fail hard in that case
assert_eq!(maxlow, maxhigh);
let max = maxlow;

if max >= synchronization_config.minimum_agreeing_sources && max * 4 > bounds.len() {
candidates
.iter()
.filter(|snapshot| {
let radius = snapshot.offset_uncertainty() * algo_config.range_statistical_weight
+ snapshot.delay * algo_config.range_delay_weight;
radius <= algo_config.maximum_source_uncertainty
&& snapshot.offset() - radius <= maxt
&& snapshot.offset() + radius >= maxt
&& snapshot.offset() - radius <= maxthigh
&& snapshot.offset() + radius >= maxtlow
&& snapshot.leap_indicator.is_synchronized()
})
.cloned()
Expand All @@ -86,7 +105,12 @@ mod tests {

use super::*;

fn snapshot_for_range(center: f64, uncertainty: f64, delay: f64) -> SourceSnapshot<usize> {
fn snapshot_for_range(
center: f64,
uncertainty: f64,
delay: f64,
period: Option<f64>,
) -> SourceSnapshot<usize> {
SourceSnapshot {
index: 0,
state: KalmanState {
Expand All @@ -96,6 +120,7 @@ mod tests {
},
wander: 0.0,
delay,
period,
source_uncertainty: NtpDuration::from_seconds(0.01),
source_delay: NtpDuration::from_seconds(0.01),
leap_indicator: NtpLeapIndicator::NoWarning,
Expand All @@ -108,10 +133,10 @@ mod tests {
// Test that there only is sufficient overlap in the below set when
// both statistical and delay based errors are considered.
let candidates = vec![
snapshot_for_range(0.0, 0.01, 0.09),
snapshot_for_range(0.0, 0.09, 0.01),
snapshot_for_range(0.05, 0.01, 0.09),
snapshot_for_range(0.05, 0.09, 0.01),
snapshot_for_range(0.0, 0.01, 0.09, None),
snapshot_for_range(0.0, 0.09, 0.01, None),
snapshot_for_range(0.05, 0.01, 0.09, None),
snapshot_for_range(0.05, 0.09, 0.01, None),
];
let sysconfig = SynchronizationConfig {
minimum_agreeing_sources: 4,
Expand Down Expand Up @@ -151,9 +176,9 @@ mod tests {
fn test_rejection() {
// Test sources get properly rejected as rejection bound gets tightened.
let candidates = vec![
snapshot_for_range(0.0, 1.0, 1.0),
snapshot_for_range(0.0, 0.1, 0.1),
snapshot_for_range(0.0, 0.01, 0.01),
snapshot_for_range(0.0, 1.0, 1.0, None),
snapshot_for_range(0.0, 0.1, 0.1, None),
snapshot_for_range(0.0, 0.01, 0.01, None),
];
let sysconfig = SynchronizationConfig {
minimum_agreeing_sources: 1,
Expand Down Expand Up @@ -201,11 +226,11 @@ mod tests {
fn test_min_survivors() {
// Test that minimum number of survivors is correctly tested for.
let candidates = vec![
snapshot_for_range(0.0, 0.1, 0.1),
snapshot_for_range(0.0, 0.1, 0.1),
snapshot_for_range(0.0, 0.1, 0.1),
snapshot_for_range(0.5, 0.1, 0.1),
snapshot_for_range(0.5, 0.1, 0.1),
snapshot_for_range(0.0, 0.1, 0.1, None),
snapshot_for_range(0.0, 0.1, 0.1, None),
snapshot_for_range(0.0, 0.1, 0.1, None),
snapshot_for_range(0.5, 0.1, 0.1, None),
snapshot_for_range(0.5, 0.1, 0.1, None),
];
let algconfig = AlgorithmConfig {
maximum_source_uncertainty: 3.0,
Expand Down Expand Up @@ -233,10 +258,10 @@ mod tests {
fn test_tie() {
// Test that in the case of a tie no group is chosen.
let candidates = vec![
snapshot_for_range(0.0, 0.1, 0.1),
snapshot_for_range(0.0, 0.1, 0.1),
snapshot_for_range(0.5, 0.1, 0.1),
snapshot_for_range(0.5, 0.1, 0.1),
snapshot_for_range(0.0, 0.1, 0.1, None),
snapshot_for_range(0.0, 0.1, 0.1, None),
snapshot_for_range(0.5, 0.1, 0.1, None),
snapshot_for_range(0.5, 0.1, 0.1, None),
];
let algconfig = AlgorithmConfig {
maximum_source_uncertainty: 3.0,
Expand All @@ -251,4 +276,31 @@ mod tests {
let result = select(&sysconfig, &algconfig, candidates);
assert_eq!(result.len(), 0);
}

#[test]
fn test_periodic_is_ignored() {
let candidates = vec![
snapshot_for_range(0.0, 0.01, 0.01, None),
snapshot_for_range(0.0, 0.01, 0.01, Some(1.0)),
snapshot_for_range(0.0, 0.01, 0.01, Some(1.0)),
snapshot_for_range(0.0, 0.01, 0.01, Some(1.0)),
snapshot_for_range(0.5, 0.01, 0.01, None),
snapshot_for_range(0.5, 0.01, 0.01, None),
snapshot_for_range(0.5, 0.01, 0.01, Some(1.0)),
];
let algconfig = AlgorithmConfig::default();
let sysconfig = SynchronizationConfig {
minimum_agreeing_sources: 2,
..Default::default()
};
let result = select(&sysconfig, &algconfig, candidates.clone());
assert_eq!(result.len(), 3);
assert_eq!(result[0].offset(), 0.5);
let sysconfig = SynchronizationConfig {
minimum_agreeing_sources: 3,
..Default::default()
};
let result = select(&sysconfig, &algconfig, candidates);
assert_eq!(result.len(), 0);
}
}
Loading

0 comments on commit 3ccb6ae

Please sign in to comment.