Skip to content

Commit

Permalink
fix atomics usage; ordering
Browse files Browse the repository at this point in the history
- unnecessarily strict ordering for simple atomic counters; SeqCst
- fix seqno in artifacts, spec says it starts at 0
- simplify some of the counting methods

Signed-off-by: mimir-d <[email protected]>
  • Loading branch information
mimir-d committed Oct 4, 2024
1 parent c800560 commit fcfceb7
Show file tree
Hide file tree
Showing 6 changed files with 211 additions and 219 deletions.
15 changes: 7 additions & 8 deletions src/output/emitters.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use std::clone::Clone;
use std::io;
use std::io::Write;
use std::path::Path;
use std::sync::atomic;
use std::sync::atomic::{self, Ordering};
use std::sync::Arc;

use tokio::fs::File;
Expand Down Expand Up @@ -104,14 +104,13 @@ impl JsonEmitter {
let out_artifact = models::OutputArtifactSpec {
descendant: object.clone(),
now: now_tz,
sequence_number: self.next_sequence_no(),
sequence_number: self.incr_seqno(),
};
serde_json::json!(out_artifact)
}

fn next_sequence_no(&self) -> u64 {
self.sequence_no.fetch_add(1, atomic::Ordering::SeqCst);
self.sequence_no.load(atomic::Ordering::SeqCst)
fn incr_seqno(&self) -> u64 {
self.sequence_no.fetch_add(1, Ordering::AcqRel)
}

pub async fn emit(&self, object: &models::OutputArtifactDescendant) -> Result<(), WriterError> {
Expand Down Expand Up @@ -142,7 +141,7 @@ mod tests {
"major": models::SPEC_VERSION.0,
"minor": models::SPEC_VERSION.1,
},
"sequenceNumber": 1
"sequenceNumber": 0
});

let buffer = Arc::new(Mutex::new(vec![]));
Expand All @@ -167,14 +166,14 @@ mod tests {
"major": models::SPEC_VERSION.0,
"minor": models::SPEC_VERSION.1,
},
"sequenceNumber": 1
"sequenceNumber": 0
});
let expected_2 = json!({
"schemaVersion": {
"major": models::SPEC_VERSION.0,
"minor": models::SPEC_VERSION.1,
},
"sequenceNumber": 2
"sequenceNumber": 1
});

let buffer = Arc::new(Mutex::new(vec![]));
Expand Down
42 changes: 18 additions & 24 deletions src/output/measurement.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
// https://opensource.org/licenses/MIT.

use std::future::Future;
use std::sync::atomic;
use std::sync::atomic::{self, Ordering};
use std::sync::Arc;

use chrono::DateTime;
Expand All @@ -22,15 +22,15 @@ use tv::{dut, emitters, models, state};
/// ref: https://github.com/opencomputeproject/ocp-diag-core/tree/main/json_spec#measurementseriesstart
pub struct MeasurementSeries {
state: Arc<Mutex<state::TestState>>,
seq_no: Arc<Mutex<atomic::AtomicU64>>,
seq_no: Arc<atomic::AtomicU64>,
start: MeasurementSeriesStart,
}

impl MeasurementSeries {
pub(crate) fn new(series_id: &str, name: &str, state: Arc<Mutex<state::TestState>>) -> Self {
Self {
state,
seq_no: Arc::new(Mutex::new(atomic::AtomicU64::new(0))),
seq_no: Arc::new(atomic::AtomicU64::new(0)),
start: MeasurementSeriesStart::new(name, series_id),
}
}
Expand All @@ -41,20 +41,13 @@ impl MeasurementSeries {
) -> Self {
Self {
state,
seq_no: Arc::new(Mutex::new(atomic::AtomicU64::new(0))),
seq_no: Arc::new(atomic::AtomicU64::new(0)),
start,
}
}

async fn current_sequence_no(&self) -> u64 {
self.seq_no.lock().await.load(atomic::Ordering::SeqCst)
}

async fn increment_sequence_no(&self) {
self.seq_no
.lock()
.await
.fetch_add(1, atomic::Ordering::SeqCst);
fn incr_seqno(&self) -> u64 {
self.seq_no.fetch_add(1, Ordering::AcqRel)
}

/// Starts the measurement series.
Expand Down Expand Up @@ -107,14 +100,18 @@ impl MeasurementSeries {
/// # });
/// ```
pub async fn end(&self) -> Result<(), emitters::WriterError> {
let end =
MeasurementSeriesEnd::new(self.start.get_series_id(), self.current_sequence_no().await);
let end = MeasurementSeriesEnd::new(
self.start.get_series_id(),
self.seq_no.load(Ordering::Acquire),
);

self.state
.lock()
.await
.emitter
.emit(&end.to_artifact())
.await?;

Ok(())
}

Expand All @@ -139,19 +136,15 @@ impl MeasurementSeries {
/// # });
/// ```
pub async fn add_measurement(&self, value: Value) -> Result<(), emitters::WriterError> {
let element = MeasurementSeriesElement::new(
self.current_sequence_no().await,
value,
&self.start,
None,
);
self.increment_sequence_no().await;
let element = MeasurementSeriesElement::new(self.incr_seqno(), value, &self.start, None);

self.state
.lock()
.await
.emitter
.emit(&element.to_artifact())
.await?;

Ok(())
}

Expand Down Expand Up @@ -182,20 +175,21 @@ impl MeasurementSeries {
metadata: Vec<(&str, Value)>,
) -> Result<(), emitters::WriterError> {
let element = MeasurementSeriesElement::new(
self.current_sequence_no().await,
self.incr_seqno(),
value,
&self.start,
Some(Map::from_iter(
metadata.iter().map(|(k, v)| (k.to_string(), v.clone())),
)),
);
self.increment_sequence_no().await;

self.state
.lock()
.await
.emitter
.emit(&element.to_artifact())
.await?;

Ok(())
}

Expand Down
2 changes: 2 additions & 0 deletions src/output/run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,8 @@ impl TestRun {
/// # });
/// ```
pub async fn start(self) -> Result<StartedTestRun, emitters::WriterError> {
// TODO: test run should not be responsible to emit the schema version.
// That should sit with the emitter (and/or some context which only runs once)
let version = SchemaVersion::new();
self.state
.lock()
Expand Down
10 changes: 4 additions & 6 deletions src/output/step.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
// https://opensource.org/licenses/MIT.

use serde_json::Value;
use std::sync::atomic;
use std::sync::atomic::{self, Ordering};
use std::sync::Arc;
use tokio::sync::Mutex;

Expand Down Expand Up @@ -56,7 +56,7 @@ impl TestStep {

Ok(StartedTestStep {
step: self,
measurement_id_no: Arc::new(atomic::AtomicU64::new(0)),
measurement_id_seqno: Arc::new(atomic::AtomicU64::new(0)),
})
}

Expand Down Expand Up @@ -101,7 +101,7 @@ impl TestStep {

pub struct StartedTestStep {
step: TestStep,
measurement_id_no: Arc<atomic::AtomicU64>,
measurement_id_seqno: Arc<atomic::AtomicU64>,
}

impl StartedTestStep {
Expand Down Expand Up @@ -489,11 +489,9 @@ impl StartedTestStep {
/// # });
/// ```
pub fn measurement_series(&self, name: &str) -> MeasurementSeries {
self.measurement_id_no
.fetch_add(1, atomic::Ordering::SeqCst);
let series_id: String = format!(
"series_{}",
self.measurement_id_no.load(atomic::Ordering::SeqCst)
self.measurement_id_seqno.fetch_add(1, Ordering::AcqRel)
);

MeasurementSeries::new(&series_id, name, self.step.state.clone())
Expand Down
28 changes: 14 additions & 14 deletions tests/output/macros.rs
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ async fn test_ocptv_error_macro_with_symptom_and_message() -> Result<()> {
"symptom": "symptom"
}
},
"sequenceNumber": 3
"sequenceNumber": 2
});

check_output_run(&expected, "error", |run| async move {
Expand All @@ -127,7 +127,7 @@ async fn test_ocptv_error_macro_with_symptom() -> Result<()> {
"symptom": "symptom"
}
},
"sequenceNumber": 3
"sequenceNumber": 2
});

check_output_run(&expected, "error", |run| async move {
Expand All @@ -146,7 +146,7 @@ async fn test_ocptv_log_debug() -> Result<()> {
"severity": "DEBUG"
}
},
"sequenceNumber": 3
"sequenceNumber": 2
});

check_output_run(&expected, "log", |run| async move {
Expand All @@ -166,7 +166,7 @@ async fn test_ocptv_log_info() -> Result<()> {
"severity": "INFO"
}
},
"sequenceNumber": 3
"sequenceNumber": 2
});

check_output_run(&expected, "log", |run| async move {
Expand All @@ -185,7 +185,7 @@ async fn test_ocptv_log_warning() -> Result<()> {
"severity": "WARNING"
}
},
"sequenceNumber": 3
"sequenceNumber": 2
});

check_output_run(&expected, "log", |run| async move {
Expand All @@ -204,7 +204,7 @@ async fn test_ocptv_log_error() -> Result<()> {
"severity": "ERROR"
}
},
"sequenceNumber": 3
"sequenceNumber": 2
});

check_output_run(&expected, "log", |run| async move {
Expand All @@ -223,7 +223,7 @@ async fn test_ocptv_log_fatal() -> Result<()> {
"severity": "FATAL"
}
},
"sequenceNumber": 3
"sequenceNumber": 2
});

check_output_run(&expected, "log", |run| async move {
Expand All @@ -242,7 +242,7 @@ async fn test_ocptv_error_macro_with_symptom_and_message_in_step() -> Result<()>
"symptom":"symptom"
}
},
"sequenceNumber": 4
"sequenceNumber": 3
});

check_output_step(&expected, "error", |step| async move {
Expand All @@ -260,7 +260,7 @@ async fn test_ocptv_error_macro_with_symptom_in_step() -> Result<()> {
"symptom": "symptom"
}
},
"sequenceNumber": 4
"sequenceNumber": 3
});

check_output_step(&expected, "error", |step| async move {
Expand All @@ -279,7 +279,7 @@ async fn test_ocptv_log_debug_in_step() -> Result<()> {
"severity": "DEBUG"
}
},
"sequenceNumber": 4
"sequenceNumber": 3
});

check_output_step(&expected, "log", |step| async move {
Expand All @@ -298,7 +298,7 @@ async fn test_ocptv_log_info_in_step() -> Result<()> {
"severity": "INFO"
}
},
"sequenceNumber": 4
"sequenceNumber": 3
});

check_output_step(&expected, "log", |step| async move {
Expand All @@ -317,7 +317,7 @@ async fn test_ocptv_log_warning_in_step() -> Result<()> {
"severity":"WARNING"
}
},
"sequenceNumber": 4
"sequenceNumber": 3
});

check_output_step(&expected, "log", |step| async move {
Expand All @@ -336,7 +336,7 @@ async fn test_ocptv_log_error_in_step() -> Result<()> {
"severity": "ERROR"
}
},
"sequenceNumber": 4
"sequenceNumber": 3
});

check_output_step(&expected, "log", |step| async move {
Expand All @@ -355,7 +355,7 @@ async fn test_ocptv_log_fatal_in_step() -> Result<()> {
"severity": "FATAL"
}
},
"sequenceNumber": 4
"sequenceNumber": 3
});

check_output_step(&expected, "log", |step| async move {
Expand Down
Loading

0 comments on commit fcfceb7

Please sign in to comment.