Skip to content

Commit

Permalink
Merge pull request #49 from lf-lang/clock-synchronization
Browse files Browse the repository at this point in the history
Enable clock synchronization with federates
  • Loading branch information
chanijjani authored Mar 25, 2024
2 parents da9bf64 + 768fe4d commit d5caadd
Show file tree
Hide file tree
Showing 6 changed files with 556 additions and 52 deletions.
15 changes: 14 additions & 1 deletion rust/rti/src/federate_info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ pub struct FederateInfo {
// a federate when handling lf_request_stop().
// TODO: lf_thread_t thread_id;
stream: Option<TcpStream>, // The TCP socket descriptor for communicating with this federate.
// TODO: struct sockaddr_in UDP_addr;
udp_addr: SocketAddr,
clock_synchronization_enabled: bool, // Indicates the status of clock synchronization
// for this federate. Enabled by default.
in_transit_message_tags: InTransitMessageQueue, // Record of in-transit messages to this federate that are not
Expand All @@ -54,6 +54,7 @@ impl FederateInfo {
enclave: SchedulingNode::new(),
requested_stop: false,
stream: None::<TcpStream>,
udp_addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 8080),
clock_synchronization_enabled: true,
in_transit_message_tags: InTransitMessageQueue::new(),
server_hostname: String::from("localhost"),
Expand All @@ -78,6 +79,14 @@ impl FederateInfo {
&self.stream
}

pub fn stream_mut(&mut self) -> &mut Option<TcpStream> {
&mut self.stream
}

pub fn udp_addr(&self) -> SocketAddr {
self.udp_addr.clone()
}

pub fn clock_synchronization_enabled(&self) -> bool {
self.clock_synchronization_enabled
}
Expand Down Expand Up @@ -110,6 +119,10 @@ impl FederateInfo {
self.stream = Some(stream);
}

pub fn set_udp_addr(&mut self, udp_addr: SocketAddr) {
self.udp_addr = udp_addr;
}

pub fn set_clock_synchronization_enabled(&mut self, clock_synchronization_enabled: bool) {
self.clock_synchronization_enabled = clock_synchronization_enabled;
}
Expand Down
111 changes: 88 additions & 23 deletions rust/rti/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use std::error::Error;

use crate::constants::*;
use crate::federate_info::*;
use crate::net_common::SocketType;
use crate::rti_common::*;
use crate::rti_remote::*;
use crate::trace::Trace;
Expand All @@ -29,7 +30,7 @@ use server::Server;

const RTI_TRACE_FILE_NAME: &str = "rti.lft";

#[derive(PartialEq, PartialOrd, Clone)]
#[derive(PartialEq, PartialOrd, Clone, Debug)]
pub enum ClockSyncStat {
ClockSyncOff,
ClockSyncInit,
Expand All @@ -49,18 +50,15 @@ impl ClockSyncStat {
pub fn process_args(rti: &mut RTIRemote, argv: &[String]) -> Result<(), &'static str> {
let mut idx = 1;
let argc = argv.len();
// println!("argv = {:?}", argv);
while idx < argc {
let arg = argv[idx].as_str();
// println!("arg = {}", arg); // TODO: Remove this debugging code
if arg == "-i" || arg == "--id" {
if argc < idx + 2 {
println!("--id needs a string argument.");
usage(argc, argv);
return Err("Fail to handle id option");
}
idx += 1;
// println!("idx = {}", idx); // TODO: Remove this debugging code
println!("RTI: Federation ID: {}", arg);
rti.set_federation_id(argv[idx].clone());
} else if arg == "-n" || arg == "--number_of_federates" {
Expand Down Expand Up @@ -125,7 +123,7 @@ pub fn process_args(rti: &mut RTIRemote, argv: &[String]) -> Result<(), &'static
return Err("Fail to handle clock_sync option");
}
idx += 1;
// TODO: idx += process_clock_sync_args();
idx += process_clock_sync_args(rti, argc - idx, &argv[idx..]);
} else if arg == "-t" || arg == "--tracing" {
rti.base_mut().set_tracing_enabled(true);
} else if arg == " " {
Expand Down Expand Up @@ -175,6 +173,84 @@ fn usage(argc: usize, argv: &[String]) {
}
}

/**
* Process command-line arguments related to clock synchronization. Will return
* the last read position of argv if all related arguments are parsed or an
* invalid argument is read.
*/
fn process_clock_sync_args(rti: &mut RTIRemote, argc: usize, argv: &[String]) -> usize {
for mut i in 0..argc {
let arg = argv[i].as_str();
if arg == "off" {
rti.set_clock_sync_global_status(ClockSyncStat::ClockSyncOff);
println!("RTI: Clock sync: off");
} else if arg == "init" || arg == "initial" {
rti.set_clock_sync_global_status(ClockSyncStat::ClockSyncInit);
println!("RTI: Clock sync: init");
} else if arg == "on" {
rti.set_clock_sync_global_status(ClockSyncStat::ClockSyncOn);
println!("RTI: Clock sync: on");
} else if arg == "period" {
if rti.clock_sync_global_status() != ClockSyncStat::ClockSyncOn {
println!("[ERROR] clock sync period can only be set if --clock-sync is set to on.");
usage(argc, argv);
i += 1;
continue; // Try to parse the rest of the arguments as clock sync args.
} else if argc < i + 2 {
println!("[ERROR] clock sync period needs a time (in nanoseconds) argument.");
usage(argc, argv);
continue;
}
i += 1;
match argv[i].as_str().parse::<u64>() {
Ok(period_ns) => {
if period_ns == 0 || period_ns == u64::MAX {
println!("[ERROR] clock sync period value is invalid.");
continue; // Try to parse the rest of the arguments as clock sync args.
}
rti.set_clock_sync_period_ns(period_ns);
println!("RTI: Clock sync period: {}", rti.clock_sync_period_ns());
}
Err(_) => {
println!("Failed to parse clock sync period.");
}
}
} else if argv[i] == "exchanges-per-interval" {
if rti.clock_sync_global_status() != ClockSyncStat::ClockSyncOn
&& rti.clock_sync_global_status() != ClockSyncStat::ClockSyncInit
{
println!("[ERROR] clock sync exchanges-per-interval can only be set if\n--clock-sync is set to on or init.");
usage(argc, argv);
continue; // Try to parse the rest of the arguments as clock sync args.
} else if argc < i + 2 {
println!("[ERROR] clock sync exchanges-per-interval needs an integer argument.");
usage(argc, argv);
continue; // Try to parse the rest of the arguments as clock sync args.
}
i += 1;
let exchanges: u32 = 10;
if exchanges == 0 || exchanges == u32::MAX || exchanges == u32::MIN {
println!("[ERROR] clock sync exchanges-per-interval value is invalid.");
continue; // Try to parse the rest of the arguments as clock sync args.
}
rti.set_clock_sync_exchanges_per_interval(exchanges); // FIXME: Loses numbers on 64-bit machines
println!(
"RTI: Clock sync exchanges per interval: {}",
rti.clock_sync_exchanges_per_interval()
);
} else if arg == " " {
// Tolerate spaces
continue;
} else {
// Either done with the clock sync args or there is an invalid
// character. In either case, let the parent function deal with
// the rest of the characters;
return i;
}
}
argc
}

pub fn initialize_federates(rti: &mut RTIRemote) {
if rti.base().tracing_enabled() {
let _lf_number_of_workers = rti.base().number_of_scheduling_nodes();
Expand Down Expand Up @@ -205,27 +281,16 @@ fn initialize_federate(fed: &mut FederateInfo, id: u16) {

pub fn start_rti_server(_f_rti: &mut RTIRemote) -> Result<Server, Box<dyn Error>> {
// TODO: _lf_initialize_clock();
Ok(Server::create_server(
_f_rti.user_specified_port().to_string(),
))
let server = Server::create_rti_server(_f_rti, _f_rti.user_specified_port(), SocketType::TCP);
if _f_rti.clock_sync_global_status() >= ClockSyncStat::ClockSyncOn {
let final_tcp_port = u16::from(_f_rti.final_port_tcp());
Server::create_rti_server(_f_rti, final_tcp_port + 1, SocketType::UDP);
}
Ok(server)
}

/**
* Process command-line arguments related to clock synchronization. Will return
* the last read position of argv if all related arguments are parsed or an
* invalid argument is read.
*
* @param argc: Number of arguments in the list
* @param argv: The list of arguments as a string
* @return Current position (head) of argv;
*/
// TODO: implement this function
// fn process_clock_sync_args(_argc: i32, _argv: &[String]) -> i32 {
// 0
// }

/**
* Initialize the _RTI instance.
* Initialize the RTI instance.
*/
pub fn initialize_rti() -> RTIRemote {
RTIRemote::new()
Expand Down
18 changes: 18 additions & 0 deletions rust/rti/src/net_common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,10 @@ pub enum MsgType {
AddressAdvertisement,
P2pSendingFedId,
P2pTaggedMessage,
ClockSyncT1,
ClockSyncT3,
ClockSyncT4,
ClockSyncCodedProbe,
PortAbsent,
NeighborStructure,
Ignore,
Expand All @@ -129,6 +133,10 @@ impl MsgType {
MsgType::AddressAdvertisement => 14,
MsgType::P2pSendingFedId => 15,
MsgType::P2pTaggedMessage => 17,
MsgType::ClockSyncT1 => 19,
MsgType::ClockSyncT3 => 20,
MsgType::ClockSyncT4 => 21,
MsgType::ClockSyncCodedProbe => 22,
MsgType::PortAbsent => 23,
MsgType::NeighborStructure => 24,
MsgType::Ignore => 250,
Expand All @@ -150,6 +158,10 @@ impl MsgType {
12 => MsgType::StopGranted,
13 => MsgType::AddressQuery,
14 => MsgType::AddressAdvertisement,
19 => MsgType::ClockSyncT1,
20 => MsgType::ClockSyncT3,
21 => MsgType::ClockSyncT4,
22 => MsgType::ClockSyncCodedProbe,
23 => MsgType::PortAbsent,
_ => MsgType::Ignore,
}
Expand Down Expand Up @@ -183,6 +195,12 @@ impl ErrType {
}
}

#[derive(PartialEq, Clone)]
pub enum SocketType {
TCP,
UDP,
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down
6 changes: 3 additions & 3 deletions rust/rti/src/net_util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ impl NetUtil {
} {}
}

pub fn read_from_stream(stream: &mut TcpStream, buffer: &mut Vec<u8>, fed_id: u16) -> usize {
pub fn read_from_socket(stream: &mut TcpStream, buffer: &mut Vec<u8>, fed_id: u16) -> usize {
let mut bytes_read = 0;
while match stream.read(buffer) {
Ok(msg_size) => {
Expand Down Expand Up @@ -209,7 +209,7 @@ mod tests {
}

#[test]
fn test_read_from_stream_positive() {
fn test_read_from_socket_positive() {
let port_num = 35642;
let tcp_server_mocker = TcpServerMocker::new(port_num).unwrap();
let mut ip_address = LOCAL_HOST.to_owned();
Expand All @@ -225,7 +225,7 @@ mod tests {
),
);
let mut buffer = vec![0 as u8; buffer_size];
let read_size = NetUtil::read_from_stream(&mut stream, &mut buffer, 0);
let read_size = NetUtil::read_from_socket(&mut stream, &mut buffer, 0);
assert!(buffer == msg);
assert!(buffer_size == read_size);
}
Expand Down
51 changes: 48 additions & 3 deletions rust/rti/src/rti_remote.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ use crate::constants::*;
use crate::ClockSyncStat;
use crate::RTICommon;

use std::net::UdpSocket;

/**
* Structure that an RTI instance uses to keep track of its own and its
* corresponding federates' state.
Expand Down Expand Up @@ -61,7 +63,7 @@ pub struct RTIRemote {
final_port_udp: u16,

/** The UDP socket descriptor for the socket server. */
socket_descriptor_udp: i32,
socket_descriptor_udp: Option<UdpSocket>,

/************* Clock synchronization information *************/
/* Thread performing PTP clock sync sessions periodically. */
Expand All @@ -80,7 +82,7 @@ pub struct RTIRemote {
/**
* Number of messages exchanged for each clock sync attempt.
*/
clock_sync_exchanges_per_interval: i32,
clock_sync_exchanges_per_interval: u32,

/**
* Boolean indicating that authentication is enabled.
Expand All @@ -105,7 +107,7 @@ impl RTIRemote {
final_port_tcp: 0,
socket_descriptor_tcp: -1,
final_port_udp: u16::MAX,
socket_descriptor_udp: -1,
socket_descriptor_udp: None,
clock_sync_global_status: ClockSyncStat::ClockSyncInit,
clock_sync_period_ns: 10 * 1000000,
clock_sync_exchanges_per_interval: 10,
Expand Down Expand Up @@ -138,6 +140,14 @@ impl RTIRemote {
self.user_specified_port
}

pub fn final_port_tcp(&self) -> u16 {
self.final_port_tcp
}

pub fn socket_descriptor_udp(&mut self) -> &mut Option<UdpSocket> {
&mut self.socket_descriptor_udp
}

pub fn final_port_udp(&self) -> u16 {
self.final_port_udp
}
Expand All @@ -146,6 +156,14 @@ impl RTIRemote {
self.clock_sync_global_status.clone()
}

pub fn clock_sync_period_ns(&self) -> u64 {
self.clock_sync_period_ns
}

pub fn clock_sync_exchanges_per_interval(&self) -> u32 {
self.clock_sync_exchanges_per_interval
}

pub fn stop_in_progress(&self) -> bool {
self.stop_in_progress
}
Expand All @@ -167,6 +185,33 @@ impl RTIRemote {
self.user_specified_port = user_specified_port;
}

pub fn set_final_port_tcp(&mut self, final_port_tcp: u16) {
self.final_port_tcp = final_port_tcp;
}

pub fn set_socket_descriptor_udp(&mut self, socket_descriptor_udp: Option<UdpSocket>) {
self.socket_descriptor_udp = socket_descriptor_udp;
}

pub fn set_final_port_udp(&mut self, final_port_udp: u16) {
self.final_port_udp = final_port_udp;
}

pub fn set_clock_sync_global_status(&mut self, clock_sync_global_status: ClockSyncStat) {
self.clock_sync_global_status = clock_sync_global_status;
}

pub fn set_clock_sync_period_ns(&mut self, clock_sync_period_ns: u64) {
self.clock_sync_period_ns = clock_sync_period_ns;
}

pub fn set_clock_sync_exchanges_per_interval(
&mut self,
clock_sync_exchanges_per_interval: u32,
) {
self.clock_sync_exchanges_per_interval = clock_sync_exchanges_per_interval;
}

pub fn set_stop_in_progress(&mut self, stop_in_progress: bool) {
self.stop_in_progress = stop_in_progress;
}
Expand Down
Loading

0 comments on commit d5caadd

Please sign in to comment.