diff --git a/CHANGELOG.md b/CHANGELOG.md index 9d4c952..60f7b21 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,10 @@ Manually generated changelog, for now. We follow semantic versioning. +## 1.1.0 - 2024-04-05 +- Support for integration delays. +- Light networking refactor. + ## 1.0.1 - 2023-11-24 - Minor README update. diff --git a/Cargo.toml b/Cargo.toml index 3bed8b7..0a06227 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "maybenot-simulator" -version = "1.0.1" +version = "1.1.0" edition = "2021" authors = ["Tobias Pulls"] license = "MIT OR Apache-2.0" @@ -18,6 +18,9 @@ maybenot = "1.0.0" log = "0.4.20" test-log = "0.2.12" fastrand = "2.0.0" +serde = "1.0.193" +rand = "0.8.5" +serde_json = "1.0.108" [dev-dependencies] env_logger = "0.10.1" diff --git a/src/integration.rs b/src/integration.rs new file mode 100644 index 0000000..0c76fbd --- /dev/null +++ b/src/integration.rs @@ -0,0 +1,119 @@ +use rand::Rng; +use serde::{Deserialize, Serialize}; +use std::{collections::HashMap, error::Error, time::Duration}; + +#[derive(Serialize, Deserialize, Debug)] +struct Bin { + range: (f64, f64), + probability: f64, +} + +/// Represents a Maybenot integration and its associated delays. This can happen +/// in the case of Maybenot being integrated, e.g., in user space with a +/// protocol running in kernel space. +#[derive(Clone, Debug)] +pub struct Integration { + /// The *action* delay is the time between the integration taking action and + /// the action happening. For example, if a padding packet is to be sent, + /// user space might need to signal to kernel space to craft one. NOTE: we + /// assume that the PaddingSent event is triggered directly as padding is + /// sent from Maybenot, while we assume that the BlockingBegin event is + /// triggered when the blocking actually begins in the protocol and the + /// event is transported with a reporting delay. + pub action_delay: BinDist, + /// The *reporting* delay is the time between an event being created by the + /// integrated protocol and the event being reported (trigger_events) to + /// Maybenot. For example, this could be the time it takes to go from kernel + /// space to user space. + pub reporting_delay: BinDist, + /// The *trigger* delay is the time it takes for the integration to perform + /// a scheduled action. For example, suppose an action is scheduled for time + /// T. In that case, the trigger delay is added to T. This is important for + /// capturing async integrations, where a zero timeout on an action to send + /// padding would still take some (tiny) time to execute. + pub trigger_delay: BinDist, +} + +impl Integration { + pub fn action_delay(&self) -> Duration { + self.action_delay.sample() + } + + pub fn reporting_delay(&self) -> Duration { + self.reporting_delay.sample() + } + + pub fn trigger_delay(&self) -> Duration { + self.trigger_delay.sample() + } +} + +/// A distribution of values in bins with a probability for each bin. Used to +/// estimate delay distributions in a Maybenot integration. +#[derive(Clone, Debug)] +pub struct BinDist { + bins: Vec<(f64, f64)>, // Vec of (min, max) tuples for each bin + cumulative_probabilities: Vec, // Cumulative probabilities for efficient sampling +} + +impl BinDist { + pub fn new(json_input: &str) -> Result> { + let bins: HashMap = serde_json::from_str(json_input)?; + + let mut sorted_bins: Vec<_> = bins + .into_iter() + .map(|(range, prob)| { + // Manually parsing the range tuple + let range_values: Vec = range + .trim_matches(|c: char| c == '(' || c == ')') + .split(',') + .map(str::trim) + .map(str::parse) + .collect::, _>>()?; + + if range_values.len() != 2 { + return Err("Range must have exactly two values".into()); + } + + Ok(((range_values[0], range_values[1]), prob)) + }) + .collect::, Box>>()?; + + // Sort bins by range start for cumulative probability calculation + sorted_bins.sort_by(|a, b| a.0 .0.partial_cmp(&b.0 .0).unwrap()); + + let mut cumulative_probabilities = Vec::with_capacity(sorted_bins.len()); + let mut total_prob = 0.0; + let mut ranges = Vec::with_capacity(sorted_bins.len()); + + for (range, prob) in sorted_bins { + total_prob += prob; + cumulative_probabilities.push(total_prob); + ranges.push(range); + } + + Ok(BinDist { + bins: ranges, + cumulative_probabilities, + }) + } + + pub fn sample(&self) -> Duration { + let mut rng = rand::thread_rng(); + let sample_prob = rng.gen::(); + let bin_index = match self + .cumulative_probabilities + .binary_search_by(|prob| prob.partial_cmp(&sample_prob).unwrap()) + { + Ok(index) => index, + Err(index) => index, + }; + + let (min, max) = self.bins[bin_index]; + // bins are in milliseconds, to get microseconds we multiply by 1000 + if min == max { + return Duration::from_micros((min * 1000.0) as u64); + } + Duration::from_micros((rng.gen_range(min..max) * 1000.0) as u64) + } +} diff --git a/src/lib.rs b/src/lib.rs index 5877708..251a49f 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -15,7 +15,7 @@ //! ## Example usage //! ``` //! use maybenot::{framework::TriggerEvent, machine::Machine}; -//! use maybenot_simulator::{parse_trace, sim}; +//! use maybenot_simulator::{parse_trace, network::Network, sim}; //! use std::{str::FromStr, time::Duration}; //! //! // A trace of ten packets from the client's perspective when visiting @@ -33,15 +33,15 @@ //! 9401094589,s,73 //! 9420892765,r,191"; //! -//! // The delay between client and server. This is for the simulation of the -//! // network between the client and server -//! let delay = Duration::from_millis(10); +//! // The network model for simulating the network between the client and the +//! // server. Currently just a delay. +//! let network = Network::new(Duration::from_millis(10)); //! //! // Parse the raw trace into a queue of events for the simulator. This uses //! // the delay to generate a queue of events at the client and server in such //! // a way that the client is ensured to get the packets in the same order and //! // at the same time as in the raw trace. -//! let mut input_trace = parse_trace(raw_trace, delay); +//! let mut input_trace = parse_trace(raw_trace, &network); //! //! // A simple machine that sends one padding packet of 1000 bytes 20 //! // milliseconds after the first NonPaddingSent is sent. @@ -51,7 +51,7 @@ //! //! // Run the simulator with the machine at the client. Run the simulation up //! // until 100 packets have been recorded (total, client and server). -//! let trace = sim(&[m], &[], &mut input_trace, delay, 100, true); +//! let trace = sim(&[m], &[], &mut input_trace, network.delay, 100, true); //! //! // print packets from the client's perspective //! let starting_time = trace[0].time; @@ -106,6 +106,7 @@ //! received 191 bytes at 9420 ms //! ``` +pub mod integration; pub mod network; pub mod peek; pub mod queue; @@ -116,7 +117,9 @@ use std::{ time::{Duration, Instant}, }; +use integration::Integration; use log::debug; +use network::Network; use queue::SimQueue; use maybenot::{ @@ -136,6 +139,7 @@ use crate::{ pub struct SimEvent { pub event: TriggerEvent, pub time: Instant, + pub delay: Duration, pub client: bool, // internal flag to mark event as bypass bypass: bool, @@ -167,6 +171,8 @@ pub struct SimState { last_sent_time: Instant, /// size of the last sent packet last_sent_size: u16, + /// integration aspects for this state + integration: Option, } impl SimState @@ -179,6 +185,7 @@ where max_padding_frac: f64, max_blocking_frac: f64, mtu: u16, + integration: Option, ) -> Self { Self { framework: Framework::new( @@ -198,8 +205,30 @@ where .checked_sub(Duration::from_millis(1000)) .unwrap(), last_sent_size: 0, + integration, } } + + pub fn reporting_delay(&self) -> Duration { + self.integration + .as_ref() + .map(|i| i.reporting_delay.sample()) + .unwrap_or(Duration::from_micros(0)) + } + + pub fn action_delay(&self) -> Duration { + self.integration + .as_ref() + .map(|i| i.action_delay.sample()) + .unwrap_or(Duration::from_micros(0)) + } + + pub fn trigger_delay(&self) -> Duration { + self.integration + .as_ref() + .map(|i| i.trigger_delay.sample()) + .unwrap_or(Duration::from_micros(0)) + } } /// The main simulator function. @@ -231,14 +260,15 @@ pub fn sim( max_trace_length: usize, only_network_activity: bool, ) -> Vec { - let args = SimulatorArgs::new(delay, max_trace_length, only_network_activity); + let network = Network::new(delay); + let args = SimulatorArgs::new(&network, max_trace_length, only_network_activity); sim_advanced(machines_client, machines_server, sq, &args) } /// Arguments for [`sim_advanced`]. #[derive(Clone, Debug)] -pub struct SimulatorArgs { - pub delay: Duration, +pub struct SimulatorArgs<'a> { + pub network: &'a Network, pub max_trace_length: usize, pub max_sim_iterations: usize, pub only_client_events: bool, @@ -248,12 +278,14 @@ pub struct SimulatorArgs { pub max_padding_frac_server: f64, pub max_blocking_frac_server: f64, pub mtu: u16, + pub client_integration: Option<&'a Integration>, + pub server_integration: Option<&'a Integration>, } -impl SimulatorArgs { - pub fn new(delay: Duration, max_trace_length: usize, only_network_activity: bool) -> Self { +impl<'a> SimulatorArgs<'a> { + pub fn new(network: &'a Network, max_trace_length: usize, only_network_activity: bool) -> Self { Self { - delay, + network, max_trace_length, max_sim_iterations: 0, only_client_events: false, @@ -264,6 +296,8 @@ impl SimulatorArgs { max_blocking_frac_server: 0.0, // WireGuard default MTU mtu: 1420, + client_integration: None, + server_integration: None, } } } @@ -290,6 +324,7 @@ pub fn sim_advanced( args.max_padding_frac_client, args.max_blocking_frac_client, args.mtu, + args.client_integration.cloned(), ); let mut server = SimState::new( machines_server, @@ -297,6 +332,7 @@ pub fn sim_advanced( args.max_padding_frac_server, args.max_blocking_frac_server, args.mtu, + args.server_integration.cloned(), ); let mut sim_iterations = 0; @@ -341,9 +377,9 @@ pub fn sim_advanced( // where the simulator simulates the entire network between the client // and the server. TODO: make delay/network more realistic. let network_activity = if next.client { - sim_network_activity(&next, sq, &client, current_time, args.delay) + sim_network_activity(&next, sq, &client, &server, args.network, ¤t_time) } else { - sim_network_activity(&next, sq, &server, current_time, args.delay) + sim_network_activity(&next, sq, &server, &client, args.network, ¤t_time) }; if network_activity { @@ -366,20 +402,10 @@ pub fn sim_advanced( // get actions, update scheduled actions if next.client { debug!("sim(): trigger @client framework\n{:#?}", next.event); - trigger_update( - &mut client.framework, - &mut client.scheduled_action, - &next, - ¤t_time, - ); + trigger_update(&mut client, &next, ¤t_time); } else { debug!("sim(): trigger @server framework\n{:#?}", next.event); - trigger_update( - &mut server.framework, - &mut server.scheduled_action, - &next, - ¤t_time, - ); + trigger_update(&mut server, &next, ¤t_time); } // conditional save to resulting trace: only on network activity if set @@ -387,7 +413,24 @@ pub fn sim_advanced( if (!args.only_network_activity || network_activity) && (!args.only_client_events || next.client) { - trace.push(next); + // this should be a network trace: adjust timestamps based on any + // integration delays + let mut n = next.clone(); + match next.event { + TriggerEvent::PaddingSent { .. } => { + // padding adds the action delay + n.time += n.delay; + } + TriggerEvent::PaddingRecv { .. } + | TriggerEvent::NonPaddingRecv { .. } + | TriggerEvent::NonPaddingSent { .. } => { + // reported events remove the reporting delay + n.time -= n.delay; + } + _ => {} + } + + trace.push(n); } if args.max_trace_length > 0 && trace.len() >= args.max_trace_length { @@ -411,6 +454,10 @@ pub fn sim_advanced( debug!("sim(): main loop end, more work?"); debug!("#########################################################"); } + + // sort the trace by time + trace.sort_by(|a, b| a.time.cmp(&b.time)); + trace } @@ -460,6 +507,8 @@ fn pick_next>( // create SimEvent and move blocking into (what soon will be) the past // to indicate that it has been processed let time: Instant; + // ASSUMPTION: block outgoing is reported from integration + let delay: Duration; let client_earliest = if client.blocking_until >= current_time && server.blocking_until >= current_time { client.blocking_until <= server.blocking_until @@ -468,10 +517,12 @@ fn pick_next>( }; if client_earliest { - time = client.blocking_until; + delay = client.reporting_delay(); + time = client.blocking_until + delay; client.blocking_until -= Duration::from_micros(1); } else { - time = server.blocking_until; + delay = server.reporting_delay(); + time = server.blocking_until + delay; server.blocking_until -= Duration::from_micros(1); } @@ -479,6 +530,7 @@ fn pick_next>( client: client_earliest, event: TriggerEvent::BlockingEnd, time, + delay, fuzz: fastrand::i32(..), bypass: false, replace: false, @@ -491,7 +543,7 @@ fn pick_next>( let target = current_time + s; let act = do_scheduled(client, server, current_time, target); if let Some(a) = act { - sq.push_sim(a, Reverse(current_time)); + sq.push_sim(a.clone(), Reverse(a.time)); } pick_next(sq, client, server, current_time) } @@ -548,17 +600,26 @@ fn do_scheduled>( bypass, replace, machine, - } => Some(SimEvent { - event: TriggerEvent::PaddingSent { - bytes_sent: size, - machine, - }, - time: a.time, - client: a_is_client, - bypass, - replace, - fuzz: fastrand::i32(..), - }), + } => { + let action_delay = if a_is_client { + client.action_delay() + } else { + server.action_delay() + }; + + Some(SimEvent { + event: TriggerEvent::PaddingSent { + bytes_sent: size, + machine, + }, + time: a.time, + delay: action_delay, + client: a_is_client, + bypass, + replace, + fuzz: fastrand::i32(..), + }) + } Action::BlockOutgoing { timeout: _, duration, @@ -568,6 +629,13 @@ fn do_scheduled>( } => { let block = a.time + duration; let event_bypass; + // ASSUMPTION: block outgoing reported from integration + let total_delay = if a_is_client { + client.action_delay() + client.reporting_delay() + } else { + server.action_delay() + server.reporting_delay() + }; + let reported = a.time + total_delay; // should we update client/server blocking? if a_is_client { @@ -587,7 +655,8 @@ fn do_scheduled>( // event triggered regardless Some(SimEvent { event: TriggerEvent::BlockingBegin { machine }, - time: a.time, + time: reported, + delay: total_delay, client: a_is_client, bypass: event_bypass, replace: false, @@ -598,20 +667,24 @@ fn do_scheduled>( } fn trigger_update>( - f: &mut Framework, - actions: &mut HashMap, + state: &mut SimState, next: &SimEvent, current_time: &Instant, ) { + let trigger_delay = state.trigger_delay(); + // parse actions and update - for action in f.trigger_events(&[next.event.clone()], *current_time) { + for action in state + .framework + .trigger_events(&[next.event.clone()], *current_time) + { match action { Action::Cancel { machine } => { - actions.insert( + state.scheduled_action.insert( *machine, ScheduledAction { action: Some(action.clone()), - time: *current_time, + time: *current_time + trigger_delay, }, ); } @@ -622,11 +695,11 @@ fn trigger_update>( replace: _, machine, } => { - actions.insert( + state.scheduled_action.insert( *machine, ScheduledAction { action: Some(action.clone()), - time: *current_time + *timeout, + time: *current_time + *timeout + trigger_delay, }, ); } @@ -637,11 +710,11 @@ fn trigger_update>( replace: _, machine, } => { - actions.insert( + state.scheduled_action.insert( *machine, ScheduledAction { action: Some(action.clone()), - time: *current_time + *timeout, + time: *current_time + *timeout + trigger_delay, }, ); } @@ -657,7 +730,17 @@ fn trigger_update>( /// number of bytes sent or received. The delay is used to model the network /// delay between the client and server. Returns a SimQueue with the events in /// the trace for use with [`sim`]. -pub fn parse_trace(trace: &str, delay: Duration) -> SimQueue { + +pub fn parse_trace(trace: &str, network: &Network) -> SimQueue { + parse_trace_advanced(trace, network, None, None) +} + +pub fn parse_trace_advanced( + trace: &str, + network: &Network, + client: Option<&Integration>, + server: Option<&Integration>, +) -> SimQueue { let mut sq = SimQueue::new(); // we just need a random starting time to make sure that we don't start from @@ -672,29 +755,43 @@ pub fn parse_trace(trace: &str, delay: Duration) -> SimQueue { let size = parts[2].trim().parse::().unwrap(); match parts[1] { - "s" => { + "s" | "sn" => { // client sent at the given time + let reporting_delay = client + .map(|i| i.reporting_delay.sample()) + .unwrap_or(Duration::from_micros(0)); + let reported = timestamp + reporting_delay; sq.push( TriggerEvent::NonPaddingSent { bytes_sent: size as u16, }, true, - timestamp, - Reverse(timestamp), + reported, + reporting_delay, + Reverse(reported), ); } - "r" => { + "r" | "rn" => { // sent by server delay time ago - let sent = timestamp.checked_sub(delay).unwrap(); + let sent = timestamp.checked_sub(network.delay).unwrap(); + // but reported to the Maybenot framework at the server with delay + let reporting_delay = server + .map(|i| i.reporting_delay.sample()) + .unwrap_or(Duration::from_micros(0)); + let reported = sent + reporting_delay; sq.push( TriggerEvent::NonPaddingSent { bytes_sent: size as u16, }, false, - sent, - Reverse(sent), + reported, + reporting_delay, + Reverse(reported), ); } + "sp" | "rp" => { + // TODO: figure out of ignoring is the right thing to do + } _ => { panic!("invalid direction") } diff --git a/src/network.rs b/src/network.rs index 65fb673..eac6038 100644 --- a/src/network.rs +++ b/src/network.rs @@ -2,7 +2,7 @@ //! client and server. use std::{ - cmp::Reverse, + cmp::{max, Reverse}, time::{Duration, Instant}, }; @@ -11,6 +11,23 @@ use maybenot::{event::Event, framework::TriggerEvent, machine::Machine}; use crate::{queue::SimQueue, SimEvent, SimState}; +/// A model of the network between the client and server. TODO: make this more +/// than just a delay. +#[derive(Debug, Clone)] +pub struct Network { + pub delay: Duration, +} + +impl Network { + pub fn new(delay: Duration) -> Self { + Self { delay } + } + + pub fn sample(&self) -> Duration { + self.delay + } +} + /// The network replace window is the time window in which we can replace /// padding with existing padding or non-padding already queued (or about to be /// queued up). The behavior here is tricky, since it'll differ how different @@ -26,8 +43,9 @@ pub fn sim_network_activity>( next: &SimEvent, sq: &mut SimQueue, state: &SimState, - current_time: Instant, - delay: Duration, + recipient: &SimState, + network: &Network, + current_time: &Instant, ) -> bool { let side = if next.client { "client" } else { "server" }.to_string(); @@ -35,15 +53,29 @@ pub fn sim_network_activity>( // easy: queue up the recv event on the other side TriggerEvent::NonPaddingSent { bytes_sent } => { debug!("\tqueue {}", Event::NonPaddingRecv); - // TODO: make the network more than a delay! - let time = current_time + delay; + // The time the event was reported to us is in next.time. We have to + // remove the reporting delay locally, then add a network delay and + // a reporting delay (at the recipient) for the recipient. + // + // LIMITATION, we also have to deal with an ugly edge-case: if the + // reporting delay is very long *at the sender*, then the event can + // actually arrive earlier at the recipient than it was reported to + // the sender. This we cannot deal with in the current design of the + // simulator (support for integration delays was bolted on late), + // because it would move time backwards. Therefore, we clamp. + let reporting_delay = recipient.reporting_delay(); + let reported = max( + next.time - next.delay + network.sample() + reporting_delay, + *current_time, + ); sq.push( TriggerEvent::NonPaddingRecv { bytes_recv: bytes_sent, }, !next.client, - time, - Reverse(time), + reported, + reporting_delay, + Reverse(reported), ); true @@ -76,7 +108,9 @@ pub fn sim_network_activity>( } // can replace with nonpadding that's queued to be sent within - // the network replace window? + // the network replace window? FIXME: here be bugs related to + // integration delays. Once blocking is implemented, this code + // needs to be reworked. let peek = sq.peek_blocking(state.blocking_bypassable, next.client); if let Some((queued, _)) = peek { let queued = queued.clone(); @@ -95,8 +129,8 @@ pub fn sim_network_activity>( if queued_bytes_sent <= bytes_sent { debug!("replacing padding sent with queued non-padding @{}", side,); // let the NonPaddingSent event bypass - // blocking by making a copy of the eevent - // with the approproiate flags set + // blocking by making a copy of the event + // with the appropriate flags set let mut tmp = queued.clone(); tmp.bypass = true; tmp.replace = false; @@ -116,19 +150,22 @@ pub fn sim_network_activity>( // nothing to replace with (or we're not replacing), so queue up debug!("\tqueue {}", Event::PaddingRecv); - let time = current_time + delay; + let reporting_delay = recipient.reporting_delay(); + // action delay + network + recipient reporting delay + let reported = next.time + next.delay + network.sample() + reporting_delay; sq.push( TriggerEvent::PaddingRecv { bytes_recv: bytes_sent, }, !next.client, - time, - Reverse(time), + reported, + reporting_delay, + Reverse(reported), ); true } - // receiving (non-)padding is reciving a packet + // receiving (non-)padding is receiving a packet TriggerEvent::NonPaddingRecv { .. } | TriggerEvent::PaddingRecv { .. } => true, _ => false, } diff --git a/src/queue.rs b/src/queue.rs index 7774bbc..3ba855a 100644 --- a/src/queue.rs +++ b/src/queue.rs @@ -1,6 +1,9 @@ //! The main queue of events in the simulator. -use std::{cmp::Reverse, time::Instant}; +use std::{ + cmp::Reverse, + time::{Duration, Instant}, +}; use maybenot::framework::TriggerEvent; use priority_queue::PriorityQueue; @@ -45,12 +48,14 @@ impl SimQueue { event: TriggerEvent, is_client: bool, time: Instant, + delay: Duration, priority: Reverse, ) { self.push_sim( SimEvent { event, time, + delay, client: is_client, bypass: false, replace: false, diff --git a/tests/example.rs b/tests/example.rs index e11f9a9..b020cad 100644 --- a/tests/example.rs +++ b/tests/example.rs @@ -1,5 +1,5 @@ use maybenot::{framework::TriggerEvent, machine::Machine}; -use maybenot_simulator::{parse_trace, sim}; +use maybenot_simulator::{network::Network, parse_trace, sim}; use std::{str::FromStr, time::Duration}; #[test_log::test] @@ -19,15 +19,15 @@ fn simulator_example_use() { 9401094589,s,73 9420892765,r,191"; - // The delay between client and server. This is for the simulation of the - // network between the client and server - let delay = Duration::from_millis(10); + // The network model for simulating the network between the client and the + // server. Currently just a delay. + let network = Network::new(Duration::from_millis(10)); // Parse the raw trace into a queue of events for the simulator. This uses // the delay to generate a queue of events at the client and server in such // a way that the client is ensured to get the packets in the same order and // at the same time as in the raw trace. - let mut input_trace = parse_trace(raw_trace, delay); + let mut input_trace = parse_trace(raw_trace, &network); // A simple machine that sends one padding packet of 1000 bytes 20 // milliseconds after the first NonPaddingSent is sent. @@ -37,7 +37,7 @@ fn simulator_example_use() { // Run the simulator with the machine at the client. Run the simulation up // until 100 packets have been recorded (total, client and server). - let trace = sim(&[m], &[], &mut input_trace, delay, 100, true); + let trace = sim(&[m], &[], &mut input_trace, network.delay, 100, true); // print packets from the client's perspective let starting_time = trace[0].time; diff --git a/tests/integration_delay.rs b/tests/integration_delay.rs new file mode 100644 index 0000000..87e0eee --- /dev/null +++ b/tests/integration_delay.rs @@ -0,0 +1,333 @@ +use std::{collections::HashMap, time::Duration}; + +use maybenot::{ + dist::{Dist, DistType}, + event::Event, + machine::Machine, + state::State, +}; +use maybenot_simulator::{ + integration::{BinDist, Integration}, + network::Network, + parse_trace_advanced, sim_advanced, SimEvent, SimulatorArgs, +}; + +fn get_test_machine() -> Machine { + // a simple machine that pads once after 5ms + let num_states = 2; + let mut t: HashMap> = HashMap::new(); + let mut e: HashMap = HashMap::new(); + e.insert(1, 1.0); + t.insert(Event::NonPaddingSent, e); + let s0 = State::new(t, num_states); + let t: HashMap> = HashMap::new(); + let mut s1 = State::new(t, num_states); + s1.timeout = Dist { + dist: DistType::Uniform, + param1: 0.0, + param2: 0.0, + start: 5.0 * 1000.0, + max: 0.0, + }; + Machine { + allowed_padding_bytes: 10000, + max_padding_frac: 1.0, + allowed_blocked_microsec: 0, + max_blocking_frac: 0.0, + states: vec![s0, s1], + include_small_packets: true, + } +} + +fn run_sim( + client: Option<&Integration>, + server: Option<&Integration>, + only_client: bool, +) -> Vec { + // a simple machine that pads once after 5ms + let m = get_test_machine(); + + let raw_trace = "0,s,100 + 10000000,r,100 + 20000000,s,100 + 32000000,r,100 + 56000000,s,100 + 100000000,s,100"; + let network = Network::new(Duration::from_millis(5)); + + let mut input_trace = parse_trace_advanced(raw_trace, &network, client, server); + + let mut args = SimulatorArgs::new(&network, 100, true); + args.client_integration = client; + args.server_integration = server; + let trace = sim_advanced(&[m], &[], &mut input_trace, &args); + + let trace: Vec<_> = trace + .into_iter() + .filter(|e| e.client == only_client) + .collect(); + println!("trace: {:?}", trace); + trace +} + +fn get_1ms_delay_dist() -> BinDist { + BinDist::new( + r#"{ + "(1.0, 1.0)": 1.0 + }"#, + ) + .unwrap() +} + +fn get_0ms_delay_dist() -> BinDist { + BinDist::new( + r#"{ + "(0.0, 0.0)": 1.0 + }"#, + ) + .unwrap() +} + +#[test_log::test] +fn test_action_delay() { + // action delay should be visible in the network trace we get from the + // simulator, by simply delaying padding packets by the action delay or + // delaying blocking to start/stop by the action delay + + let integration = Integration { + action_delay: get_1ms_delay_dist(), + reporting_delay: get_0ms_delay_dist(), + trigger_delay: get_0ms_delay_dist(), + }; + assert_eq!(integration.action_delay(), Duration::from_micros(1000)); + assert_eq!(integration.reporting_delay(), Duration::from_micros(0)); + + // for client + let base_trace = run_sim(None, None, true); + let delayed_trace = run_sim(Some(&integration), None, true); + + assert_eq!(base_trace.len(), delayed_trace.len()); + assert_eq!(base_trace[1].event, delayed_trace[1].event); + assert!(base_trace[1].event.is_event(Event::PaddingSent)); + assert_eq!( + (delayed_trace[1].time - delayed_trace[0].time) - (base_trace[1].time - base_trace[0].time), + integration.action_delay() + ); + + let delayed_trace_server = run_sim(Some(&integration), None, false); + assert_eq!(base_trace.len(), delayed_trace_server.len()); + assert!(delayed_trace_server[2].event.is_event(Event::PaddingRecv)); + // note below that first recv is 5ms in + assert_eq!( + delayed_trace_server[2].time - delayed_trace_server[0].time + Duration::from_millis(5), + Duration::from_millis(5) * 2 + integration.action_delay() + ); + + // for server, everything should be the same (no action there due to machine + // being client-side) + let base_trace = run_sim(None, None, false); + let delayed_trace = run_sim(None, Some(&integration), false); + assert_eq!(base_trace.len(), delayed_trace.len()); + for i in 0..base_trace.len() { + assert_eq!(base_trace[i].event, delayed_trace[i].event); + assert_eq!( + base_trace[i].time - base_trace[0].time, + delayed_trace[i].time - delayed_trace[0].time + ); + } +} + +#[test_log::test] +fn test_reporting_delay() { + // reporting delay should be indirectly visible in the network trace we get + // from the simulator, because events reported by the simulator will have a + // delay, resulting actions will be delayed, and the resulting padding + // packets will therefore be delayed in the network trace + + let integration = Integration { + action_delay: get_0ms_delay_dist(), + reporting_delay: get_1ms_delay_dist(), + trigger_delay: get_0ms_delay_dist(), + }; + assert_eq!(integration.action_delay(), Duration::from_micros(0)); + assert_eq!(integration.reporting_delay(), Duration::from_micros(1000)); + + // for client + let base_trace = run_sim(None, None, true); + let delayed_trace = run_sim(Some(&integration), None, true); + + assert_eq!(base_trace.len(), delayed_trace.len()); + assert_eq!(base_trace[1].event, delayed_trace[1].event); + assert!(base_trace[1].event.is_event(Event::PaddingSent)); + assert_eq!( + (delayed_trace[1].time - delayed_trace[0].time) - (base_trace[1].time - base_trace[0].time), + integration.reporting_delay() + ); + + let delayed_trace_server = run_sim(Some(&integration), None, false); + assert_eq!(base_trace.len(), delayed_trace_server.len()); + assert!(delayed_trace_server[2].event.is_event(Event::PaddingRecv)); + // note below that first recv is 5ms in + assert_eq!( + delayed_trace_server[2].time - delayed_trace_server[0].time + Duration::from_millis(5), + Duration::from_millis(5) * 2 + integration.reporting_delay() + ); + + // for server, everything should be the same (no action there due to machine + // being client-side) + let base_trace = run_sim(None, None, false); + let delayed_trace = run_sim(None, Some(&integration), false); + assert_eq!(base_trace.len(), delayed_trace.len()); + + for i in 0..base_trace.len() { + assert_eq!(base_trace[i].event, delayed_trace[i].event); + assert_eq!( + base_trace[i].time - base_trace[0].time, + delayed_trace[i].time - delayed_trace[0].time + ); + } +} + +#[test_log::test] +fn test_trigger_delay() { + // trigger delay should be visible in the network trace we get from the + // simulator, by simply delaying padding packets by the trigger delay + + let integration = Integration { + action_delay: get_0ms_delay_dist(), + reporting_delay: get_0ms_delay_dist(), + trigger_delay: get_1ms_delay_dist(), + }; + assert_eq!(integration.action_delay(), Duration::from_micros(0)); + assert_eq!(integration.reporting_delay(), Duration::from_micros(0)); + assert_eq!(integration.trigger_delay(), Duration::from_micros(1000)); + + // for client + let base_trace = run_sim(None, None, true); + let delayed_trace = run_sim(Some(&integration), None, true); + + assert_eq!(base_trace.len(), delayed_trace.len()); + assert_eq!(base_trace[1].event, delayed_trace[1].event); + assert!(base_trace[1].event.is_event(Event::PaddingSent)); + assert_eq!( + (delayed_trace[1].time - delayed_trace[0].time) - (base_trace[1].time - base_trace[0].time), + integration.trigger_delay() + ); + + let delayed_trace_server = run_sim(Some(&integration), None, false); + assert_eq!(base_trace.len(), delayed_trace_server.len()); + assert!(delayed_trace_server[2].event.is_event(Event::PaddingRecv)); + // note below that first recv is 5ms in + assert_eq!( + delayed_trace_server[2].time - delayed_trace_server[0].time + Duration::from_millis(5), + Duration::from_millis(5) * 2 + integration.trigger_delay() + ); + + // for server, everything should be the same (no action there due to machine + // being client-side) + let base_trace = run_sim(None, None, false); + let delayed_trace = run_sim(None, Some(&integration), false); + assert_eq!(base_trace.len(), delayed_trace.len()); + + for i in 0..base_trace.len() { + assert_eq!(base_trace[i].event, delayed_trace[i].event); + assert_eq!( + base_trace[i].time - base_trace[0].time, + delayed_trace[i].time - delayed_trace[0].time + ); + } +} + +#[test_log::test] +fn test_action_and_reporting_delay() { + let integration = Integration { + action_delay: get_1ms_delay_dist(), + reporting_delay: get_1ms_delay_dist(), + trigger_delay: get_0ms_delay_dist(), + }; + assert_eq!(integration.action_delay(), Duration::from_micros(1000)); + assert_eq!(integration.reporting_delay(), Duration::from_micros(1000)); + + // for client + let base_trace = run_sim(None, None, true); + let delayed_trace = run_sim(Some(&integration), None, true); + + assert_eq!(base_trace.len(), delayed_trace.len()); + assert_eq!(base_trace[1].event, delayed_trace[1].event); + assert!(base_trace[1].event.is_event(Event::PaddingSent)); + assert_eq!( + (delayed_trace[1].time - delayed_trace[0].time) - (base_trace[1].time - base_trace[0].time), + integration.action_delay() + integration.reporting_delay() + ); + + let delayed_trace_server = run_sim(Some(&integration), None, false); + assert_eq!(base_trace.len(), delayed_trace_server.len()); + // note below that first recv is 5ms in + assert_eq!( + delayed_trace_server[2].time - delayed_trace_server[0].time + Duration::from_millis(5), + Duration::from_millis(5) * 2 + integration.reporting_delay() + integration.action_delay() + ); + + // for server, everything should be the same (no action there due to machine + // being client-side) + let base_trace = run_sim(None, None, false); + let delayed_trace = run_sim(None, Some(&integration), false); + assert_eq!(base_trace.len(), delayed_trace.len()); + + for i in 0..base_trace.len() { + assert_eq!(base_trace[i].event, delayed_trace[i].event); + assert_eq!( + base_trace[i].time - base_trace[0].time, + delayed_trace[i].time - delayed_trace[0].time + ); + } +} + +#[test_log::test] +fn test_action_reporting_and_delay() { + let integration = Integration { + action_delay: get_1ms_delay_dist(), + reporting_delay: get_1ms_delay_dist(), + trigger_delay: get_1ms_delay_dist(), + }; + assert_eq!(integration.action_delay(), Duration::from_micros(1000)); + assert_eq!(integration.reporting_delay(), Duration::from_micros(1000)); + assert_eq!(integration.trigger_delay(), Duration::from_micros(1000)); + + // for client + let base_trace = run_sim(None, None, true); + let delayed_trace = run_sim(Some(&integration), None, true); + + assert_eq!(base_trace.len(), delayed_trace.len()); + assert_eq!(base_trace[1].event, delayed_trace[1].event); + assert!(base_trace[1].event.is_event(Event::PaddingSent)); + assert_eq!( + (delayed_trace[1].time - delayed_trace[0].time) - (base_trace[1].time - base_trace[0].time), + integration.action_delay() + integration.reporting_delay() + integration.trigger_delay() + ); + + let delayed_trace_server = run_sim(Some(&integration), None, false); + assert_eq!(base_trace.len(), delayed_trace_server.len()); + // note below that first recv is 5ms in + assert_eq!( + delayed_trace_server[2].time - delayed_trace_server[0].time + Duration::from_millis(5), + Duration::from_millis(5) * 2 + + integration.reporting_delay() + + integration.action_delay() + + integration.trigger_delay() + ); + + // for server, everything should be the same (no action there due to machine + // being client-side) + let base_trace = run_sim(None, None, false); + let delayed_trace = run_sim(None, Some(&integration), false); + assert_eq!(base_trace.len(), delayed_trace.len()); + + for i in 0..base_trace.len() { + assert_eq!(base_trace[i].event, delayed_trace[i].event); + assert_eq!( + base_trace[i].time - base_trace[0].time, + delayed_trace[i].time - delayed_trace[0].time + ); + } +} diff --git a/tests/simulator.rs b/tests/simulator.rs index 48f9664..13b0f95 100644 --- a/tests/simulator.rs +++ b/tests/simulator.rs @@ -1,5 +1,5 @@ use log::debug; -use maybenot_simulator::{parse_trace, queue::SimQueue, sim, SimEvent}; +use maybenot_simulator::{network::Network, parse_trace, queue::SimQueue, sim, SimEvent}; use std::{ cmp::Reverse, @@ -64,6 +64,7 @@ fn fmt_event(e: &SimEvent, base: Instant) -> String { fn make_sq(s: String, delay: Duration, starting_time: Instant) -> SimQueue { let mut sq = SimQueue::new(); + let integration_delay = Duration::from_micros(0); // 0,s,100 18,s,200 25,r,300 25,r,300 30,s,500 35,r,600 let lines: Vec<&str> = s.split(" ").collect(); @@ -82,6 +83,7 @@ fn make_sq(s: String, delay: Duration, starting_time: Instant) -> SimQueue { }, true, timestamp, + integration_delay, Reverse(timestamp), ); } @@ -94,6 +96,7 @@ fn make_sq(s: String, delay: Duration, starting_time: Instant) -> SimQueue { }, false, sent, + integration_delay, Reverse(sent), ); } @@ -954,9 +957,9 @@ fn test_excessive_sim_delay() { const EARLY_TRACE: &str = include_str!("EARLY_TEST_TRACE.log"); // start with a reasonable 10ms delay: we should get events at the client - let delay: Duration = Duration::from_millis(10); - let pq = parse_trace(EARLY_TRACE, delay); - let trace = sim(&[], &[], &mut pq.clone(), delay, 10000, true); + let network = Network::new(Duration::from_millis(10)); + let pq = parse_trace(EARLY_TRACE, &network); + let trace = sim(&[], &[], &mut pq.clone(), network.delay, 10000, true); let client_trace = trace .clone() .into_iter() @@ -967,9 +970,9 @@ fn test_excessive_sim_delay() { // set a silly delay of 10s: this should result in zero events at the // client, because we hit the limit of events below before we get to the // first event at the client - let delay: Duration = Duration::from_millis(10000); - let pq = parse_trace(EARLY_TRACE, delay); - let trace = sim(&[], &[], &mut pq.clone(), delay, 10000, true); + let network = Network::new(Duration::from_millis(10000)); + let pq = parse_trace(EARLY_TRACE, &network); + let trace = sim(&[], &[], &mut pq.clone(), network.delay, 10000, true); let client_trace = trace .clone() .into_iter() @@ -978,7 +981,7 @@ fn test_excessive_sim_delay() { assert!(client_trace.len() == 0); // increase the limit of events to 100000: this should result in all events - let trace = sim(&[], &[], &mut pq.clone(), delay, 100000, true); + let trace = sim(&[], &[], &mut pq.clone(), network.delay, 100000, true); let client_trace = trace .clone() .into_iter()