From 88cc0fcf91b42ee4c40261ce3b54565038b6bc2d Mon Sep 17 00:00:00 2001 From: David Venhoek Date: Wed, 11 Dec 2024 12:15:07 +0100 Subject: [PATCH] Rewired internals to work with per-source config for poll intervals. --- ntp-proto/src/algorithm/kalman/mod.rs | 39 ++++------ ntp-proto/src/algorithm/kalman/source.rs | 84 ++++++++++----------- ntp-proto/src/algorithm/mod.rs | 10 ++- ntp-proto/src/config.rs | 4 +- ntp-proto/src/lib.rs | 2 +- ntp-proto/src/source.rs | 16 ++-- ntp-proto/src/system.rs | 24 +++--- ntpd/src/daemon/config/mod.rs | 4 +- ntpd/src/daemon/ntp_source.rs | 4 +- ntpd/src/daemon/sock_source.rs | 7 +- ntpd/src/daemon/spawn/mod.rs | 6 +- ntpd/src/daemon/spawn/nts.rs | 6 +- ntpd/src/daemon/spawn/nts_pool.rs | 8 +- ntpd/src/daemon/spawn/pool.rs | 86 ++++++++++++--------- ntpd/src/daemon/spawn/sock.rs | 19 +++-- ntpd/src/daemon/spawn/standard.rs | 95 ++++++++++++++---------- ntpd/src/daemon/system.rs | 26 +++---- ntpd/src/force_sync/algorithm.rs | 17 +++-- 18 files changed, 248 insertions(+), 209 deletions(-) diff --git a/ntp-proto/src/algorithm/kalman/mod.rs b/ntp-proto/src/algorithm/kalman/mod.rs index 690fa1ef4..5ba682f3b 100644 --- a/ntp-proto/src/algorithm/kalman/mod.rs +++ b/ntp-proto/src/algorithm/kalman/mod.rs @@ -6,7 +6,7 @@ use tracing::{debug, error, info}; use crate::{ clock::NtpClock, - config::{SourceDefaultsConfig, SynchronizationConfig}, + config::{SourceConfig, SynchronizationConfig}, packet::NtpLeapIndicator, system::TimeSnapshot, time_types::{NtpDuration, NtpTimestamp}, @@ -84,7 +84,6 @@ pub struct KalmanClockController>, bool)>, clock: C, synchronization_config: SynchronizationConfig, - source_defaults_config: SourceDefaultsConfig, algo_config: AlgorithmConfig, freq_offset: f64, timedata: TimeSnapshot, @@ -352,7 +351,6 @@ impl TimeSyncC fn new( clock: C, synchronization_config: SynchronizationConfig, - source_defaults_config: SourceDefaultsConfig, algo_config: Self::AlgorithmConfig, ) -> Result { // Setup clock @@ -362,7 +360,6 @@ impl TimeSyncC sources: HashMap::new(), clock, synchronization_config, - source_defaults_config, algo_config, freq_offset, desired_freq: 0.0, @@ -377,12 +374,16 @@ impl TimeSyncC Ok(()) } - fn add_source(&mut self, id: SourceId) -> Self::NtpSourceController { + fn add_source( + &mut self, + id: SourceId, + source_config: SourceConfig, + ) -> Self::NtpSourceController { self.sources.insert(id, (None, false)); KalmanSourceController::new( id, self.algo_config, - self.source_defaults_config, + source_config, AveragingBuffer::default(), ) } @@ -390,13 +391,14 @@ impl TimeSyncC fn add_one_way_source( &mut self, id: SourceId, + source_config: SourceConfig, measurement_noise_estimate: f64, ) -> Self::OneWaySourceController { self.sources.insert(id, (None, false)); KalmanSourceController::new( id, self.algo_config, - self.source_defaults_config, + source_config, measurement_noise_estimate, ) } @@ -495,14 +497,13 @@ mod tests { ..SynchronizationConfig::default() }; let algo_config = AlgorithmConfig::default(); - let source_defaults_config = SourceDefaultsConfig::default(); + let source_config = SourceConfig::default(); let mut algo = KalmanClockController::new( TestClock { has_steered: RefCell::new(false), current_time: NtpTimestamp::from_fixed_int(0), }, synchronization_config, - source_defaults_config, algo_config, ) .unwrap(); @@ -511,7 +512,7 @@ mod tests { // ignore startup steer of frequency. *algo.clock.has_steered.borrow_mut() = false; - let mut source = algo.add_source(0); + let mut source = algo.add_source(0, source_config); algo.source_update(0, true); assert!(algo.in_startup); @@ -563,14 +564,12 @@ mod tests { step_threshold: 1800.0, ..Default::default() }; - let source_defaults_config = SourceDefaultsConfig::default(); let mut algo = KalmanClockController::<_, u32>::new( TestClock { has_steered: RefCell::new(false), current_time: NtpTimestamp::from_fixed_int(0), }, synchronization_config, - source_defaults_config, algo_config, ) .unwrap(); @@ -593,14 +592,12 @@ mod tests { ..SynchronizationConfig::default() }; let algo_config = AlgorithmConfig::default(); - let source_defaults_config = SourceDefaultsConfig::default(); let mut algo = KalmanClockController::<_, u32>::new( TestClock { has_steered: RefCell::new(false), current_time: NtpTimestamp::from_fixed_int(0), }, synchronization_config, - source_defaults_config, algo_config, ) .unwrap(); @@ -614,14 +611,12 @@ mod tests { fn test_jumps_update_state() { let synchronization_config = SynchronizationConfig::default(); let algo_config = AlgorithmConfig::default(); - let source_defaults_config = SourceDefaultsConfig::default(); let mut algo = KalmanClockController::<_, u32>::new( TestClock { has_steered: RefCell::new(false), current_time: NtpTimestamp::from_fixed_int(0), }, synchronization_config, - source_defaults_config, algo_config, ) .unwrap(); @@ -662,14 +657,12 @@ mod tests { fn test_freqsteer_update_state() { let synchronization_config = SynchronizationConfig::default(); let algo_config = AlgorithmConfig::default(); - let source_defaults_config = SourceDefaultsConfig::default(); let mut algo = KalmanClockController::<_, u32>::new( TestClock { has_steered: RefCell::new(false), current_time: NtpTimestamp::from_fixed_int(0), }, synchronization_config, - source_defaults_config, algo_config, ) .unwrap(); @@ -707,14 +700,13 @@ mod tests { ..SynchronizationConfig::default() }; let algo_config = AlgorithmConfig::default(); - let source_defaults_config = SourceDefaultsConfig::default(); + let source_config = SourceConfig::default(); let mut algo = KalmanClockController::new( TestClock { has_steered: RefCell::new(false), current_time: NtpTimestamp::from_fixed_int(0), }, synchronization_config, - source_defaults_config, algo_config, ) .unwrap(); @@ -723,7 +715,7 @@ mod tests { // ignore startup steer of frequency. *algo.clock.has_steered.borrow_mut() = false; - let mut source = algo.add_source(0); + let mut source = algo.add_source(0, source_config); algo.source_update(0, true); let mut noise = 1e-9; @@ -766,14 +758,13 @@ mod tests { ..SynchronizationConfig::default() }; let algo_config = AlgorithmConfig::default(); - let source_defaults_config = SourceDefaultsConfig::default(); + let source_config = SourceConfig::default(); let mut algo = KalmanClockController::new( TestClock { has_steered: RefCell::new(false), current_time: NtpTimestamp::from_fixed_int(0), }, synchronization_config, - source_defaults_config, algo_config, ) .unwrap(); @@ -782,7 +773,7 @@ mod tests { // ignore startup steer of frequency. *algo.clock.has_steered.borrow_mut() = false; - let mut source = algo.add_source(0); + let mut source = algo.add_source(0, source_config); algo.source_update(0, true); let mut noise = 1e-9; diff --git a/ntp-proto/src/algorithm/kalman/source.rs b/ntp-proto/src/algorithm/kalman/source.rs index f93c28358..8eda4ca60 100644 --- a/ntp-proto/src/algorithm/kalman/source.rs +++ b/ntp-proto/src/algorithm/kalman/source.rs @@ -77,7 +77,7 @@ use tracing::{debug, trace}; use crate::{ algorithm::{KalmanControllerMessage, KalmanSourceMessage, SourceController}, - config::SourceDefaultsConfig, + config::SourceConfig, source::Measurement, time_types::{NtpDuration, NtpTimestamp, PollInterval, PollIntervalLimits}, ObservableSourceTimedata, @@ -447,7 +447,7 @@ impl /// not so much that each individual poll message gives us very little new information. fn update_desired_poll( &mut self, - source_defaults_config: &SourceDefaultsConfig, + source_config: &SourceConfig, algo_config: &AlgorithmConfig, p: f64, weight: f64, @@ -468,18 +468,18 @@ impl } trace!(poll_score = self.poll_score, ?weight, "Poll desire update"); if p <= algo_config.poll_interval_step_threshold { - self.desired_poll_interval = source_defaults_config.poll_interval_limits.min; + self.desired_poll_interval = source_config.poll_interval_limits.min; self.poll_score = 0; } else if self.poll_score <= -algo_config.poll_interval_hysteresis { self.desired_poll_interval = self .desired_poll_interval - .inc(source_defaults_config.poll_interval_limits); + .inc(source_config.poll_interval_limits); self.poll_score = 0; debug!(interval = ?self.desired_poll_interval, "Increased poll interval"); } else if self.poll_score >= algo_config.poll_interval_hysteresis { self.desired_poll_interval = self .desired_poll_interval - .dec(source_defaults_config.poll_interval_limits); + .dec(source_config.poll_interval_limits); self.poll_score = 0; debug!(interval = ?self.desired_poll_interval, "Decreased poll interval"); } @@ -525,7 +525,7 @@ impl /// Update our estimates based on a new measurement. fn update( &mut self, - source_defaults_config: &SourceDefaultsConfig, + source_config: &SourceConfig, algo_config: &AlgorithmConfig, measurement: Measurement, ) -> bool { @@ -561,13 +561,7 @@ impl let (p, weight, measurement_period) = self.absorb_measurement(measurement); self.update_wander_estimate(algo_config, p, weight); - self.update_desired_poll( - source_defaults_config, - algo_config, - p, - weight, - measurement_period, - ); + self.update_desired_poll(source_config, algo_config, p, weight, measurement_period); debug!( "source offset {}±{}ms, freq {}±{}ppm", @@ -632,7 +626,7 @@ impl // Returns whether the clock may need adjusting. pub fn update_self_using_measurement( &mut self, - source_defaults_config: &SourceDefaultsConfig, + source_config: &SourceConfig, algo_config: &AlgorithmConfig, mut measurement: Measurement, ) -> bool { @@ -643,12 +637,12 @@ impl }; measurement.delay = noise_estimator.preprocess(measurement.delay); - self.update_self_using_raw_measurement(source_defaults_config, algo_config, measurement) + self.update_self_using_raw_measurement(source_config, algo_config, measurement) } fn update_self_using_raw_measurement( &mut self, - source_defaults_config: &SourceDefaultsConfig, + source_config: &SourceConfig, algo_config: &AlgorithmConfig, measurement: Measurement, ) -> bool { @@ -669,7 +663,7 @@ impl noise_estimator: filter.noise_estimator.clone(), precision_score: 0, poll_score: 0, - desired_poll_interval: source_defaults_config.initial_poll_interval, + desired_poll_interval: source_config.initial_poll_interval, last_measurement: measurement, prev_was_outlier: false, last_iter: measurement.localtime, @@ -704,7 +698,7 @@ impl false } else { - filter.update(source_defaults_config, algo_config, measurement) + filter.update(source_config, algo_config, measurement) } } } @@ -793,7 +787,7 @@ pub struct KalmanSourceController< index: SourceId, state: SourceState, algo_config: AlgorithmConfig, - source_defaults_config: SourceDefaultsConfig, + source_config: SourceConfig, } pub type TwoWayKalmanSourceController = @@ -810,14 +804,14 @@ impl< pub(super) fn new( index: SourceId, algo_config: AlgorithmConfig, - source_defaults_config: SourceDefaultsConfig, + source_config: SourceConfig, noise_estimator: N, ) -> Self { KalmanSourceController { index, state: SourceState::new(noise_estimator), algo_config, - source_defaults_config, + source_config, } } } @@ -848,7 +842,7 @@ impl< measurement: Measurement, ) -> Option { if self.state.update_self_using_measurement( - &self.source_defaults_config, + &self.source_config, &self.algo_config, measurement, ) { @@ -862,7 +856,7 @@ impl< fn desired_poll_interval(&self) -> PollInterval { self.state - .get_desired_poll(&self.source_defaults_config.poll_interval_limits) + .get_desired_poll(&self.source_config.poll_interval_limits) } fn observe(&self) -> super::super::ObservableSourceTimedata { @@ -921,7 +915,7 @@ mod tests { last_iter: base, })); source.update_self_using_measurement( - &SourceDefaultsConfig::default(), + &SourceConfig::default(), &AlgorithmConfig::default(), Measurement { delay: NtpDuration::from_seconds(0.0), @@ -969,7 +963,7 @@ mod tests { })); source.process_offset_steering(-1800.0); source.update_self_using_measurement( - &SourceDefaultsConfig::default(), + &SourceConfig::default(), &AlgorithmConfig::default(), Measurement { delay: NtpDuration::from_seconds(0.0), @@ -1017,7 +1011,7 @@ mod tests { })); source.process_offset_steering(1800.0); source.update_self_using_measurement( - &SourceDefaultsConfig::default(), + &SourceConfig::default(), &AlgorithmConfig::default(), Measurement { delay: NtpDuration::from_seconds(0.0), @@ -1121,7 +1115,7 @@ mod tests { ); source.update_self_using_raw_measurement( - &SourceDefaultsConfig::default(), + &SourceConfig::default(), &AlgorithmConfig::default(), Measurement { delay, @@ -1197,7 +1191,7 @@ mod tests { ); source.update_self_using_raw_measurement( - &SourceDefaultsConfig::default(), + &SourceConfig::default(), &AlgorithmConfig::default(), Measurement { delay, @@ -1380,7 +1374,7 @@ mod tests { .snapshot(0_usize, &AlgorithmConfig::default()) .is_none()); source.update_self_using_measurement( - &SourceDefaultsConfig::default(), + &SourceConfig::default(), &AlgorithmConfig::default(), Measurement { delay, @@ -1404,7 +1398,7 @@ mod tests { > 1.0 ); source.update_self_using_measurement( - &SourceDefaultsConfig::default(), + &SourceConfig::default(), &AlgorithmConfig::default(), Measurement { delay, @@ -1428,7 +1422,7 @@ mod tests { > 1.0 ); source.update_self_using_measurement( - &SourceDefaultsConfig::default(), + &SourceConfig::default(), &AlgorithmConfig::default(), Measurement { delay, @@ -1452,7 +1446,7 @@ mod tests { > 1.0 ); source.update_self_using_measurement( - &SourceDefaultsConfig::default(), + &SourceConfig::default(), &AlgorithmConfig::default(), Measurement { delay, @@ -1476,7 +1470,7 @@ mod tests { > 1.0 ); source.update_self_using_measurement( - &SourceDefaultsConfig::default(), + &SourceConfig::default(), &AlgorithmConfig::default(), Measurement { delay, @@ -1500,7 +1494,7 @@ mod tests { > 1.0 ); source.update_self_using_measurement( - &SourceDefaultsConfig::default(), + &SourceConfig::default(), &AlgorithmConfig::default(), Measurement { delay, @@ -1524,7 +1518,7 @@ mod tests { > 1.0 ); source.update_self_using_measurement( - &SourceDefaultsConfig::default(), + &SourceConfig::default(), &AlgorithmConfig::default(), Measurement { delay, @@ -1548,7 +1542,7 @@ mod tests { > 1.0 ); source.update_self_using_measurement( - &SourceDefaultsConfig::default(), + &SourceConfig::default(), &AlgorithmConfig::default(), Measurement { delay, @@ -1609,7 +1603,7 @@ mod tests { .snapshot(0_usize, &AlgorithmConfig::default()) .is_none()); source.update_self_using_measurement( - &SourceDefaultsConfig::default(), + &SourceConfig::default(), &AlgorithmConfig::default(), Measurement { delay: NtpDuration::from_seconds(0.0), @@ -1633,7 +1627,7 @@ mod tests { > 1.0 ); source.update_self_using_measurement( - &SourceDefaultsConfig::default(), + &SourceConfig::default(), &AlgorithmConfig::default(), Measurement { delay: NtpDuration::from_seconds(0.0), @@ -1657,7 +1651,7 @@ mod tests { > 1.0 ); source.update_self_using_measurement( - &SourceDefaultsConfig::default(), + &SourceConfig::default(), &AlgorithmConfig::default(), Measurement { delay: NtpDuration::from_seconds(0.0), @@ -1681,7 +1675,7 @@ mod tests { > 1.0 ); source.update_self_using_measurement( - &SourceDefaultsConfig::default(), + &SourceConfig::default(), &AlgorithmConfig::default(), Measurement { delay: NtpDuration::from_seconds(0.0), @@ -1706,7 +1700,7 @@ mod tests { > 1.0 ); source.update_self_using_measurement( - &SourceDefaultsConfig::default(), + &SourceConfig::default(), &AlgorithmConfig::default(), Measurement { delay: NtpDuration::from_seconds(0.0), @@ -1730,7 +1724,7 @@ mod tests { > 1.0 ); source.update_self_using_measurement( - &SourceDefaultsConfig::default(), + &SourceConfig::default(), &AlgorithmConfig::default(), Measurement { delay: NtpDuration::from_seconds(0.0), @@ -1754,7 +1748,7 @@ mod tests { > 1.0 ); source.update_self_using_measurement( - &SourceDefaultsConfig::default(), + &SourceConfig::default(), &AlgorithmConfig::default(), Measurement { delay: NtpDuration::from_seconds(0.0), @@ -1778,7 +1772,7 @@ mod tests { > 1.0 ); source.update_self_using_measurement( - &SourceDefaultsConfig::default(), + &SourceConfig::default(), &AlgorithmConfig::default(), Measurement { delay: NtpDuration::from_seconds(0.0), @@ -1816,7 +1810,7 @@ mod tests { #[test] fn test_poll_duration_variation() { - let config = SourceDefaultsConfig::default(); + let config = SourceConfig::default(); let algo_config = AlgorithmConfig { poll_interval_hysteresis: 2, ..Default::default() diff --git a/ntp-proto/src/algorithm/mod.rs b/ntp-proto/src/algorithm/mod.rs index 0af89e098..aacd51d6d 100644 --- a/ntp-proto/src/algorithm/mod.rs +++ b/ntp-proto/src/algorithm/mod.rs @@ -4,7 +4,7 @@ use serde::{de::DeserializeOwned, Deserialize, Serialize}; use crate::{ clock::NtpClock, - config::{SourceDefaultsConfig, SynchronizationConfig}, + config::{SourceConfig, SynchronizationConfig}, source::Measurement, system::TimeSnapshot, time_types::{NtpDuration, NtpTimestamp}, @@ -70,7 +70,6 @@ pub trait TimeSyncController: Sized + Send + 'static { fn new( clock: Self::Clock, synchronization_config: SynchronizationConfig, - source_defaults_config: SourceDefaultsConfig, algorithm_config: Self::AlgorithmConfig, ) -> Result::Error>; @@ -78,11 +77,16 @@ pub trait TimeSyncController: Sized + Send + 'static { fn take_control(&mut self) -> Result<(), ::Error>; /// Create a new source with given identity - fn add_source(&mut self, id: Self::SourceId) -> Self::NtpSourceController; + fn add_source( + &mut self, + id: Self::SourceId, + source_config: SourceConfig, + ) -> Self::NtpSourceController; /// Create a new one way source with given identity (used e.g. with GPS sock sources) fn add_one_way_source( &mut self, id: Self::SourceId, + source_config: SourceConfig, measurement_noise_estimate: f64, ) -> Self::OneWaySourceController; /// Notify the controller that a previous source has gone diff --git a/ntp-proto/src/config.rs b/ntp-proto/src/config.rs index 0004f2b6e..6d6aa5ee5 100644 --- a/ntp-proto/src/config.rs +++ b/ntp-proto/src/config.rs @@ -197,7 +197,7 @@ impl<'de> Deserialize<'de> for StepThreshold { #[derive(Deserialize, Debug, Clone, Copy)] #[serde(rename_all = "kebab-case", deny_unknown_fields)] -pub struct SourceDefaultsConfig { +pub struct SourceConfig { /// Minima and maxima for the poll interval of clients #[serde(default)] pub poll_interval_limits: PollIntervalLimits, @@ -207,7 +207,7 @@ pub struct SourceDefaultsConfig { pub initial_poll_interval: PollInterval, } -impl Default for SourceDefaultsConfig { +impl Default for SourceConfig { fn default() -> Self { Self { poll_interval_limits: Default::default(), diff --git a/ntp-proto/src/lib.rs b/ntp-proto/src/lib.rs index ec06cf9c3..fc232b912 100644 --- a/ntp-proto/src/lib.rs +++ b/ntp-proto/src/lib.rs @@ -43,7 +43,7 @@ mod exports { TimeSyncController, TwoWayKalmanSourceController, }; pub use super::clock::NtpClock; - pub use super::config::{SourceDefaultsConfig, StepThreshold, SynchronizationConfig}; + pub use super::config::{SourceConfig, StepThreshold, SynchronizationConfig}; pub use super::identifiers::ReferenceId; #[cfg(feature = "__internal-fuzz")] pub use super::ipfilter::fuzz::fuzz_ipfilter; diff --git a/ntp-proto/src/source.rs b/ntp-proto/src/source.rs index c27a8cade..f2e89321f 100644 --- a/ntp-proto/src/source.rs +++ b/ntp-proto/src/source.rs @@ -5,7 +5,7 @@ use crate::packet::{ }; use crate::{ algorithm::{ObservableSourceTimedata, SourceController}, - config::SourceDefaultsConfig, + config::SourceConfig, cookiestash::CookieStash, identifiers::ReferenceId, packet::{Cipher, NtpAssociationMode, NtpLeapIndicator, NtpPacket, RequestIdentifier}, @@ -81,7 +81,7 @@ pub struct NtpSource { impl> NtpSource { pub(crate) fn new( source_addr: SocketAddr, - source_defaults_config: SourceDefaultsConfig, + source_config: SourceConfig, protocol_version: ProtocolVersion, controller: Controller, nts: Option>, @@ -479,8 +479,8 @@ impl> NtpSource> NtpSource> NtpSource> NtpSource { synchronization_config: SynchronizationConfig, - source_defaults_config: SourceDefaultsConfig, system: SystemSnapshot, ip_list: Arc<[IpAddr]>, @@ -201,7 +200,6 @@ impl, ) -> Result::Error> { @@ -218,16 +216,10 @@ impl Result< OneWaySource, ::Error, > { self.ensure_controller_control()?; - let controller = self - .controller - .add_one_way_source(id, measurement_noise_estimate); + let controller = + self.controller + .add_one_way_source(id, source_config, measurement_noise_estimate); self.sources.insert(id, None); Ok(OneWaySource::new(controller)) } @@ -268,6 +261,7 @@ impl>, @@ -279,11 +273,11 @@ impl::Error, > { self.ensure_controller_control()?; - let controller = self.controller.add_source(id); + let controller = self.controller.add_source(id, source_config); self.sources.insert(id, None); Ok(NtpSource::new( source_addr, - self.source_defaults_config, + source_config, protocol_version, controller, nts, diff --git a/ntpd/src/daemon/config/mod.rs b/ntpd/src/daemon/config/mod.rs index 1b2d3e0a1..fef163ea3 100644 --- a/ntpd/src/daemon/config/mod.rs +++ b/ntpd/src/daemon/config/mod.rs @@ -5,7 +5,7 @@ pub mod subnet; use clock_steering::unix::UnixClock; #[cfg(feature = "unstable_ntpv5")] use ntp_proto::NtpVersion; -use ntp_proto::{AlgorithmConfig, SourceDefaultsConfig, SynchronizationConfig}; +use ntp_proto::{AlgorithmConfig, SourceConfig, SynchronizationConfig}; pub use ntp_source::*; use serde::{Deserialize, Deserializer}; pub use server::*; @@ -362,7 +362,7 @@ pub struct Config { #[serde(default)] pub synchronization: DaemonSynchronizationConfig, #[serde(default)] - pub source_defaults: SourceDefaultsConfig, + pub source_defaults: SourceConfig, #[serde(default)] pub observability: ObservabilityConfig, #[serde(default)] diff --git a/ntpd/src/daemon/ntp_source.rs b/ntpd/src/daemon/ntp_source.rs index 26d1071ad..efe6a049c 100644 --- a/ntpd/src/daemon/ntp_source.rs +++ b/ntpd/src/daemon/ntp_source.rs @@ -452,7 +452,7 @@ mod tests { use ntp_proto::{ AlgorithmConfig, KalmanClockController, KalmanControllerMessage, KalmanSourceMessage, - NoCipher, NtpDuration, NtpLeapIndicator, NtpPacket, ProtocolVersion, SourceDefaultsConfig, + NoCipher, NtpDuration, NtpLeapIndicator, NtpPacket, ProtocolVersion, SourceConfig, SynchronizationConfig, SystemSnapshot, TimeSnapshot, TwoWayKalmanSourceController, }; use timestamped_socket::socket::{open_ip, GeneralTimestampMode, Open}; @@ -602,7 +602,6 @@ mod tests { let mut system: ntp_proto::System<_, KalmanClockController<_, _>> = ntp_proto::System::new( TestClock {}, SynchronizationConfig::default(), - SourceDefaultsConfig::default(), AlgorithmConfig::default(), Arc::new([]), ) @@ -610,6 +609,7 @@ mod tests { let Ok((source, _)) = system.create_ntp_source( index, + SourceConfig::default(), SocketAddr::from((Ipv4Addr::LOCALHOST, port_base)), ProtocolVersion::default(), None, diff --git a/ntpd/src/daemon/sock_source.rs b/ntpd/src/daemon/sock_source.rs index 02bf05569..1bbff469c 100644 --- a/ntpd/src/daemon/sock_source.rs +++ b/ntpd/src/daemon/sock_source.rs @@ -228,7 +228,7 @@ mod tests { use ntp_proto::{ AlgorithmConfig, KalmanClockController, NtpClock, NtpDuration, NtpLeapIndicator, - NtpTimestamp, ReferenceId, SourceDefaultsConfig, SynchronizationConfig, + NtpTimestamp, ReferenceId, SourceConfig, SynchronizationConfig, }; use tokio::sync::mpsc; @@ -299,7 +299,6 @@ mod tests { let mut system: ntp_proto::System<_, KalmanClockController<_, _>> = ntp_proto::System::new( clock.clone(), SynchronizationConfig::default(), - SourceDefaultsConfig::default(), AlgorithmConfig::default(), Arc::new([]), ) @@ -317,7 +316,9 @@ mod tests { system_update_receiver, source_snapshots: Arc::new(RwLock::new(HashMap::new())), }, - system.create_sock_source(index, 0.001).unwrap(), + system + .create_sock_source(index, SourceConfig::default(), 0.001) + .unwrap(), ); // Send example data to socket diff --git a/ntpd/src/daemon/spawn/mod.rs b/ntpd/src/daemon/spawn/mod.rs index e3f556a30..f4c901180 100644 --- a/ntpd/src/daemon/spawn/mod.rs +++ b/ntpd/src/daemon/spawn/mod.rs @@ -1,6 +1,6 @@ use std::{net::SocketAddr, sync::atomic::AtomicU64}; -use ntp_proto::{ProtocolVersion, SourceNtsData}; +use ntp_proto::{ProtocolVersion, SourceConfig, SourceNtsData}; use serde::{Deserialize, Serialize}; use tokio::{ sync::mpsc, @@ -118,6 +118,7 @@ impl SpawnAction { addr: SocketAddr, normalized_addr: NormalizedAddress, protocol_version: ProtocolVersion, + config: SourceConfig, nts: Option>, ) -> SpawnAction { SpawnAction::Create(SourceCreateParameters::Ntp(NtpSourceCreateParameters { @@ -125,6 +126,7 @@ impl SpawnAction { addr, normalized_addr, protocol_version, + config, nts, })) } @@ -158,6 +160,7 @@ pub struct NtpSourceCreateParameters { pub addr: SocketAddr, pub normalized_addr: NormalizedAddress, pub protocol_version: ProtocolVersion, + pub config: SourceConfig, pub nts: Option>, } @@ -165,6 +168,7 @@ pub struct NtpSourceCreateParameters { pub struct SockSourceCreateParameters { pub id: SourceId, pub path: String, + pub config: SourceConfig, pub noise_estimate: f64, } diff --git a/ntpd/src/daemon/spawn/nts.rs b/ntpd/src/daemon/spawn/nts.rs index 07b59cb25..026d36ad3 100644 --- a/ntpd/src/daemon/spawn/nts.rs +++ b/ntpd/src/daemon/spawn/nts.rs @@ -2,6 +2,7 @@ use std::fmt::Display; use std::net::SocketAddr; use std::ops::Deref; +use ntp_proto::SourceConfig; use tokio::sync::mpsc; use tracing::warn; @@ -11,6 +12,7 @@ use super::{SourceId, SourceRemovedEvent, SpawnAction, SpawnEvent, Spawner, Spaw pub struct NtsSpawner { config: NtsSourceConfig, + source_config: SourceConfig, id: SpawnerId, has_spawned: bool, } @@ -53,9 +55,10 @@ pub(super) async fn resolve_addr(address: (&str, u16)) -> Option { } impl NtsSpawner { - pub fn new(config: NtsSourceConfig) -> NtsSpawner { + pub fn new(config: NtsSourceConfig, source_config: SourceConfig) -> NtsSpawner { NtsSpawner { config, + source_config, id: Default::default(), has_spawned: false, } @@ -91,6 +94,7 @@ impl Spawner for NtsSpawner { address, self.config.address.deref().clone(), ke.protocol_version, + self.source_config, Some(ke.nts), ), )) diff --git a/ntpd/src/daemon/spawn/nts_pool.rs b/ntpd/src/daemon/spawn/nts_pool.rs index 7d2615197..edf1f0046 100644 --- a/ntpd/src/daemon/spawn/nts_pool.rs +++ b/ntpd/src/daemon/spawn/nts_pool.rs @@ -4,6 +4,8 @@ use std::ops::Deref; use tokio::sync::mpsc; use tracing::warn; +use ntp_proto::SourceConfig; + use super::super::{ config::NtsPoolSourceConfig, keyexchange::key_exchange_client_with_denied_servers, }; @@ -19,6 +21,7 @@ struct PoolSource { pub struct NtsPoolSpawner { config: NtsPoolSourceConfig, + source_config: SourceConfig, id: SpawnerId, current_sources: Vec, } @@ -45,12 +48,12 @@ impl From> for NtsPoolSpawnError { } impl NtsPoolSpawner { - pub fn new(config: NtsPoolSourceConfig) -> NtsPoolSpawner { + pub fn new(config: NtsPoolSourceConfig, source_config: SourceConfig) -> NtsPoolSpawner { NtsPoolSpawner { config, + source_config, id: Default::default(), current_sources: Default::default(), - //known_ips: Default::default(), } } @@ -99,6 +102,7 @@ impl Spawner for NtsPoolSpawner { address, self.config.addr.deref().clone(), ke.protocol_version, + self.source_config, Some(ke.nts), ), )) diff --git a/ntpd/src/daemon/spawn/pool.rs b/ntpd/src/daemon/spawn/pool.rs index 50077da09..19f961fbd 100644 --- a/ntpd/src/daemon/spawn/pool.rs +++ b/ntpd/src/daemon/spawn/pool.rs @@ -3,7 +3,7 @@ use std::{net::SocketAddr, ops::Deref}; #[cfg(feature = "unstable_ntpv5")] use ntp_proto::NtpVersion; -use ntp_proto::ProtocolVersion; +use ntp_proto::{ProtocolVersion, SourceConfig}; use tokio::sync::mpsc; use tracing::warn; @@ -18,6 +18,7 @@ struct PoolSource { pub struct PoolSpawner { config: PoolSourceConfig, + source_config: SourceConfig, id: SpawnerId, current_sources: Vec, known_ips: Vec, @@ -35,9 +36,10 @@ impl Display for PoolSpawnError { impl std::error::Error for PoolSpawnError {} impl PoolSpawner { - pub fn new(config: PoolSourceConfig) -> PoolSpawner { + pub fn new(config: PoolSourceConfig, source_config: SourceConfig) -> PoolSpawner { PoolSpawner { config, + source_config, id: Default::default(), current_sources: Default::default(), known_ips: Default::default(), @@ -93,6 +95,7 @@ impl Spawner for PoolSpawner { Some(NtpVersion::V5) => ProtocolVersion::V5, None => ProtocolVersion::default(), }, + self.source_config, None, ); tracing::debug!(?action, "intending to spawn new pool source at"); @@ -139,6 +142,7 @@ mod tests { #[cfg(feature = "unstable_ntpv5")] use ntp_proto::ProtocolVersion; + use ntp_proto::SourceConfig; use tokio::sync::mpsc::{self, error::TryRecvError}; use crate::daemon::{ @@ -155,14 +159,17 @@ mod tests { let address_strings = ["127.0.0.1:123", "127.0.0.2:123", "127.0.0.3:123"]; let addresses = address_strings.map(|addr| addr.parse().unwrap()); - let mut pool = PoolSpawner::new(PoolSourceConfig { - addr: NormalizedAddress::with_hardcoded_dns("example.com", 123, addresses.to_vec()) - .into(), - count: 2, - ignore: vec![], - #[cfg(feature = "unstable_ntpv5")] - ntp_version: None, - }); + let mut pool = PoolSpawner::new( + PoolSourceConfig { + addr: NormalizedAddress::with_hardcoded_dns("example.com", 123, addresses.to_vec()) + .into(), + count: 2, + ignore: vec![], + #[cfg(feature = "unstable_ntpv5")] + ntp_version: None, + }, + SourceConfig::default(), + ); let spawner_id = pool.get_id(); let (action_tx, mut action_rx) = mpsc::channel(MESSAGE_BUFFER_SIZE); @@ -210,7 +217,7 @@ mod tests { ignore: vec![], #[cfg(feature = "unstable_ntpv5")] ntp_version: Some(ntp_proto::NtpVersion::V5), - }); + }, SourceConfig::default()); let spawner_id = pool.get_id(); let (action_tx, mut action_rx) = mpsc::channel(MESSAGE_BUFFER_SIZE); @@ -252,7 +259,7 @@ mod tests { ignore: vec![], #[cfg(feature = "unstable_ntpv5")] ntp_version: Some(ntp_proto::NtpVersion::V4), - }); + }, SourceConfig::default()); let spawner_id = pool.get_id(); let (action_tx, mut action_rx) = mpsc::channel(MESSAGE_BUFFER_SIZE); @@ -287,14 +294,17 @@ mod tests { let addresses = address_strings.map(|addr| addr.parse().unwrap()); let ignores = vec!["127.0.0.1".parse().unwrap()]; - let mut pool = PoolSpawner::new(PoolSourceConfig { - addr: NormalizedAddress::with_hardcoded_dns("example.com", 123, addresses.to_vec()) - .into(), - count: 2, - ignore: ignores.clone(), - #[cfg(feature = "unstable_ntpv5")] - ntp_version: None, - }); + let mut pool = PoolSpawner::new( + PoolSourceConfig { + addr: NormalizedAddress::with_hardcoded_dns("example.com", 123, addresses.to_vec()) + .into(), + count: 2, + ignore: ignores.clone(), + #[cfg(feature = "unstable_ntpv5")] + ntp_version: None, + }, + SourceConfig::default(), + ); let spawner_id = pool.get_id(); let (action_tx, mut action_rx) = mpsc::channel(MESSAGE_BUFFER_SIZE); @@ -326,14 +336,17 @@ mod tests { let address_strings = ["127.0.0.1:123", "127.0.0.2:123", "127.0.0.3:123"]; let addresses = address_strings.map(|addr| addr.parse().unwrap()); - let mut pool = PoolSpawner::new(PoolSourceConfig { - addr: NormalizedAddress::with_hardcoded_dns("example.com", 123, addresses.to_vec()) - .into(), - count: 2, - ignore: vec![], - #[cfg(feature = "unstable_ntpv5")] - ntp_version: None, - }); + let mut pool = PoolSpawner::new( + PoolSourceConfig { + addr: NormalizedAddress::with_hardcoded_dns("example.com", 123, addresses.to_vec()) + .into(), + count: 2, + ignore: vec![], + #[cfg(feature = "unstable_ntpv5")] + ntp_version: None, + }, + SourceConfig::default(), + ); let (action_tx, mut action_rx) = mpsc::channel(MESSAGE_BUFFER_SIZE); assert!(!pool.is_complete()); @@ -370,13 +383,16 @@ mod tests { #[tokio::test] async fn works_if_address_does_not_resolve() { - let mut pool = PoolSpawner::new(PoolSourceConfig { - addr: NormalizedAddress::with_hardcoded_dns("does.not.resolve", 123, vec![]).into(), - count: 2, - ignore: vec![], - #[cfg(feature = "unstable_ntpv5")] - ntp_version: None, - }); + let mut pool = PoolSpawner::new( + PoolSourceConfig { + addr: NormalizedAddress::with_hardcoded_dns("does.not.resolve", 123, vec![]).into(), + count: 2, + ignore: vec![], + #[cfg(feature = "unstable_ntpv5")] + ntp_version: None, + }, + SourceConfig::default(), + ); let (action_tx, mut action_rx) = mpsc::channel(MESSAGE_BUFFER_SIZE); assert!(!pool.is_complete()); pool.try_spawn(&action_tx).await.unwrap(); diff --git a/ntpd/src/daemon/spawn/sock.rs b/ntpd/src/daemon/spawn/sock.rs index dfc074eac..f5ef9f6d2 100644 --- a/ntpd/src/daemon/spawn/sock.rs +++ b/ntpd/src/daemon/spawn/sock.rs @@ -1,3 +1,4 @@ +use ntp_proto::SourceConfig; use tokio::sync::mpsc; use crate::daemon::config::SockSourceConfig; @@ -9,14 +10,16 @@ use super::{ pub struct SockSpawner { config: SockSourceConfig, + source_config: SourceConfig, id: SpawnerId, has_spawned: bool, } impl SockSpawner { - pub fn new(config: SockSourceConfig) -> SockSpawner { + pub fn new(config: SockSourceConfig, source_config: SourceConfig) -> SockSpawner { SockSpawner { config, + source_config, id: Default::default(), has_spawned: false, } @@ -37,6 +40,7 @@ impl Spawner for SockSpawner { SpawnAction::Create(SourceCreateParameters::Sock(SockSourceCreateParameters { id: SourceId::new(), path: self.config.path.clone(), + config: self.source_config, noise_estimate: self.config.measurement_noise_estimate.to_seconds(), })), )) @@ -74,7 +78,7 @@ impl Spawner for SockSpawner { #[cfg(test)] mod tests { - use ntp_proto::NtpDuration; + use ntp_proto::{NtpDuration, SourceConfig}; use tokio::sync::mpsc; use crate::daemon::{ @@ -87,10 +91,13 @@ mod tests { async fn creates_a_source() { let socket_path = "/tmp/test.sock"; let noise_estimate = 1e-6; - let mut spawner = SockSpawner::new(SockSourceConfig { - path: socket_path.to_string(), - measurement_noise_estimate: NtpDuration::from_seconds(noise_estimate), - }); + let mut spawner = SockSpawner::new( + SockSourceConfig { + path: socket_path.to_string(), + measurement_noise_estimate: NtpDuration::from_seconds(noise_estimate), + }, + SourceConfig::default(), + ); let spawner_id = spawner.get_id(); let (action_tx, mut action_rx) = mpsc::channel(MESSAGE_BUFFER_SIZE); diff --git a/ntpd/src/daemon/spawn/standard.rs b/ntpd/src/daemon/spawn/standard.rs index 464cf355a..6b850a612 100644 --- a/ntpd/src/daemon/spawn/standard.rs +++ b/ntpd/src/daemon/spawn/standard.rs @@ -3,7 +3,7 @@ use std::{net::SocketAddr, ops::Deref}; #[cfg(feature = "unstable_ntpv5")] use ntp_proto::NtpVersion; -use ntp_proto::ProtocolVersion; +use ntp_proto::{ProtocolVersion, SourceConfig}; use tokio::sync::mpsc; use tracing::warn; @@ -16,6 +16,7 @@ use super::{ pub struct StandardSpawner { id: SpawnerId, config: StandardSource, + source_config: SourceConfig, resolved: Option, has_spawned: bool, } @@ -42,10 +43,11 @@ impl From> for StandardSpawnError { impl std::error::Error for StandardSpawnError {} impl StandardSpawner { - pub fn new(config: StandardSource) -> StandardSpawner { + pub fn new(config: StandardSource, source_config: SourceConfig) -> StandardSpawner { StandardSpawner { id: Default::default(), config, + source_config, resolved: None, has_spawned: false, } @@ -101,6 +103,7 @@ impl Spawner for StandardSpawner { Some(NtpVersion::V5) => ProtocolVersion::V5, None => ProtocolVersion::default(), }, + self.source_config, None, ), )) @@ -145,6 +148,7 @@ mod tests { #[cfg(feature = "unstable_ntpv5")] use ntp_proto::ProtocolVersion; + use ntp_proto::SourceConfig; use tokio::sync::mpsc::{self, error::TryRecvError}; use crate::daemon::{ @@ -158,16 +162,19 @@ mod tests { #[tokio::test] async fn creates_a_source() { - let mut spawner = StandardSpawner::new(StandardSource { - address: NormalizedAddress::with_hardcoded_dns( - "example.com", - 123, - vec!["127.0.0.1:123".parse().unwrap()], - ) - .into(), - #[cfg(feature = "unstable_ntpv5")] - ntp_version: None, - }); + let mut spawner = StandardSpawner::new( + StandardSource { + address: NormalizedAddress::with_hardcoded_dns( + "example.com", + 123, + vec!["127.0.0.1:123".parse().unwrap()], + ) + .into(), + #[cfg(feature = "unstable_ntpv5")] + ntp_version: None, + }, + SourceConfig::default(), + ); let spawner_id = spawner.get_id(); let (action_tx, mut action_rx) = mpsc::channel(MESSAGE_BUFFER_SIZE); @@ -200,7 +207,7 @@ mod tests { ) .into(), ntp_version: Some(ntp_proto::NtpVersion::V5), - }); + }, SourceConfig::default()); let spawner_id = spawner.get_id(); let (action_tx, mut action_rx) = mpsc::channel(MESSAGE_BUFFER_SIZE); @@ -227,7 +234,7 @@ mod tests { ) .into(), ntp_version: Some(ntp_proto::NtpVersion::V4), - }); + }, SourceConfig::default()); let spawner_id = spawner.get_id(); let (action_tx, mut action_rx) = mpsc::channel(MESSAGE_BUFFER_SIZE); @@ -245,16 +252,19 @@ mod tests { #[tokio::test] async fn recreates_a_source() { - let mut spawner = StandardSpawner::new(StandardSource { - address: NormalizedAddress::with_hardcoded_dns( - "example.com", - 123, - vec!["127.0.0.1:123".parse().unwrap()], - ) - .into(), - #[cfg(feature = "unstable_ntpv5")] - ntp_version: None, - }); + let mut spawner = StandardSpawner::new( + StandardSource { + address: NormalizedAddress::with_hardcoded_dns( + "example.com", + 123, + vec!["127.0.0.1:123".parse().unwrap()], + ) + .into(), + #[cfg(feature = "unstable_ntpv5")] + ntp_version: None, + }, + SourceConfig::default(), + ); let (action_tx, mut action_rx) = mpsc::channel(MESSAGE_BUFFER_SIZE); assert!(!spawner.is_complete()); @@ -284,16 +294,19 @@ mod tests { let address_strings = ["127.0.0.1:123", "127.0.0.2:123", "127.0.0.3:123"]; let addresses = address_strings.map(|addr| addr.parse().unwrap()); - let mut spawner = StandardSpawner::new(StandardSource { - address: NormalizedAddress::with_hardcoded_dns( - "europe.pool.ntp.org", - 123, - addresses.to_vec(), - ) - .into(), - #[cfg(feature = "unstable_ntpv5")] - ntp_version: None, - }); + let mut spawner = StandardSpawner::new( + StandardSource { + address: NormalizedAddress::with_hardcoded_dns( + "europe.pool.ntp.org", + 123, + addresses.to_vec(), + ) + .into(), + #[cfg(feature = "unstable_ntpv5")] + ntp_version: None, + }, + SourceConfig::default(), + ); let (action_tx, mut action_rx) = mpsc::channel(MESSAGE_BUFFER_SIZE); assert!(!spawner.is_complete()); @@ -343,11 +356,15 @@ mod tests { #[tokio::test] async fn works_if_address_does_not_resolve() { - let mut spawner = StandardSpawner::new(StandardSource { - address: NormalizedAddress::with_hardcoded_dns("does.not.resolve", 123, vec![]).into(), - #[cfg(feature = "unstable_ntpv5")] - ntp_version: None, - }); + let mut spawner = StandardSpawner::new( + StandardSource { + address: NormalizedAddress::with_hardcoded_dns("does.not.resolve", 123, vec![]) + .into(), + #[cfg(feature = "unstable_ntpv5")] + ntp_version: None, + }, + SourceConfig::default(), + ); let (action_tx, mut action_rx) = mpsc::channel(MESSAGE_BUFFER_SIZE); spawner.try_spawn(&action_tx).await.unwrap(); diff --git a/ntpd/src/daemon/system.rs b/ntpd/src/daemon/system.rs index 56e0fdf8a..ddc68ef31 100644 --- a/ntpd/src/daemon/system.rs +++ b/ntpd/src/daemon/system.rs @@ -26,7 +26,7 @@ use std::{ }; use ntp_proto::{ - KeySet, NtpClock, ObservableSourceState, SourceDefaultsConfig, SynchronizationConfig, System, + KeySet, NtpClock, ObservableSourceState, SourceConfig, SynchronizationConfig, System, SystemActionIterator, SystemSnapshot, SystemSourceUpdate, TimeSyncController, }; use timestamped_socket::interface::InterfaceName; @@ -91,7 +91,7 @@ pub struct DaemonChannels { pub async fn spawn>( synchronization_config: SynchronizationConfig, algorithm_config: Controller::AlgorithmConfig, - source_defaults_config: SourceDefaultsConfig, + source_defaults_config: SourceConfig, clock_config: ClockConfig, source_configs: &[NtpSourceConfig], server_configs: &[ServerConfig], @@ -105,7 +105,6 @@ pub async fn spawn { system - .add_spawner(StandardSpawner::new(cfg.clone())) + .add_spawner(StandardSpawner::new(cfg.clone(), source_defaults_config)) .map_err(|e| { tracing::error!("Could not spawn source: {}", e); std::io::Error::new(std::io::ErrorKind::Other, e) @@ -123,7 +122,7 @@ pub async fn spawn { system - .add_spawner(NtsSpawner::new(cfg.clone())) + .add_spawner(NtsSpawner::new(cfg.clone(), source_defaults_config)) .map_err(|e| { tracing::error!("Could not spawn source: {}", e); std::io::Error::new(std::io::ErrorKind::Other, e) @@ -131,7 +130,7 @@ pub async fn spawn { system - .add_spawner(PoolSpawner::new(cfg.clone())) + .add_spawner(PoolSpawner::new(cfg.clone(), source_defaults_config)) .map_err(|e| { tracing::error!("Could not spawn source: {}", e); std::io::Error::new(std::io::ErrorKind::Other, e) @@ -140,7 +139,7 @@ pub async fn spawn { system - .add_spawner(NtsPoolSpawner::new(cfg.clone())) + .add_spawner(NtsPoolSpawner::new(cfg.clone(), source_defaults_config)) .map_err(|e| { tracing::error!("Could not spawn source: {}", e); std::io::Error::new(std::io::ErrorKind::Other, e) @@ -148,7 +147,7 @@ pub async fn spawn { system - .add_spawner(SockSpawner::new(cfg.clone())) + .add_spawner(SockSpawner::new(cfg.clone(), source_defaults_config)) .map_err(|e| { tracing::error!("Could not spawn source: {}", e); std::io::Error::new(std::io::ErrorKind::Other, e) @@ -224,7 +223,6 @@ impl< timestamp_mode: TimestampMode, synchronization_config: SynchronizationConfig, algorithm_config: Controller::AlgorithmConfig, - source_defaults_config: SourceDefaultsConfig, keyset: tokio::sync::watch::Receiver>, ip_list: tokio::sync::watch::Receiver>, have_sources: bool, @@ -232,7 +230,6 @@ impl< let Ok(mut system) = System::new( clock.clone(), synchronization_config, - source_defaults_config, algorithm_config, ip_list.borrow().clone(), ) else { @@ -493,6 +490,7 @@ impl< SourceCreateParameters::Ntp(ref mut params) => { let (source, initial_actions) = self.system.create_ntp_source( source_id, + params.config, params.addr, params.protocol_version, params.nts.take(), @@ -515,9 +513,11 @@ impl< ); } SourceCreateParameters::Sock(ref params) => { - let source = self - .system - .create_sock_source(source_id, params.noise_estimate)?; + let source = self.system.create_sock_source( + source_id, + params.config, + params.noise_estimate, + )?; SockSourceTask::spawn( source_id, params.path.clone(), diff --git a/ntpd/src/force_sync/algorithm.rs b/ntpd/src/force_sync/algorithm.rs index 587d46e91..d6194745c 100644 --- a/ntpd/src/force_sync/algorithm.rs +++ b/ntpd/src/force_sync/algorithm.rs @@ -2,7 +2,8 @@ use std::fmt::Debug; use std::{collections::HashMap, marker::PhantomData}; use ntp_proto::{ - Measurement, NtpClock, NtpDuration, PollInterval, SourceController, TimeSyncController, + Measurement, NtpClock, NtpDuration, PollInterval, SourceConfig, SourceController, + TimeSyncController, }; use serde::Deserialize; @@ -42,7 +43,6 @@ impl WrapMeasurements<()> for Measurement<()> { pub(crate) struct SingleShotController { pub(super) clock: C, sources: HashMap, - min_poll_interval: PollInterval, min_agreeing: usize, } @@ -132,13 +132,11 @@ impl TimeSyncController for SingleShotController { fn new( clock: Self::Clock, synchronization_config: ntp_proto::SynchronizationConfig, - source_defaults_config: ntp_proto::SourceDefaultsConfig, algorithm_config: Self::AlgorithmConfig, ) -> Result::Error> { Ok(SingleShotController { clock, sources: HashMap::new(), - min_poll_interval: source_defaults_config.poll_interval_limits.min, min_agreeing: synchronization_config .minimum_agreeing_sources .max(algorithm_config.expected_sources / 2), @@ -150,10 +148,14 @@ impl TimeSyncController for SingleShotController { Ok(()) } - fn add_source(&mut self, _id: Self::SourceId) -> Self::NtpSourceController { + fn add_source( + &mut self, + _id: Self::SourceId, + config: SourceConfig, + ) -> Self::NtpSourceController { SingleShotSourceController:: { delay_type: PhantomData, - min_poll_interval: self.min_poll_interval, + min_poll_interval: config.poll_interval_limits.min, done: false, } } @@ -161,11 +163,12 @@ impl TimeSyncController for SingleShotController { fn add_one_way_source( &mut self, _id: Self::SourceId, + config: SourceConfig, _measurement_noise_estimate: f64, ) -> Self::OneWaySourceController { SingleShotSourceController::<()> { delay_type: PhantomData, - min_poll_interval: self.min_poll_interval, + min_poll_interval: config.poll_interval_limits.min, done: false, } }