Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix atomics usage; ordering #5

Merged
merged 1 commit into from
Oct 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 7 additions & 8 deletions src/output/emitter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use std::clone::Clone;
use std::io;
use std::io::Write;
use std::path::Path;
use std::sync::atomic;
use std::sync::atomic::{self, Ordering};
use std::sync::Arc;

use tokio::fs::File;
Expand Down Expand Up @@ -104,14 +104,13 @@ impl JsonEmitter {
let root = spec::Root {
artifact: object.clone(),
timestamp: now_tz,
seqno: self.next_sequence_no(),
seqno: self.incr_seqno(),
};
serde_json::json!(root)
}

fn next_sequence_no(&self) -> u64 {
self.seqno.fetch_add(1, atomic::Ordering::SeqCst);
self.seqno.load(atomic::Ordering::SeqCst)
fn incr_seqno(&self) -> u64 {
self.seqno.fetch_add(1, Ordering::AcqRel)
}

pub async fn emit(&self, object: &spec::RootImpl) -> Result<(), WriterError> {
Expand Down Expand Up @@ -140,7 +139,7 @@ mod tests {
"major": spec::SPEC_VERSION.0,
"minor": spec::SPEC_VERSION.1,
},
"sequenceNumber": 1
"sequenceNumber": 0
});

let buffer = Arc::new(Mutex::new(vec![]));
Expand Down Expand Up @@ -168,14 +167,14 @@ mod tests {
"major": spec::SPEC_VERSION.0,
"minor": spec::SPEC_VERSION.1,
},
"sequenceNumber": 1
"sequenceNumber": 0
});
let expected_2 = json!({
"schemaVersion": {
"major": spec::SPEC_VERSION.0,
"minor": spec::SPEC_VERSION.1,
},
"sequenceNumber": 2
"sequenceNumber": 1
});

let buffer = Arc::new(Mutex::new(vec![]));
Expand Down
28 changes: 9 additions & 19 deletions src/output/measure.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,11 @@
// https://opensource.org/licenses/MIT.

use std::future::Future;
use std::sync::atomic;
use std::sync::atomic::{self, Ordering};
use std::sync::Arc;

use serde_json::Map;
use serde_json::Value;
use tokio::sync::Mutex;

use crate::output as tv;
use crate::spec;
Expand All @@ -23,15 +22,15 @@ use tv::{dut, emitter, step};
pub struct MeasurementSeries {
emitter: Arc<step::StepEmitter>,

seq_no: Arc<Mutex<atomic::AtomicU64>>,
seq_no: Arc<atomic::AtomicU64>,
start: MeasurementSeriesStart,
}

impl MeasurementSeries {
pub(crate) fn new(series_id: &str, name: &str, emitter: Arc<step::StepEmitter>) -> Self {
Self {
emitter,
seq_no: Arc::new(Mutex::new(atomic::AtomicU64::new(0))),
seq_no: Arc::new(atomic::AtomicU64::new(0)),
start: MeasurementSeriesStart::new(name, series_id),
}
}
Expand All @@ -42,20 +41,13 @@ impl MeasurementSeries {
) -> Self {
Self {
emitter,
seq_no: Arc::new(Mutex::new(atomic::AtomicU64::new(0))),
seq_no: Arc::new(atomic::AtomicU64::new(0)),
start,
}
}

async fn current_sequence_no(&self) -> u64 {
self.seq_no.lock().await.load(atomic::Ordering::SeqCst)
}

async fn increment_sequence_no(&self) {
self.seq_no
.lock()
.await
.fetch_add(1, atomic::Ordering::SeqCst);
fn incr_seqno(&self) -> u64 {
self.seq_no.fetch_add(1, Ordering::AcqRel)
}

/// Starts the measurement series.
Expand Down Expand Up @@ -109,7 +101,7 @@ impl MeasurementSeries {
pub async fn end(&self) -> Result<(), emitter::WriterError> {
let end = spec::MeasurementSeriesEnd {
series_id: self.start.series_id.clone(),
total_count: self.current_sequence_no().await,
total_count: self.seq_no.load(Ordering::Acquire),
};

self.emitter
Expand Down Expand Up @@ -141,13 +133,12 @@ impl MeasurementSeries {
/// ```
pub async fn add_measurement(&self, value: Value) -> Result<(), emitter::WriterError> {
let element = spec::MeasurementSeriesElement {
index: self.current_sequence_no().await,
index: self.incr_seqno(),
value: value.clone(),
timestamp: chrono::Local::now().with_timezone(&chrono_tz::Tz::UTC),
series_id: self.start.series_id.clone(),
metadata: None,
};
self.increment_sequence_no().await;

self.emitter
.emit(&spec::TestStepArtifactImpl::MeasurementSeriesElement(
Expand Down Expand Up @@ -185,15 +176,14 @@ impl MeasurementSeries {
metadata: Vec<(&str, Value)>,
) -> Result<(), emitter::WriterError> {
let element = spec::MeasurementSeriesElement {
index: self.current_sequence_no().await,
index: self.incr_seqno(),
value: value.clone(),
timestamp: chrono::Local::now().with_timezone(&chrono_tz::Tz::UTC),
series_id: self.start.series_id.clone(),
metadata: Some(Map::from_iter(
metadata.iter().map(|(k, v)| (k.to_string(), v.clone())),
)),
};
self.increment_sequence_no().await;

self.emitter
.emit(&spec::TestStepArtifactImpl::MeasurementSeriesElement(
Expand Down
10 changes: 4 additions & 6 deletions src/output/step.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
// https://opensource.org/licenses/MIT.

use serde_json::Value;
use std::sync::atomic;
use std::sync::atomic::{self, Ordering};
use std::sync::Arc;

use crate::output as tv;
Expand Down Expand Up @@ -62,7 +62,7 @@ impl TestStep {

Ok(StartedTestStep {
step: self,
measurement_id_no: Arc::new(atomic::AtomicU64::new(0)),
measurement_id_seqno: Arc::new(atomic::AtomicU64::new(0)),
})
}

Expand Down Expand Up @@ -107,7 +107,7 @@ impl TestStep {

pub struct StartedTestStep {
step: TestStep,
measurement_id_no: Arc<atomic::AtomicU64>,
measurement_id_seqno: Arc<atomic::AtomicU64>,
}

impl StartedTestStep {
Expand Down Expand Up @@ -464,11 +464,9 @@ impl StartedTestStep {
/// # });
/// ```
pub fn measurement_series(&self, name: &str) -> MeasurementSeries {
self.measurement_id_no
.fetch_add(1, atomic::Ordering::SeqCst);
let series_id: String = format!(
"series_{}",
self.measurement_id_no.load(atomic::Ordering::SeqCst)
self.measurement_id_seqno.fetch_add(1, Ordering::AcqRel)
);

MeasurementSeries::new(&series_id, name, Arc::clone(&self.step.emitter))
Expand Down
28 changes: 14 additions & 14 deletions tests/output/macros.rs
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ async fn test_ocptv_error_macro_with_symptom_and_message() -> Result<()> {
"symptom": "symptom"
}
},
"sequenceNumber": 3
"sequenceNumber": 2
});

check_output_run(&expected, "error", |run| async move {
Expand All @@ -127,7 +127,7 @@ async fn test_ocptv_error_macro_with_symptom() -> Result<()> {
"symptom": "symptom"
}
},
"sequenceNumber": 3
"sequenceNumber": 2
});

check_output_run(&expected, "error", |run| async move {
Expand All @@ -146,7 +146,7 @@ async fn test_ocptv_log_debug() -> Result<()> {
"severity": "DEBUG"
}
},
"sequenceNumber": 3
"sequenceNumber": 2
});

check_output_run(&expected, "log", |run| async move {
Expand All @@ -166,7 +166,7 @@ async fn test_ocptv_log_info() -> Result<()> {
"severity": "INFO"
}
},
"sequenceNumber": 3
"sequenceNumber": 2
});

check_output_run(&expected, "log", |run| async move {
Expand All @@ -185,7 +185,7 @@ async fn test_ocptv_log_warning() -> Result<()> {
"severity": "WARNING"
}
},
"sequenceNumber": 3
"sequenceNumber": 2
});

check_output_run(&expected, "log", |run| async move {
Expand All @@ -204,7 +204,7 @@ async fn test_ocptv_log_error() -> Result<()> {
"severity": "ERROR"
}
},
"sequenceNumber": 3
"sequenceNumber": 2
});

check_output_run(&expected, "log", |run| async move {
Expand All @@ -223,7 +223,7 @@ async fn test_ocptv_log_fatal() -> Result<()> {
"severity": "FATAL"
}
},
"sequenceNumber": 3
"sequenceNumber": 2
});

check_output_run(&expected, "log", |run| async move {
Expand All @@ -242,7 +242,7 @@ async fn test_ocptv_error_macro_with_symptom_and_message_in_step() -> Result<()>
"symptom":"symptom"
}
},
"sequenceNumber": 4
"sequenceNumber": 3
});

check_output_step(&expected, "error", |step| async move {
Expand All @@ -260,7 +260,7 @@ async fn test_ocptv_error_macro_with_symptom_in_step() -> Result<()> {
"symptom": "symptom"
}
},
"sequenceNumber": 4
"sequenceNumber": 3
});

check_output_step(&expected, "error", |step| async move {
Expand All @@ -279,7 +279,7 @@ async fn test_ocptv_log_debug_in_step() -> Result<()> {
"severity": "DEBUG"
}
},
"sequenceNumber": 4
"sequenceNumber": 3
});

check_output_step(&expected, "log", |step| async move {
Expand All @@ -298,7 +298,7 @@ async fn test_ocptv_log_info_in_step() -> Result<()> {
"severity": "INFO"
}
},
"sequenceNumber": 4
"sequenceNumber": 3
});

check_output_step(&expected, "log", |step| async move {
Expand All @@ -317,7 +317,7 @@ async fn test_ocptv_log_warning_in_step() -> Result<()> {
"severity":"WARNING"
}
},
"sequenceNumber": 4
"sequenceNumber": 3
});

check_output_step(&expected, "log", |step| async move {
Expand All @@ -336,7 +336,7 @@ async fn test_ocptv_log_error_in_step() -> Result<()> {
"severity": "ERROR"
}
},
"sequenceNumber": 4
"sequenceNumber": 3
});

check_output_step(&expected, "log", |step| async move {
Expand All @@ -355,7 +355,7 @@ async fn test_ocptv_log_fatal_in_step() -> Result<()> {
"severity": "FATAL"
}
},
"sequenceNumber": 4
"sequenceNumber": 3
});

check_output_step(&expected, "log", |step| async move {
Expand Down
Loading
Loading