Skip to content

Commit

Permalink
Add basic source snapshots for PPS and sock sources
Browse files Browse the repository at this point in the history
  • Loading branch information
michielp1807 committed Jan 10, 2025
1 parent dac1cdb commit d46c7b2
Show file tree
Hide file tree
Showing 3 changed files with 50 additions and 2 deletions.
17 changes: 17 additions & 0 deletions ntp-proto/src/source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,23 @@ impl<Controller: SourceController<MeasurementDelay = ()>> OneWaySource<Controlle
pub fn handle_message(&mut self, message: Controller::ControllerMessage) {
self.controller.handle_message(message)
}

pub fn observe<SourceId>(
&self,
name: String,
address: String,
id: SourceId,
) -> ObservableSourceState<SourceId> {
ObservableSourceState {
timedata: self.controller.observe(),
unanswered_polls: 0,
poll_interval: crate::time_types::PollInterval::from_byte(0),
nts_cookies: None,
name,
address,
id,
}
}
}

#[derive(Debug, Copy, Clone)]
Expand Down
18 changes: 17 additions & 1 deletion ntpd/src/daemon/pps_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ pub(crate) struct PpsSourceTask<
index: SourceId,
clock: C,
channels: SourceChannels<Controller::ControllerMessage, Controller::SourceMessage>,
path: PathBuf,
source: OneWaySource<Controller>,
fetch_receiver: mpsc::Receiver<pps_time::pps::pps_fdata>,
}
Expand Down Expand Up @@ -102,11 +103,25 @@ where
},
message: controller_message,
};

self.channels
.msg_for_system_sender
.send(MsgForSystem::OneWaySourceUpdate(self.index, update))
.await
.ok();

self.channels
.source_snapshots
.write()
.expect("Unexpected poisoned mutex")
.insert(
self.index,
self.source.observe(
"PPS device".to_string(),
self.path.display().to_string(),
self.index,
),
);
}
None => {
warn!("Did not receive any new PPS data");
Expand Down Expand Up @@ -134,7 +149,7 @@ where
channels: SourceChannels<Controller::ControllerMessage, Controller::SourceMessage>,
source: OneWaySource<Controller>,
) -> tokio::task::JoinHandle<()> {
let pps = PpsDevice::new(device_path).expect("Could not open PPS device");
let pps = PpsDevice::new(device_path.clone()).expect("Could not open PPS device");
let cap = pps.get_cap().expect("Could not get PPS capabilities");
if cap & pps_time::pps::PPS_CANWAIT == 0 {
panic!("PPS device does not support blocking calls")
Expand All @@ -154,6 +169,7 @@ where
index,
clock,
channels,
path: device_path,
source,
fetch_receiver,
};
Expand Down
17 changes: 16 additions & 1 deletion ntpd/src/daemon/sock_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ pub(crate) struct SockSourceTask<
index: SourceId,
socket: UnixDatagram,
clock: C,
path: PathBuf,
channels: SourceChannels<Controller::ControllerMessage, Controller::SourceMessage>,
source: OneWaySource<Controller>,
}
Expand Down Expand Up @@ -174,6 +175,19 @@ where
.send(MsgForSystem::OneWaySourceUpdate(self.index, update))
.await
.ok();

self.channels
.source_snapshots
.write()
.expect("Unexpected poisoned mutex")
.insert(
self.index,
self.source.observe(
"GPSd socket".to_string(),
self.path.display().to_string(),
self.index,
),
);
}
Err(e) => {
error!("Error deserializing sample: {}", e);
Expand Down Expand Up @@ -201,13 +215,14 @@ where
channels: SourceChannels<Controller::ControllerMessage, Controller::SourceMessage>,
source: OneWaySource<Controller>,
) -> tokio::task::JoinHandle<()> {
let socket = create_socket(socket_path).expect("Could not create socket");
let socket = create_socket(&socket_path).expect("Could not create socket");
tokio::spawn(
(async move {
let mut process = SockSourceTask {
index,
socket,
clock,
path: socket_path,
channels,
source,
};
Expand Down

0 comments on commit d46c7b2

Please sign in to comment.