Skip to content

Commit

Permalink
[rntimatcher]: Remove UL Median filter
Browse files Browse the repository at this point in the history
* [model] move dci_buffer to Heap
* [model] fix sending interval units
* [model] use all-r_w as fallback if rnti-based r_w cannot be calculated
  • Loading branch information
bastian-src committed Jul 6, 2024
1 parent 95713f0 commit 6cdd18a
Show file tree
Hide file tree
Showing 6 changed files with 44 additions and 30 deletions.
9 changes: 6 additions & 3 deletions scripts/visualize_rnti_matching.py
Original file line number Diff line number Diff line change
Expand Up @@ -240,9 +240,12 @@ def filter_dataset_fast(settings, raw_dataset) -> FilteredRecording:
# print("MAX_UL_BYTES_PER_DCI")
continue

if calculate_median(ue_data) <= 0:
result.skipped_median_zero += 1
continue
# HERE: Disbale UL Median filter, because the UL-Median becomes 0
# if an UE has a lot of upload.
# if calculate_median(ue_data) <= 0:
# print_debug(f"Skipping ZERO UL MEDIAN RNTI: {rnti}")
# result.skipped_median_zero += 1
# continue

for timestamp, values in ue_data['traffic'].items():
converted_timestamp = np.datetime64(int(timestamp), 'us')
Expand Down
5 changes: 3 additions & 2 deletions src/logic/cell_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,15 @@ pub struct CellSourceArgs {
}

pub fn deploy_cell_source(args: CellSourceArgs) -> Result<JoinHandle<()>> {
let thread = thread::spawn(move || {
let builder = thread::Builder::new().name("[source]".to_string());
let thread = builder.spawn(move || {
let _ = run(
args.rx_app_state,
args.tx_source_state,
args.app_args,
args.tx_cell_info,
);
});
})?;
Ok(thread)
}

Expand Down
29 changes: 18 additions & 11 deletions src/logic/model_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@ use crate::logic::{
};
use crate::util::determine_process_id;

pub const MAX_DCI_ARRAY_SIZE: usize = 10000;
pub const MAX_DCI_SLICE_SIZE: usize = 1000;
pub const MAX_DCI_ARRAY_SIZE: usize = 1000;
pub const MAX_DCI_SLICE_SIZE: usize = 100;
pub const MAX_DCI_SLICE_INDEX: usize = MAX_DCI_ARRAY_SIZE - MAX_DCI_SLICE_SIZE;
// Parameter gamma from [p. 456] PBE-CC: https://dl.acm.org/doi/abs/10.1145/3387514.3405880
pub const PHYSICAL_TO_TRANSPORT_OVERHEAD: f64 = 0.068;
Expand Down Expand Up @@ -52,10 +52,12 @@ impl DciRingBuffer {
self.dci_next += 1;
}

fn slice(&self, slice_size: usize) -> &[NgScopeCellDci] {
if slice_size > self.dci_next {
return self.slice(self.dci_next);
fn slice(&self, wanted_slice_size: usize) -> &[NgScopeCellDci] {
if wanted_slice_size == 0 || self.dci_next == 0 {
return &[];
}

let slice_size = usize::min(wanted_slice_size, self.dci_next);
let delta_index = self.dci_next - slice_size;
&self.dci_array[delta_index..self.dci_next]
}
Expand Down Expand Up @@ -94,10 +96,11 @@ pub fn deploy_model_handler(args: ModelHandlerArgs) -> Result<JoinHandle<()>> {
tx_metric: args.tx_metric,
};

let thread = thread::spawn(move || {
let builder = thread::Builder::new().name("[model]".to_string());
let thread = builder.spawn(move || {
let _ = run(&mut run_args);
finish(run_args);
});
})?;
Ok(thread)
}

Expand All @@ -107,12 +110,10 @@ fn send_final_state(tx_model_state: &SyncSender<ModelState>) -> Result<()> {

fn wait_for_running(
rx_app_state: &mut BusReader<MainState>,
tx_model_state: &SyncSender<ModelState>,
) -> Result<()> {
match wait_until_running(rx_app_state) {
Ok(_) => Ok(()),
_ => {
send_final_state(tx_model_state)?;
Err(anyhow!("[model] Main did not send 'Running' message"))
}
}
Expand All @@ -128,14 +129,16 @@ fn run(run_args: &mut RunArgs) -> Result<()> {
let tx_metric: &mut Bus<MessageMetric> = &mut run_args.tx_metric;

tx_model_state.send(ModelState::Running)?;
wait_for_running(rx_app_state, tx_model_state)?;
wait_for_running(rx_app_state)?;
print_info(&format!("[model]: \t\tPID {:?}", determine_process_id()));
let sleep_duration = Duration::from_micros(DEFAULT_WORKER_SLEEP_US);

let model_args = FlattenedModelArgs::from_unflattened(app_args.clone().model.unwrap())?;

let mut last_metric_timestamp_us: u64 = chrono::Utc::now().timestamp_micros() as u64;
println!(" DEBUG: Before DciRingBuffer::new()");
let mut dci_buffer = DciRingBuffer::new();
println!(" DEBUG: dci_buffer.dci_array.len(): {:?}", dci_buffer.dci_array.len());
let mut last_rnti: Option<u16> = None;
let mut last_cell_info: Option<CellInfo> = None;
let last_rtt_us: Option<u64> = Some(40000); // TODO: Replace test-RTT with actual RTT and make
Expand Down Expand Up @@ -319,7 +322,7 @@ fn translate_physcial_to_transport_simple(c_physical: u64) -> u64 {

fn determine_sending_interval(model_args: &FlattenedModelArgs, last_rtt_us: &Option<u64>) -> u64 {
match model_args.model_send_metric_interval_type {
DynamicValue::FixedMs => model_args.model_send_metric_interval_value as u64,
DynamicValue::FixedMs => model_args.model_send_metric_interval_value as u64 * 1000,
DynamicValue::RttFactor => {
(last_rtt_us.unwrap() as f64 * model_args.model_send_metric_interval_value) as u64
}
Expand All @@ -332,7 +335,11 @@ fn determine_smoothing_size(model_args: &FlattenedModelArgs, last_rtt_us: &Optio
DynamicValue::RttFactor => {
(last_rtt_us.unwrap() as f64 * model_args.model_metric_smoothing_size_value / 1000.0) as u64
}
};
if unbound_slice > MAX_DCI_SLICE_SIZE as u64 {
return MAX_DCI_SLICE_SIZE as u64;
}
unbound_slice
}

#[cfg(test)]
Expand Down
5 changes: 3 additions & 2 deletions src/logic/ngscope_controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,10 +69,11 @@ pub fn deploy_ngscope_controller(args: NgControlArgs) -> Result<JoinHandle<()>>
let run_args_mov: RunArgsMovables = RunArgsMovables {
tx_dci: args.tx_dci,
};
let thread = thread::spawn(move || {
let builder = thread::Builder::new().name("[builder]".to_string());
let thread = builder.spawn(move || {
let _ = run(&mut run_args, run_args_mov);
finish(run_args);
});
})?;
Ok(thread)
}

Expand Down
25 changes: 13 additions & 12 deletions src/logic/rnti_matcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -178,10 +178,11 @@ pub fn deploy_rnti_matcher(args: RntiMatcherArgs) -> Result<JoinHandle<()>> {
rx_metric: args.rx_metric,
};

let thread = thread::spawn(move || {
let builder = thread::Builder::new().name("[rntimatcher]".to_string());
let thread = builder.spawn(move || {
let _ = run(&mut run_args, run_args_mov);
finish(run_args);
});
})?;
Ok(thread)
}

Expand Down Expand Up @@ -733,16 +734,16 @@ impl TrafficCollection {
}
filtered
})
/* ZERO MEDIAN */
.filter(|(_, ue_traffic)| {
match ue_traffic.feature_ul_bytes_median_mean_variance() {
Ok((median, _, _)) if median > 0.0 => true,
_ => {
stats.zero_ul_median += 1;
false
}
}
})
/* ZERO MEDIAN HERE: Skip the ZERO UL MEDIAN filter */
// .filter(|(_, ue_traffic)| {
// match ue_traffic.feature_ul_bytes_median_mean_variance() {
// Ok((median, _, _)) if median > 0.0 => true,
// _ => {
// stats.zero_ul_median += 1;
// false
// }
// }
// })
.map(|(&rnti, _)| rnti)
.collect();
(cell_id, rntis_to_keep)
Expand Down
1 change: 1 addition & 0 deletions src/parse.rs
Original file line number Diff line number Diff line change
Expand Up @@ -247,6 +247,7 @@ impl Arguments {
self.devicepublisher = self.devicepublisher.or(config_file.devicepublisher);
self.ngscope = self.ngscope.or(config_file.ngscope);
self.rntimatching = self.rntimatching.or(config_file.rntimatching);
self.model = self.model.or(config_file.model);
self.verbose = self.verbose.or(config_file.verbose);

Ok(self)
Expand Down

0 comments on commit 6cdd18a

Please sign in to comment.