diff --git a/ntp-proto/src/source.rs b/ntp-proto/src/source.rs index c27a8cade..6cb4539d2 100644 --- a/ntp-proto/src/source.rs +++ b/ntp-proto/src/source.rs @@ -111,6 +111,23 @@ impl> OneWaySource( + &self, + name: String, + address: String, + id: SourceId, + ) -> ObservableSourceState { + ObservableSourceState { + timedata: self.controller.observe(), + unanswered_polls: 0, + poll_interval: crate::time_types::PollInterval::from_byte(0), + nts_cookies: None, + name, + address, + id, + } + } } #[derive(Debug, Copy, Clone)] diff --git a/ntpd/src/daemon/pps_source.rs b/ntpd/src/daemon/pps_source.rs index 3611c0f15..dd717d1d6 100644 --- a/ntpd/src/daemon/pps_source.rs +++ b/ntpd/src/daemon/pps_source.rs @@ -35,6 +35,7 @@ pub(crate) struct PpsSourceTask< index: SourceId, clock: C, channels: SourceChannels, + path: PathBuf, source: OneWaySource, fetch_receiver: mpsc::Receiver, } @@ -102,11 +103,25 @@ where }, message: controller_message, }; + self.channels .msg_for_system_sender .send(MsgForSystem::OneWaySourceUpdate(self.index, update)) .await .ok(); + + self.channels + .source_snapshots + .write() + .expect("Unexpected poisoned mutex") + .insert( + self.index, + self.source.observe( + "PPS device".to_string(), + self.path.display().to_string(), + self.index, + ), + ); } None => { warn!("Did not receive any new PPS data"); @@ -134,7 +149,7 @@ where channels: SourceChannels, source: OneWaySource, ) -> tokio::task::JoinHandle<()> { - let pps = PpsDevice::new(device_path).expect("Could not open PPS device"); + let pps = PpsDevice::new(device_path.clone()).expect("Could not open PPS device"); let cap = pps.get_cap().expect("Could not get PPS capabilities"); if cap & pps_time::pps::PPS_CANWAIT == 0 { panic!("PPS device does not support blocking calls") @@ -154,6 +169,7 @@ where index, clock, channels, + path: device_path, source, fetch_receiver, }; diff --git a/ntpd/src/daemon/sock_source.rs b/ntpd/src/daemon/sock_source.rs index 1eeb225e3..677c9fea8 100644 --- a/ntpd/src/daemon/sock_source.rs +++ b/ntpd/src/daemon/sock_source.rs @@ -86,6 +86,7 @@ pub(crate) struct SockSourceTask< index: SourceId, socket: UnixDatagram, clock: C, + path: PathBuf, channels: SourceChannels, source: OneWaySource, } @@ -174,6 +175,19 @@ where .send(MsgForSystem::OneWaySourceUpdate(self.index, update)) .await .ok(); + + self.channels + .source_snapshots + .write() + .expect("Unexpected poisoned mutex") + .insert( + self.index, + self.source.observe( + "GPSd socket".to_string(), + self.path.display().to_string(), + self.index, + ), + ); } Err(e) => { error!("Error deserializing sample: {}", e); @@ -201,13 +215,14 @@ where channels: SourceChannels, source: OneWaySource, ) -> tokio::task::JoinHandle<()> { - let socket = create_socket(socket_path).expect("Could not create socket"); + let socket = create_socket(&socket_path).expect("Could not create socket"); tokio::spawn( (async move { let mut process = SockSourceTask { index, socket, clock, + path: socket_path, channels, source, };