From 87d685a54ffa91ce8a043fab93b16a78b0322926 Mon Sep 17 00:00:00 2001 From: pulls Date: Fri, 5 Apr 2024 23:29:10 +0200 Subject: [PATCH] feat: added support for integration delays. Integration delays can happen due to how Maybenot is integrated, e.g., with Maybenot in user space and WireGuard in kernel space. There are three types of integration delays: - The *reporting* delay is the time between an event being created by the integrated protocol and the event being reported (trigger_events) to Maybenot. For example, this could be the time it takes to go from kernel space to user space. - The *trigger* delay is the time it takes for the integration to perform a scheduled action. For example, suppose an action is scheduled for time T. In that case, the trigger delay is added to T. This is important for capturing async integrations, where a zero timeout on an action to send padding would still take some (tiny) time to execute. - The *action* delay is the time between the integration taking action and the action happening. For example, if a padding packet is to be sent, user space might need to signal to kernel space to craft one. The delays are expressed as a sequence of bins (quantile binning / equal-frequency binning), where each bin has a probability of being selected. This format was motivated by extracting complex distributions from arbitrary Maybenot integrations and then creating an efficient way to sample those distributions. Efficient sampling is extra important because it's in the simulator's hot path. As part of this work, I started refactoring the network abstraction to clean up the API. --- CHANGELOG.md | 4 + Cargo.toml | 5 +- src/integration.rs | 119 +++++++++++++ src/lib.rs | 215 +++++++++++++++++------- src/network.rs | 65 ++++++-- src/queue.rs | 7 +- tests/example.rs | 12 +- tests/integration_delay.rs | 333 +++++++++++++++++++++++++++++++++++++ tests/simulator.rs | 19 ++- 9 files changed, 690 insertions(+), 89 deletions(-) create mode 100644 src/integration.rs create mode 100644 tests/integration_delay.rs diff --git a/CHANGELOG.md b/CHANGELOG.md index 9d4c952..60f7b21 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,10 @@ Manually generated changelog, for now. We follow semantic versioning. +## 1.1.0 - 2024-04-05 +- Support for integration delays. +- Light networking refactor. + ## 1.0.1 - 2023-11-24 - Minor README update. diff --git a/Cargo.toml b/Cargo.toml index 3bed8b7..0a06227 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "maybenot-simulator" -version = "1.0.1" +version = "1.1.0" edition = "2021" authors = ["Tobias Pulls"] license = "MIT OR Apache-2.0" @@ -18,6 +18,9 @@ maybenot = "1.0.0" log = "0.4.20" test-log = "0.2.12" fastrand = "2.0.0" +serde = "1.0.193" +rand = "0.8.5" +serde_json = "1.0.108" [dev-dependencies] env_logger = "0.10.1" diff --git a/src/integration.rs b/src/integration.rs new file mode 100644 index 0000000..0c76fbd --- /dev/null +++ b/src/integration.rs @@ -0,0 +1,119 @@ +use rand::Rng; +use serde::{Deserialize, Serialize}; +use std::{collections::HashMap, error::Error, time::Duration}; + +#[derive(Serialize, Deserialize, Debug)] +struct Bin { + range: (f64, f64), + probability: f64, +} + +/// Represents a Maybenot integration and its associated delays. This can happen +/// in the case of Maybenot being integrated, e.g., in user space with a +/// protocol running in kernel space. +#[derive(Clone, Debug)] +pub struct Integration { + /// The *action* delay is the time between the integration taking action and + /// the action happening. For example, if a padding packet is to be sent, + /// user space might need to signal to kernel space to craft one. NOTE: we + /// assume that the PaddingSent event is triggered directly as padding is + /// sent from Maybenot, while we assume that the BlockingBegin event is + /// triggered when the blocking actually begins in the protocol and the + /// event is transported with a reporting delay. + pub action_delay: BinDist, + /// The *reporting* delay is the time between an event being created by the + /// integrated protocol and the event being reported (trigger_events) to + /// Maybenot. For example, this could be the time it takes to go from kernel + /// space to user space. + pub reporting_delay: BinDist, + /// The *trigger* delay is the time it takes for the integration to perform + /// a scheduled action. For example, suppose an action is scheduled for time + /// T. In that case, the trigger delay is added to T. This is important for + /// capturing async integrations, where a zero timeout on an action to send + /// padding would still take some (tiny) time to execute. + pub trigger_delay: BinDist, +} + +impl Integration { + pub fn action_delay(&self) -> Duration { + self.action_delay.sample() + } + + pub fn reporting_delay(&self) -> Duration { + self.reporting_delay.sample() + } + + pub fn trigger_delay(&self) -> Duration { + self.trigger_delay.sample() + } +} + +/// A distribution of values in bins with a probability for each bin. Used to +/// estimate delay distributions in a Maybenot integration. +#[derive(Clone, Debug)] +pub struct BinDist { + bins: Vec<(f64, f64)>, // Vec of (min, max) tuples for each bin + cumulative_probabilities: Vec, // Cumulative probabilities for efficient sampling +} + +impl BinDist { + pub fn new(json_input: &str) -> Result> { + let bins: HashMap = serde_json::from_str(json_input)?; + + let mut sorted_bins: Vec<_> = bins + .into_iter() + .map(|(range, prob)| { + // Manually parsing the range tuple + let range_values: Vec = range + .trim_matches(|c: char| c == '(' || c == ')') + .split(',') + .map(str::trim) + .map(str::parse) + .collect::, _>>()?; + + if range_values.len() != 2 { + return Err("Range must have exactly two values".into()); + } + + Ok(((range_values[0], range_values[1]), prob)) + }) + .collect::, Box>>()?; + + // Sort bins by range start for cumulative probability calculation + sorted_bins.sort_by(|a, b| a.0 .0.partial_cmp(&b.0 .0).unwrap()); + + let mut cumulative_probabilities = Vec::with_capacity(sorted_bins.len()); + let mut total_prob = 0.0; + let mut ranges = Vec::with_capacity(sorted_bins.len()); + + for (range, prob) in sorted_bins { + total_prob += prob; + cumulative_probabilities.push(total_prob); + ranges.push(range); + } + + Ok(BinDist { + bins: ranges, + cumulative_probabilities, + }) + } + + pub fn sample(&self) -> Duration { + let mut rng = rand::thread_rng(); + let sample_prob = rng.gen::(); + let bin_index = match self + .cumulative_probabilities + .binary_search_by(|prob| prob.partial_cmp(&sample_prob).unwrap()) + { + Ok(index) => index, + Err(index) => index, + }; + + let (min, max) = self.bins[bin_index]; + // bins are in milliseconds, to get microseconds we multiply by 1000 + if min == max { + return Duration::from_micros((min * 1000.0) as u64); + } + Duration::from_micros((rng.gen_range(min..max) * 1000.0) as u64) + } +} diff --git a/src/lib.rs b/src/lib.rs index 5877708..251a49f 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -15,7 +15,7 @@ //! ## Example usage //! ``` //! use maybenot::{framework::TriggerEvent, machine::Machine}; -//! use maybenot_simulator::{parse_trace, sim}; +//! use maybenot_simulator::{parse_trace, network::Network, sim}; //! use std::{str::FromStr, time::Duration}; //! //! // A trace of ten packets from the client's perspective when visiting @@ -33,15 +33,15 @@ //! 9401094589,s,73 //! 9420892765,r,191"; //! -//! // The delay between client and server. This is for the simulation of the -//! // network between the client and server -//! let delay = Duration::from_millis(10); +//! // The network model for simulating the network between the client and the +//! // server. Currently just a delay. +//! let network = Network::new(Duration::from_millis(10)); //! //! // Parse the raw trace into a queue of events for the simulator. This uses //! // the delay to generate a queue of events at the client and server in such //! // a way that the client is ensured to get the packets in the same order and //! // at the same time as in the raw trace. -//! let mut input_trace = parse_trace(raw_trace, delay); +//! let mut input_trace = parse_trace(raw_trace, &network); //! //! // A simple machine that sends one padding packet of 1000 bytes 20 //! // milliseconds after the first NonPaddingSent is sent. @@ -51,7 +51,7 @@ //! //! // Run the simulator with the machine at the client. Run the simulation up //! // until 100 packets have been recorded (total, client and server). -//! let trace = sim(&[m], &[], &mut input_trace, delay, 100, true); +//! let trace = sim(&[m], &[], &mut input_trace, network.delay, 100, true); //! //! // print packets from the client's perspective //! let starting_time = trace[0].time; @@ -106,6 +106,7 @@ //! received 191 bytes at 9420 ms //! ``` +pub mod integration; pub mod network; pub mod peek; pub mod queue; @@ -116,7 +117,9 @@ use std::{ time::{Duration, Instant}, }; +use integration::Integration; use log::debug; +use network::Network; use queue::SimQueue; use maybenot::{ @@ -136,6 +139,7 @@ use crate::{ pub struct SimEvent { pub event: TriggerEvent, pub time: Instant, + pub delay: Duration, pub client: bool, // internal flag to mark event as bypass bypass: bool, @@ -167,6 +171,8 @@ pub struct SimState { last_sent_time: Instant, /// size of the last sent packet last_sent_size: u16, + /// integration aspects for this state + integration: Option, } impl SimState @@ -179,6 +185,7 @@ where max_padding_frac: f64, max_blocking_frac: f64, mtu: u16, + integration: Option, ) -> Self { Self { framework: Framework::new( @@ -198,8 +205,30 @@ where .checked_sub(Duration::from_millis(1000)) .unwrap(), last_sent_size: 0, + integration, } } + + pub fn reporting_delay(&self) -> Duration { + self.integration + .as_ref() + .map(|i| i.reporting_delay.sample()) + .unwrap_or(Duration::from_micros(0)) + } + + pub fn action_delay(&self) -> Duration { + self.integration + .as_ref() + .map(|i| i.action_delay.sample()) + .unwrap_or(Duration::from_micros(0)) + } + + pub fn trigger_delay(&self) -> Duration { + self.integration + .as_ref() + .map(|i| i.trigger_delay.sample()) + .unwrap_or(Duration::from_micros(0)) + } } /// The main simulator function. @@ -231,14 +260,15 @@ pub fn sim( max_trace_length: usize, only_network_activity: bool, ) -> Vec { - let args = SimulatorArgs::new(delay, max_trace_length, only_network_activity); + let network = Network::new(delay); + let args = SimulatorArgs::new(&network, max_trace_length, only_network_activity); sim_advanced(machines_client, machines_server, sq, &args) } /// Arguments for [`sim_advanced`]. #[derive(Clone, Debug)] -pub struct SimulatorArgs { - pub delay: Duration, +pub struct SimulatorArgs<'a> { + pub network: &'a Network, pub max_trace_length: usize, pub max_sim_iterations: usize, pub only_client_events: bool, @@ -248,12 +278,14 @@ pub struct SimulatorArgs { pub max_padding_frac_server: f64, pub max_blocking_frac_server: f64, pub mtu: u16, + pub client_integration: Option<&'a Integration>, + pub server_integration: Option<&'a Integration>, } -impl SimulatorArgs { - pub fn new(delay: Duration, max_trace_length: usize, only_network_activity: bool) -> Self { +impl<'a> SimulatorArgs<'a> { + pub fn new(network: &'a Network, max_trace_length: usize, only_network_activity: bool) -> Self { Self { - delay, + network, max_trace_length, max_sim_iterations: 0, only_client_events: false, @@ -264,6 +296,8 @@ impl SimulatorArgs { max_blocking_frac_server: 0.0, // WireGuard default MTU mtu: 1420, + client_integration: None, + server_integration: None, } } } @@ -290,6 +324,7 @@ pub fn sim_advanced( args.max_padding_frac_client, args.max_blocking_frac_client, args.mtu, + args.client_integration.cloned(), ); let mut server = SimState::new( machines_server, @@ -297,6 +332,7 @@ pub fn sim_advanced( args.max_padding_frac_server, args.max_blocking_frac_server, args.mtu, + args.server_integration.cloned(), ); let mut sim_iterations = 0; @@ -341,9 +377,9 @@ pub fn sim_advanced( // where the simulator simulates the entire network between the client // and the server. TODO: make delay/network more realistic. let network_activity = if next.client { - sim_network_activity(&next, sq, &client, current_time, args.delay) + sim_network_activity(&next, sq, &client, &server, args.network, ¤t_time) } else { - sim_network_activity(&next, sq, &server, current_time, args.delay) + sim_network_activity(&next, sq, &server, &client, args.network, ¤t_time) }; if network_activity { @@ -366,20 +402,10 @@ pub fn sim_advanced( // get actions, update scheduled actions if next.client { debug!("sim(): trigger @client framework\n{:#?}", next.event); - trigger_update( - &mut client.framework, - &mut client.scheduled_action, - &next, - ¤t_time, - ); + trigger_update(&mut client, &next, ¤t_time); } else { debug!("sim(): trigger @server framework\n{:#?}", next.event); - trigger_update( - &mut server.framework, - &mut server.scheduled_action, - &next, - ¤t_time, - ); + trigger_update(&mut server, &next, ¤t_time); } // conditional save to resulting trace: only on network activity if set @@ -387,7 +413,24 @@ pub fn sim_advanced( if (!args.only_network_activity || network_activity) && (!args.only_client_events || next.client) { - trace.push(next); + // this should be a network trace: adjust timestamps based on any + // integration delays + let mut n = next.clone(); + match next.event { + TriggerEvent::PaddingSent { .. } => { + // padding adds the action delay + n.time += n.delay; + } + TriggerEvent::PaddingRecv { .. } + | TriggerEvent::NonPaddingRecv { .. } + | TriggerEvent::NonPaddingSent { .. } => { + // reported events remove the reporting delay + n.time -= n.delay; + } + _ => {} + } + + trace.push(n); } if args.max_trace_length > 0 && trace.len() >= args.max_trace_length { @@ -411,6 +454,10 @@ pub fn sim_advanced( debug!("sim(): main loop end, more work?"); debug!("#########################################################"); } + + // sort the trace by time + trace.sort_by(|a, b| a.time.cmp(&b.time)); + trace } @@ -460,6 +507,8 @@ fn pick_next>( // create SimEvent and move blocking into (what soon will be) the past // to indicate that it has been processed let time: Instant; + // ASSUMPTION: block outgoing is reported from integration + let delay: Duration; let client_earliest = if client.blocking_until >= current_time && server.blocking_until >= current_time { client.blocking_until <= server.blocking_until @@ -468,10 +517,12 @@ fn pick_next>( }; if client_earliest { - time = client.blocking_until; + delay = client.reporting_delay(); + time = client.blocking_until + delay; client.blocking_until -= Duration::from_micros(1); } else { - time = server.blocking_until; + delay = server.reporting_delay(); + time = server.blocking_until + delay; server.blocking_until -= Duration::from_micros(1); } @@ -479,6 +530,7 @@ fn pick_next>( client: client_earliest, event: TriggerEvent::BlockingEnd, time, + delay, fuzz: fastrand::i32(..), bypass: false, replace: false, @@ -491,7 +543,7 @@ fn pick_next>( let target = current_time + s; let act = do_scheduled(client, server, current_time, target); if let Some(a) = act { - sq.push_sim(a, Reverse(current_time)); + sq.push_sim(a.clone(), Reverse(a.time)); } pick_next(sq, client, server, current_time) } @@ -548,17 +600,26 @@ fn do_scheduled>( bypass, replace, machine, - } => Some(SimEvent { - event: TriggerEvent::PaddingSent { - bytes_sent: size, - machine, - }, - time: a.time, - client: a_is_client, - bypass, - replace, - fuzz: fastrand::i32(..), - }), + } => { + let action_delay = if a_is_client { + client.action_delay() + } else { + server.action_delay() + }; + + Some(SimEvent { + event: TriggerEvent::PaddingSent { + bytes_sent: size, + machine, + }, + time: a.time, + delay: action_delay, + client: a_is_client, + bypass, + replace, + fuzz: fastrand::i32(..), + }) + } Action::BlockOutgoing { timeout: _, duration, @@ -568,6 +629,13 @@ fn do_scheduled>( } => { let block = a.time + duration; let event_bypass; + // ASSUMPTION: block outgoing reported from integration + let total_delay = if a_is_client { + client.action_delay() + client.reporting_delay() + } else { + server.action_delay() + server.reporting_delay() + }; + let reported = a.time + total_delay; // should we update client/server blocking? if a_is_client { @@ -587,7 +655,8 @@ fn do_scheduled>( // event triggered regardless Some(SimEvent { event: TriggerEvent::BlockingBegin { machine }, - time: a.time, + time: reported, + delay: total_delay, client: a_is_client, bypass: event_bypass, replace: false, @@ -598,20 +667,24 @@ fn do_scheduled>( } fn trigger_update>( - f: &mut Framework, - actions: &mut HashMap, + state: &mut SimState, next: &SimEvent, current_time: &Instant, ) { + let trigger_delay = state.trigger_delay(); + // parse actions and update - for action in f.trigger_events(&[next.event.clone()], *current_time) { + for action in state + .framework + .trigger_events(&[next.event.clone()], *current_time) + { match action { Action::Cancel { machine } => { - actions.insert( + state.scheduled_action.insert( *machine, ScheduledAction { action: Some(action.clone()), - time: *current_time, + time: *current_time + trigger_delay, }, ); } @@ -622,11 +695,11 @@ fn trigger_update>( replace: _, machine, } => { - actions.insert( + state.scheduled_action.insert( *machine, ScheduledAction { action: Some(action.clone()), - time: *current_time + *timeout, + time: *current_time + *timeout + trigger_delay, }, ); } @@ -637,11 +710,11 @@ fn trigger_update>( replace: _, machine, } => { - actions.insert( + state.scheduled_action.insert( *machine, ScheduledAction { action: Some(action.clone()), - time: *current_time + *timeout, + time: *current_time + *timeout + trigger_delay, }, ); } @@ -657,7 +730,17 @@ fn trigger_update>( /// number of bytes sent or received. The delay is used to model the network /// delay between the client and server. Returns a SimQueue with the events in /// the trace for use with [`sim`]. -pub fn parse_trace(trace: &str, delay: Duration) -> SimQueue { + +pub fn parse_trace(trace: &str, network: &Network) -> SimQueue { + parse_trace_advanced(trace, network, None, None) +} + +pub fn parse_trace_advanced( + trace: &str, + network: &Network, + client: Option<&Integration>, + server: Option<&Integration>, +) -> SimQueue { let mut sq = SimQueue::new(); // we just need a random starting time to make sure that we don't start from @@ -672,29 +755,43 @@ pub fn parse_trace(trace: &str, delay: Duration) -> SimQueue { let size = parts[2].trim().parse::().unwrap(); match parts[1] { - "s" => { + "s" | "sn" => { // client sent at the given time + let reporting_delay = client + .map(|i| i.reporting_delay.sample()) + .unwrap_or(Duration::from_micros(0)); + let reported = timestamp + reporting_delay; sq.push( TriggerEvent::NonPaddingSent { bytes_sent: size as u16, }, true, - timestamp, - Reverse(timestamp), + reported, + reporting_delay, + Reverse(reported), ); } - "r" => { + "r" | "rn" => { // sent by server delay time ago - let sent = timestamp.checked_sub(delay).unwrap(); + let sent = timestamp.checked_sub(network.delay).unwrap(); + // but reported to the Maybenot framework at the server with delay + let reporting_delay = server + .map(|i| i.reporting_delay.sample()) + .unwrap_or(Duration::from_micros(0)); + let reported = sent + reporting_delay; sq.push( TriggerEvent::NonPaddingSent { bytes_sent: size as u16, }, false, - sent, - Reverse(sent), + reported, + reporting_delay, + Reverse(reported), ); } + "sp" | "rp" => { + // TODO: figure out of ignoring is the right thing to do + } _ => { panic!("invalid direction") } diff --git a/src/network.rs b/src/network.rs index 65fb673..eac6038 100644 --- a/src/network.rs +++ b/src/network.rs @@ -2,7 +2,7 @@ //! client and server. use std::{ - cmp::Reverse, + cmp::{max, Reverse}, time::{Duration, Instant}, }; @@ -11,6 +11,23 @@ use maybenot::{event::Event, framework::TriggerEvent, machine::Machine}; use crate::{queue::SimQueue, SimEvent, SimState}; +/// A model of the network between the client and server. TODO: make this more +/// than just a delay. +#[derive(Debug, Clone)] +pub struct Network { + pub delay: Duration, +} + +impl Network { + pub fn new(delay: Duration) -> Self { + Self { delay } + } + + pub fn sample(&self) -> Duration { + self.delay + } +} + /// The network replace window is the time window in which we can replace /// padding with existing padding or non-padding already queued (or about to be /// queued up). The behavior here is tricky, since it'll differ how different @@ -26,8 +43,9 @@ pub fn sim_network_activity>( next: &SimEvent, sq: &mut SimQueue, state: &SimState, - current_time: Instant, - delay: Duration, + recipient: &SimState, + network: &Network, + current_time: &Instant, ) -> bool { let side = if next.client { "client" } else { "server" }.to_string(); @@ -35,15 +53,29 @@ pub fn sim_network_activity>( // easy: queue up the recv event on the other side TriggerEvent::NonPaddingSent { bytes_sent } => { debug!("\tqueue {}", Event::NonPaddingRecv); - // TODO: make the network more than a delay! - let time = current_time + delay; + // The time the event was reported to us is in next.time. We have to + // remove the reporting delay locally, then add a network delay and + // a reporting delay (at the recipient) for the recipient. + // + // LIMITATION, we also have to deal with an ugly edge-case: if the + // reporting delay is very long *at the sender*, then the event can + // actually arrive earlier at the recipient than it was reported to + // the sender. This we cannot deal with in the current design of the + // simulator (support for integration delays was bolted on late), + // because it would move time backwards. Therefore, we clamp. + let reporting_delay = recipient.reporting_delay(); + let reported = max( + next.time - next.delay + network.sample() + reporting_delay, + *current_time, + ); sq.push( TriggerEvent::NonPaddingRecv { bytes_recv: bytes_sent, }, !next.client, - time, - Reverse(time), + reported, + reporting_delay, + Reverse(reported), ); true @@ -76,7 +108,9 @@ pub fn sim_network_activity>( } // can replace with nonpadding that's queued to be sent within - // the network replace window? + // the network replace window? FIXME: here be bugs related to + // integration delays. Once blocking is implemented, this code + // needs to be reworked. let peek = sq.peek_blocking(state.blocking_bypassable, next.client); if let Some((queued, _)) = peek { let queued = queued.clone(); @@ -95,8 +129,8 @@ pub fn sim_network_activity>( if queued_bytes_sent <= bytes_sent { debug!("replacing padding sent with queued non-padding @{}", side,); // let the NonPaddingSent event bypass - // blocking by making a copy of the eevent - // with the approproiate flags set + // blocking by making a copy of the event + // with the appropriate flags set let mut tmp = queued.clone(); tmp.bypass = true; tmp.replace = false; @@ -116,19 +150,22 @@ pub fn sim_network_activity>( // nothing to replace with (or we're not replacing), so queue up debug!("\tqueue {}", Event::PaddingRecv); - let time = current_time + delay; + let reporting_delay = recipient.reporting_delay(); + // action delay + network + recipient reporting delay + let reported = next.time + next.delay + network.sample() + reporting_delay; sq.push( TriggerEvent::PaddingRecv { bytes_recv: bytes_sent, }, !next.client, - time, - Reverse(time), + reported, + reporting_delay, + Reverse(reported), ); true } - // receiving (non-)padding is reciving a packet + // receiving (non-)padding is receiving a packet TriggerEvent::NonPaddingRecv { .. } | TriggerEvent::PaddingRecv { .. } => true, _ => false, } diff --git a/src/queue.rs b/src/queue.rs index 7774bbc..3ba855a 100644 --- a/src/queue.rs +++ b/src/queue.rs @@ -1,6 +1,9 @@ //! The main queue of events in the simulator. -use std::{cmp::Reverse, time::Instant}; +use std::{ + cmp::Reverse, + time::{Duration, Instant}, +}; use maybenot::framework::TriggerEvent; use priority_queue::PriorityQueue; @@ -45,12 +48,14 @@ impl SimQueue { event: TriggerEvent, is_client: bool, time: Instant, + delay: Duration, priority: Reverse, ) { self.push_sim( SimEvent { event, time, + delay, client: is_client, bypass: false, replace: false, diff --git a/tests/example.rs b/tests/example.rs index e11f9a9..b020cad 100644 --- a/tests/example.rs +++ b/tests/example.rs @@ -1,5 +1,5 @@ use maybenot::{framework::TriggerEvent, machine::Machine}; -use maybenot_simulator::{parse_trace, sim}; +use maybenot_simulator::{network::Network, parse_trace, sim}; use std::{str::FromStr, time::Duration}; #[test_log::test] @@ -19,15 +19,15 @@ fn simulator_example_use() { 9401094589,s,73 9420892765,r,191"; - // The delay between client and server. This is for the simulation of the - // network between the client and server - let delay = Duration::from_millis(10); + // The network model for simulating the network between the client and the + // server. Currently just a delay. + let network = Network::new(Duration::from_millis(10)); // Parse the raw trace into a queue of events for the simulator. This uses // the delay to generate a queue of events at the client and server in such // a way that the client is ensured to get the packets in the same order and // at the same time as in the raw trace. - let mut input_trace = parse_trace(raw_trace, delay); + let mut input_trace = parse_trace(raw_trace, &network); // A simple machine that sends one padding packet of 1000 bytes 20 // milliseconds after the first NonPaddingSent is sent. @@ -37,7 +37,7 @@ fn simulator_example_use() { // Run the simulator with the machine at the client. Run the simulation up // until 100 packets have been recorded (total, client and server). - let trace = sim(&[m], &[], &mut input_trace, delay, 100, true); + let trace = sim(&[m], &[], &mut input_trace, network.delay, 100, true); // print packets from the client's perspective let starting_time = trace[0].time; diff --git a/tests/integration_delay.rs b/tests/integration_delay.rs new file mode 100644 index 0000000..87e0eee --- /dev/null +++ b/tests/integration_delay.rs @@ -0,0 +1,333 @@ +use std::{collections::HashMap, time::Duration}; + +use maybenot::{ + dist::{Dist, DistType}, + event::Event, + machine::Machine, + state::State, +}; +use maybenot_simulator::{ + integration::{BinDist, Integration}, + network::Network, + parse_trace_advanced, sim_advanced, SimEvent, SimulatorArgs, +}; + +fn get_test_machine() -> Machine { + // a simple machine that pads once after 5ms + let num_states = 2; + let mut t: HashMap> = HashMap::new(); + let mut e: HashMap = HashMap::new(); + e.insert(1, 1.0); + t.insert(Event::NonPaddingSent, e); + let s0 = State::new(t, num_states); + let t: HashMap> = HashMap::new(); + let mut s1 = State::new(t, num_states); + s1.timeout = Dist { + dist: DistType::Uniform, + param1: 0.0, + param2: 0.0, + start: 5.0 * 1000.0, + max: 0.0, + }; + Machine { + allowed_padding_bytes: 10000, + max_padding_frac: 1.0, + allowed_blocked_microsec: 0, + max_blocking_frac: 0.0, + states: vec![s0, s1], + include_small_packets: true, + } +} + +fn run_sim( + client: Option<&Integration>, + server: Option<&Integration>, + only_client: bool, +) -> Vec { + // a simple machine that pads once after 5ms + let m = get_test_machine(); + + let raw_trace = "0,s,100 + 10000000,r,100 + 20000000,s,100 + 32000000,r,100 + 56000000,s,100 + 100000000,s,100"; + let network = Network::new(Duration::from_millis(5)); + + let mut input_trace = parse_trace_advanced(raw_trace, &network, client, server); + + let mut args = SimulatorArgs::new(&network, 100, true); + args.client_integration = client; + args.server_integration = server; + let trace = sim_advanced(&[m], &[], &mut input_trace, &args); + + let trace: Vec<_> = trace + .into_iter() + .filter(|e| e.client == only_client) + .collect(); + println!("trace: {:?}", trace); + trace +} + +fn get_1ms_delay_dist() -> BinDist { + BinDist::new( + r#"{ + "(1.0, 1.0)": 1.0 + }"#, + ) + .unwrap() +} + +fn get_0ms_delay_dist() -> BinDist { + BinDist::new( + r#"{ + "(0.0, 0.0)": 1.0 + }"#, + ) + .unwrap() +} + +#[test_log::test] +fn test_action_delay() { + // action delay should be visible in the network trace we get from the + // simulator, by simply delaying padding packets by the action delay or + // delaying blocking to start/stop by the action delay + + let integration = Integration { + action_delay: get_1ms_delay_dist(), + reporting_delay: get_0ms_delay_dist(), + trigger_delay: get_0ms_delay_dist(), + }; + assert_eq!(integration.action_delay(), Duration::from_micros(1000)); + assert_eq!(integration.reporting_delay(), Duration::from_micros(0)); + + // for client + let base_trace = run_sim(None, None, true); + let delayed_trace = run_sim(Some(&integration), None, true); + + assert_eq!(base_trace.len(), delayed_trace.len()); + assert_eq!(base_trace[1].event, delayed_trace[1].event); + assert!(base_trace[1].event.is_event(Event::PaddingSent)); + assert_eq!( + (delayed_trace[1].time - delayed_trace[0].time) - (base_trace[1].time - base_trace[0].time), + integration.action_delay() + ); + + let delayed_trace_server = run_sim(Some(&integration), None, false); + assert_eq!(base_trace.len(), delayed_trace_server.len()); + assert!(delayed_trace_server[2].event.is_event(Event::PaddingRecv)); + // note below that first recv is 5ms in + assert_eq!( + delayed_trace_server[2].time - delayed_trace_server[0].time + Duration::from_millis(5), + Duration::from_millis(5) * 2 + integration.action_delay() + ); + + // for server, everything should be the same (no action there due to machine + // being client-side) + let base_trace = run_sim(None, None, false); + let delayed_trace = run_sim(None, Some(&integration), false); + assert_eq!(base_trace.len(), delayed_trace.len()); + for i in 0..base_trace.len() { + assert_eq!(base_trace[i].event, delayed_trace[i].event); + assert_eq!( + base_trace[i].time - base_trace[0].time, + delayed_trace[i].time - delayed_trace[0].time + ); + } +} + +#[test_log::test] +fn test_reporting_delay() { + // reporting delay should be indirectly visible in the network trace we get + // from the simulator, because events reported by the simulator will have a + // delay, resulting actions will be delayed, and the resulting padding + // packets will therefore be delayed in the network trace + + let integration = Integration { + action_delay: get_0ms_delay_dist(), + reporting_delay: get_1ms_delay_dist(), + trigger_delay: get_0ms_delay_dist(), + }; + assert_eq!(integration.action_delay(), Duration::from_micros(0)); + assert_eq!(integration.reporting_delay(), Duration::from_micros(1000)); + + // for client + let base_trace = run_sim(None, None, true); + let delayed_trace = run_sim(Some(&integration), None, true); + + assert_eq!(base_trace.len(), delayed_trace.len()); + assert_eq!(base_trace[1].event, delayed_trace[1].event); + assert!(base_trace[1].event.is_event(Event::PaddingSent)); + assert_eq!( + (delayed_trace[1].time - delayed_trace[0].time) - (base_trace[1].time - base_trace[0].time), + integration.reporting_delay() + ); + + let delayed_trace_server = run_sim(Some(&integration), None, false); + assert_eq!(base_trace.len(), delayed_trace_server.len()); + assert!(delayed_trace_server[2].event.is_event(Event::PaddingRecv)); + // note below that first recv is 5ms in + assert_eq!( + delayed_trace_server[2].time - delayed_trace_server[0].time + Duration::from_millis(5), + Duration::from_millis(5) * 2 + integration.reporting_delay() + ); + + // for server, everything should be the same (no action there due to machine + // being client-side) + let base_trace = run_sim(None, None, false); + let delayed_trace = run_sim(None, Some(&integration), false); + assert_eq!(base_trace.len(), delayed_trace.len()); + + for i in 0..base_trace.len() { + assert_eq!(base_trace[i].event, delayed_trace[i].event); + assert_eq!( + base_trace[i].time - base_trace[0].time, + delayed_trace[i].time - delayed_trace[0].time + ); + } +} + +#[test_log::test] +fn test_trigger_delay() { + // trigger delay should be visible in the network trace we get from the + // simulator, by simply delaying padding packets by the trigger delay + + let integration = Integration { + action_delay: get_0ms_delay_dist(), + reporting_delay: get_0ms_delay_dist(), + trigger_delay: get_1ms_delay_dist(), + }; + assert_eq!(integration.action_delay(), Duration::from_micros(0)); + assert_eq!(integration.reporting_delay(), Duration::from_micros(0)); + assert_eq!(integration.trigger_delay(), Duration::from_micros(1000)); + + // for client + let base_trace = run_sim(None, None, true); + let delayed_trace = run_sim(Some(&integration), None, true); + + assert_eq!(base_trace.len(), delayed_trace.len()); + assert_eq!(base_trace[1].event, delayed_trace[1].event); + assert!(base_trace[1].event.is_event(Event::PaddingSent)); + assert_eq!( + (delayed_trace[1].time - delayed_trace[0].time) - (base_trace[1].time - base_trace[0].time), + integration.trigger_delay() + ); + + let delayed_trace_server = run_sim(Some(&integration), None, false); + assert_eq!(base_trace.len(), delayed_trace_server.len()); + assert!(delayed_trace_server[2].event.is_event(Event::PaddingRecv)); + // note below that first recv is 5ms in + assert_eq!( + delayed_trace_server[2].time - delayed_trace_server[0].time + Duration::from_millis(5), + Duration::from_millis(5) * 2 + integration.trigger_delay() + ); + + // for server, everything should be the same (no action there due to machine + // being client-side) + let base_trace = run_sim(None, None, false); + let delayed_trace = run_sim(None, Some(&integration), false); + assert_eq!(base_trace.len(), delayed_trace.len()); + + for i in 0..base_trace.len() { + assert_eq!(base_trace[i].event, delayed_trace[i].event); + assert_eq!( + base_trace[i].time - base_trace[0].time, + delayed_trace[i].time - delayed_trace[0].time + ); + } +} + +#[test_log::test] +fn test_action_and_reporting_delay() { + let integration = Integration { + action_delay: get_1ms_delay_dist(), + reporting_delay: get_1ms_delay_dist(), + trigger_delay: get_0ms_delay_dist(), + }; + assert_eq!(integration.action_delay(), Duration::from_micros(1000)); + assert_eq!(integration.reporting_delay(), Duration::from_micros(1000)); + + // for client + let base_trace = run_sim(None, None, true); + let delayed_trace = run_sim(Some(&integration), None, true); + + assert_eq!(base_trace.len(), delayed_trace.len()); + assert_eq!(base_trace[1].event, delayed_trace[1].event); + assert!(base_trace[1].event.is_event(Event::PaddingSent)); + assert_eq!( + (delayed_trace[1].time - delayed_trace[0].time) - (base_trace[1].time - base_trace[0].time), + integration.action_delay() + integration.reporting_delay() + ); + + let delayed_trace_server = run_sim(Some(&integration), None, false); + assert_eq!(base_trace.len(), delayed_trace_server.len()); + // note below that first recv is 5ms in + assert_eq!( + delayed_trace_server[2].time - delayed_trace_server[0].time + Duration::from_millis(5), + Duration::from_millis(5) * 2 + integration.reporting_delay() + integration.action_delay() + ); + + // for server, everything should be the same (no action there due to machine + // being client-side) + let base_trace = run_sim(None, None, false); + let delayed_trace = run_sim(None, Some(&integration), false); + assert_eq!(base_trace.len(), delayed_trace.len()); + + for i in 0..base_trace.len() { + assert_eq!(base_trace[i].event, delayed_trace[i].event); + assert_eq!( + base_trace[i].time - base_trace[0].time, + delayed_trace[i].time - delayed_trace[0].time + ); + } +} + +#[test_log::test] +fn test_action_reporting_and_delay() { + let integration = Integration { + action_delay: get_1ms_delay_dist(), + reporting_delay: get_1ms_delay_dist(), + trigger_delay: get_1ms_delay_dist(), + }; + assert_eq!(integration.action_delay(), Duration::from_micros(1000)); + assert_eq!(integration.reporting_delay(), Duration::from_micros(1000)); + assert_eq!(integration.trigger_delay(), Duration::from_micros(1000)); + + // for client + let base_trace = run_sim(None, None, true); + let delayed_trace = run_sim(Some(&integration), None, true); + + assert_eq!(base_trace.len(), delayed_trace.len()); + assert_eq!(base_trace[1].event, delayed_trace[1].event); + assert!(base_trace[1].event.is_event(Event::PaddingSent)); + assert_eq!( + (delayed_trace[1].time - delayed_trace[0].time) - (base_trace[1].time - base_trace[0].time), + integration.action_delay() + integration.reporting_delay() + integration.trigger_delay() + ); + + let delayed_trace_server = run_sim(Some(&integration), None, false); + assert_eq!(base_trace.len(), delayed_trace_server.len()); + // note below that first recv is 5ms in + assert_eq!( + delayed_trace_server[2].time - delayed_trace_server[0].time + Duration::from_millis(5), + Duration::from_millis(5) * 2 + + integration.reporting_delay() + + integration.action_delay() + + integration.trigger_delay() + ); + + // for server, everything should be the same (no action there due to machine + // being client-side) + let base_trace = run_sim(None, None, false); + let delayed_trace = run_sim(None, Some(&integration), false); + assert_eq!(base_trace.len(), delayed_trace.len()); + + for i in 0..base_trace.len() { + assert_eq!(base_trace[i].event, delayed_trace[i].event); + assert_eq!( + base_trace[i].time - base_trace[0].time, + delayed_trace[i].time - delayed_trace[0].time + ); + } +} diff --git a/tests/simulator.rs b/tests/simulator.rs index 48f9664..13b0f95 100644 --- a/tests/simulator.rs +++ b/tests/simulator.rs @@ -1,5 +1,5 @@ use log::debug; -use maybenot_simulator::{parse_trace, queue::SimQueue, sim, SimEvent}; +use maybenot_simulator::{network::Network, parse_trace, queue::SimQueue, sim, SimEvent}; use std::{ cmp::Reverse, @@ -64,6 +64,7 @@ fn fmt_event(e: &SimEvent, base: Instant) -> String { fn make_sq(s: String, delay: Duration, starting_time: Instant) -> SimQueue { let mut sq = SimQueue::new(); + let integration_delay = Duration::from_micros(0); // 0,s,100 18,s,200 25,r,300 25,r,300 30,s,500 35,r,600 let lines: Vec<&str> = s.split(" ").collect(); @@ -82,6 +83,7 @@ fn make_sq(s: String, delay: Duration, starting_time: Instant) -> SimQueue { }, true, timestamp, + integration_delay, Reverse(timestamp), ); } @@ -94,6 +96,7 @@ fn make_sq(s: String, delay: Duration, starting_time: Instant) -> SimQueue { }, false, sent, + integration_delay, Reverse(sent), ); } @@ -954,9 +957,9 @@ fn test_excessive_sim_delay() { const EARLY_TRACE: &str = include_str!("EARLY_TEST_TRACE.log"); // start with a reasonable 10ms delay: we should get events at the client - let delay: Duration = Duration::from_millis(10); - let pq = parse_trace(EARLY_TRACE, delay); - let trace = sim(&[], &[], &mut pq.clone(), delay, 10000, true); + let network = Network::new(Duration::from_millis(10)); + let pq = parse_trace(EARLY_TRACE, &network); + let trace = sim(&[], &[], &mut pq.clone(), network.delay, 10000, true); let client_trace = trace .clone() .into_iter() @@ -967,9 +970,9 @@ fn test_excessive_sim_delay() { // set a silly delay of 10s: this should result in zero events at the // client, because we hit the limit of events below before we get to the // first event at the client - let delay: Duration = Duration::from_millis(10000); - let pq = parse_trace(EARLY_TRACE, delay); - let trace = sim(&[], &[], &mut pq.clone(), delay, 10000, true); + let network = Network::new(Duration::from_millis(10000)); + let pq = parse_trace(EARLY_TRACE, &network); + let trace = sim(&[], &[], &mut pq.clone(), network.delay, 10000, true); let client_trace = trace .clone() .into_iter() @@ -978,7 +981,7 @@ fn test_excessive_sim_delay() { assert!(client_trace.len() == 0); // increase the limit of events to 100000: this should result in all events - let trace = sim(&[], &[], &mut pq.clone(), delay, 100000, true); + let trace = sim(&[], &[], &mut pq.clone(), network.delay, 100000, true); let client_trace = trace .clone() .into_iter()