Skip to content

Commit

Permalink
persist: introduce proc macro to run unit tests with various configs
Browse files Browse the repository at this point in the history
We've gotten somewhat lucky so far to be able to more-or-less pick a
default set of configuration values for persist unit tests, perhaps
adding a couple new tests for the non-default cases. But inline writes
probably wants to keep testing coverage of both the enabled and disabled
path in perpetuity.

So, introduce a new `mz_persist_proc::test` prod macro that runs an
entire cargo test under an arbitrary set of configurations and hook it
up to persist's tests.

Intentionally don't add a lint requiring its usage because many tests
don't use configuration and so running them multiple times would be pure
waste. Particularly egregious would be the various randomized prop
tests (e.g. for encoding).
  • Loading branch information
danhhz committed Apr 3, 2024
1 parent 9c5ee3b commit e22c40a
Show file tree
Hide file tree
Showing 18 changed files with 283 additions and 134 deletions.
11 changes: 11 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ members = [
"src/persist",
"src/persist-cli",
"src/persist-client",
"src/persist-proc",
"src/persist-txn",
"src/persist-types",
"src/pgcopy",
Expand Down
1 change: 1 addition & 0 deletions src/persist-client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ mz-build-info = { path = "../build-info" }
mz-dyncfg = { path = "../dyncfg" }
mz-ore = { path = "../ore", features = ["bytes_", "test", "tracing_"] }
mz-persist = { path = "../persist" }
mz-persist-proc = { path = "../persist-proc" }
mz-persist-types = { path = "../persist-types" }
mz-proto = { path = "../proto" }
mz-timely-util = { path = "../timely-util" }
Expand Down
8 changes: 5 additions & 3 deletions src/persist-client/src/batch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1126,6 +1126,8 @@ pub(crate) fn validate_truncate_batch<T: Timestamp>(

#[cfg(test)]
mod tests {
use mz_dyncfg::ConfigUpdates;

use crate::cache::PersistClientCache;
use crate::internal::paths::{BlobKey, PartialBlobKey};
use crate::tests::{all_ok, new_test_client, CodecProduct};
Expand Down Expand Up @@ -1339,10 +1341,10 @@ mod tests {
}

// NB: Most edge cases are exercised in datadriven tests.
#[mz_ore::test(tokio::test)]
#[mz_persist_proc::test(tokio::test)]
#[cfg_attr(miri, ignore)] // too slow
async fn rewrite_ts_example() {
let client = new_test_client().await;
async fn rewrite_ts_example(dyncfgs: ConfigUpdates) {
let client = new_test_client(&dyncfgs).await;
let (mut write, read) = client
.expect_open::<String, (), u64, i64>(ShardId::new())
.await;
Expand Down
13 changes: 7 additions & 6 deletions src/persist-client/src/critical.rs
Original file line number Diff line number Diff line change
Expand Up @@ -343,6 +343,7 @@ where
mod tests {
use std::str::FromStr;

use mz_dyncfg::ConfigUpdates;
use serde::{Deserialize, Serialize};
use serde_json::json;

Expand Down Expand Up @@ -384,10 +385,10 @@ mod tests {
assert_eq!(container.reader_id, id);
}

#[mz_ore::test(tokio::test)]
#[mz_persist_proc::test(tokio::test)]
#[cfg_attr(miri, ignore)] // unsupported operation: returning ready events from epoll_wait is not yet implemented
async fn rate_limit() {
let client = crate::tests::new_test_client().await;
async fn rate_limit(dyncfgs: ConfigUpdates) {
let client = crate::tests::new_test_client(&dyncfgs).await;

let shard_id = crate::ShardId::new();

Expand Down Expand Up @@ -416,10 +417,10 @@ mod tests {
}

// Verifies that the handle updates its view of the opaque token correctly
#[mz_ore::test(tokio::test)]
#[mz_persist_proc::test(tokio::test)]
#[cfg_attr(miri, ignore)] // unsupported operation: returning ready events from epoll_wait is not yet implemented
async fn handle_opaque_token() {
let client = new_test_client().await;
async fn handle_opaque_token(dyncfgs: ConfigUpdates) {
let client = new_test_client(&dyncfgs).await;
let shard_id = ShardId::new();

let mut since = client
Expand Down
13 changes: 7 additions & 6 deletions src/persist-client/src/internal/compact.rs
Original file line number Diff line number Diff line change
Expand Up @@ -817,6 +817,7 @@ impl Timings {

#[cfg(test)]
mod tests {
use mz_dyncfg::ConfigUpdates;
use mz_persist_types::codec_impls::{StringSchema, UnitSchema};
use timely::progress::Antichain;

Expand All @@ -830,16 +831,16 @@ mod tests {
// A regression test for a bug caught during development of #13160 (never
// made it to main) where batches written by compaction would always have a
// since of the minimum timestamp.
#[mz_ore::test(tokio::test)]
#[mz_persist_proc::test(tokio::test)]
#[cfg_attr(miri, ignore)] // unsupported operation: returning ready events from epoll_wait is not yet implemented
async fn regression_minimum_since() {
async fn regression_minimum_since(dyncfgs: ConfigUpdates) {
let data = vec![
(("0".to_owned(), "zero".to_owned()), 0, 1),
(("0".to_owned(), "zero".to_owned()), 1, -1),
(("1".to_owned(), "one".to_owned()), 1, 1),
];

let cache = new_test_client_cache();
let cache = new_test_client_cache(&dyncfgs);
cache.cfg.set_config(&BLOB_TARGET_SIZE, 100);
let (mut write, _) = cache
.open(PersistLocation::new_in_mem())
Expand Down Expand Up @@ -898,9 +899,9 @@ mod tests {
assert_eq!(updates, all_ok(&data, 10));
}

#[mz_ore::test(tokio::test)]
#[mz_persist_proc::test(tokio::test)]
#[cfg_attr(miri, ignore)] // unsupported operation: returning ready events from epoll_wait is not yet implemented
async fn compaction_partial_order() {
async fn compaction_partial_order(dyncfgs: ConfigUpdates) {
let data = vec![
(
("0".to_owned(), "zero".to_owned()),
Expand All @@ -914,7 +915,7 @@ mod tests {
),
];

let cache = new_test_client_cache();
let cache = new_test_client_cache(&dyncfgs);
cache.cfg.set_config(&BLOB_TARGET_SIZE, 100);
let (mut write, _) = cache
.open(PersistLocation::new_in_mem())
Expand Down
8 changes: 5 additions & 3 deletions src/persist-client/src/internal/datadriven.rs
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,8 @@ impl<'a> DirectiveArgs<'a> {
}

mod tests {
use mz_dyncfg::ConfigUpdates;

use super::*;

#[mz_ore::test]
Expand Down Expand Up @@ -178,13 +180,13 @@ mod tests {
});
}

#[mz_ore::test(tokio::test)]
#[mz_persist_proc::test(tokio::test)]
#[cfg_attr(miri, ignore)] // too slow
async fn machine() {
async fn machine(dyncfgs: ConfigUpdates) {
use crate::internal::machine::datadriven as machine_dd;

::datadriven::walk_async("tests/machine", |mut f| {
let initial_state_fut = machine_dd::MachineState::new();
let initial_state_fut = machine_dd::MachineState::new(&dyncfgs);
async move {
println!("running datadriven file: {}", f.filename);
let state = Arc::new(Mutex::new(initial_state_fut.await));
Expand Down
7 changes: 4 additions & 3 deletions src/persist-client/src/internal/encoding.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1366,6 +1366,7 @@ impl<T: Timestamp + Codec64> RustType<ProtoU64Antichain> for Antichain<T> {
mod tests {
use bytes::Bytes;
use mz_build_info::DUMMY_BUILD_INFO;
use mz_dyncfg::ConfigUpdates;
use mz_persist::location::SeqNo;
use proptest::prelude::*;

Expand Down Expand Up @@ -1570,9 +1571,9 @@ mod tests {
);
}

#[mz_ore::test(tokio::test)]
#[mz_persist_proc::test(tokio::test)]
#[cfg_attr(miri, ignore)] // unsupported operation: returning ready events from epoll_wait is not yet implemented
async fn state_diff_migration_rollups() {
async fn state_diff_migration_rollups(dyncfgs: ConfigUpdates) {
let r1_rollup = HollowRollup {
key: PartialRollupKey("foo".to_owned()),
encoded_size_bytes: None,
Expand Down Expand Up @@ -1647,7 +1648,7 @@ mod tests {
let state: Rollup<u64> = rollup.into_rust().unwrap();
let state = state.state;
let mut state = state.check_codecs::<(), (), i64>(&shard_id).unwrap();
let cache = new_test_client_cache();
let cache = new_test_client_cache(&dyncfgs);
let encoded_diff = VersionedData {
seqno: SeqNo(5),
data: diff_proto.encode_to_vec().into(),
Expand Down
25 changes: 13 additions & 12 deletions src/persist-client/src/internal/machine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1303,9 +1303,9 @@ pub mod datadriven {
}

impl MachineState {
pub async fn new() -> Self {
pub async fn new(dyncfgs: &ConfigUpdates) -> Self {
let shard_id = ShardId::new();
let client = new_test_client().await;
let client = new_test_client(dyncfgs).await;
// Reset blob_target_size. Individual batch writes and compactions
// can override it with an arg.
client
Expand Down Expand Up @@ -2246,24 +2246,25 @@ pub mod datadriven {
pub mod tests {
use std::sync::Arc;

use crate::cache::StateCache;
use mz_dyncfg::ConfigUpdates;
use mz_ore::cast::CastFrom;
use mz_ore::task::spawn;
use mz_persist::intercept::{InterceptBlob, InterceptHandle};
use mz_persist::location::SeqNo;
use timely::progress::Antichain;

use crate::cache::StateCache;
use crate::internal::gc::{GarbageCollector, GcReq};
use crate::internal::state::{HandleDebugState, ROLLUP_THRESHOLD};
use crate::tests::new_test_client;
use crate::ShardId;

#[mz_ore::test(tokio::test(flavor = "multi_thread"))]
#[mz_persist_proc::test(tokio::test(flavor = "multi_thread"))]
#[cfg_attr(miri, ignore)] // error: unsupported operation: integer-to-pointer casts and `ptr::from_exposed_addr` are not supported with `-Zmiri-strict-provenance`
async fn apply_unbatched_cmd_truncate() {
async fn apply_unbatched_cmd_truncate(dyncfgs: ConfigUpdates) {
mz_ore::test::init_logging();

let client = new_test_client().await;
let client = new_test_client(&dyncfgs).await;
// set a low rollup threshold so GC/truncation is more aggressive
client.cfg.set_config(&ROLLUP_THRESHOLD, 5);
let (mut write, _) = client
Expand Down Expand Up @@ -2314,10 +2315,10 @@ pub mod tests {
// A regression test for #14719, where a bug in gc led to an incremental
// state invariant being violated which resulted in gc being permanently
// wedged for the shard.
#[mz_ore::test(tokio::test(flavor = "multi_thread"))]
#[mz_persist_proc::test(tokio::test(flavor = "multi_thread"))]
#[cfg_attr(miri, ignore)] // error: unsupported operation: integer-to-pointer casts and `ptr::from_exposed_addr` are not supported with `-Zmiri-strict-provenance`
async fn regression_gc_skipped_req_and_interrupted() {
let mut client = new_test_client().await;
async fn regression_gc_skipped_req_and_interrupted(dyncfgs: ConfigUpdates) {
let mut client = new_test_client(&dyncfgs).await;
let intercept = InterceptHandle::default();
client.blob = Arc::new(InterceptBlob::new(
Arc::clone(&client.blob),
Expand Down Expand Up @@ -2364,10 +2365,10 @@ pub mod tests {
// would not fetch the latest state after an upper mismatch. This meant that
// a write that could succeed if retried on the latest state would instead
// return an UpperMismatch.
#[mz_ore::test(tokio::test(flavor = "multi_thread"))]
#[mz_persist_proc::test(tokio::test(flavor = "multi_thread"))]
#[cfg_attr(miri, ignore)] // error: unsupported operation: integer-to-pointer casts and `ptr::from_exposed_addr` are not supported with `-Zmiri-strict-provenance`
async fn regression_update_state_after_upper_mismatch() {
let client = new_test_client().await;
async fn regression_update_state_after_upper_mismatch(dyncfgs: ConfigUpdates) {
let client = new_test_client(&dyncfgs).await;
let mut client2 = client.clone();

// The bug can only happen if the two WriteHandles have separate copies
Expand Down
7 changes: 4 additions & 3 deletions src/persist-client/src/internal/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1737,6 +1737,7 @@ pub(crate) mod tests {

use bytes::Bytes;
use mz_build_info::DUMMY_BUILD_INFO;
use mz_dyncfg::ConfigUpdates;
use mz_ore::now::SYSTEM_TIME;
use mz_proto::RustType;
use proptest::prelude::*;
Expand Down Expand Up @@ -2694,10 +2695,10 @@ pub(crate) mod tests {
);
}

#[mz_ore::test(tokio::test)]
#[mz_persist_proc::test(tokio::test)]
#[cfg_attr(miri, ignore)] // too slow
async fn sneaky_downgrades() {
let mut clients = new_test_client_cache();
async fn sneaky_downgrades(dyncfgs: ConfigUpdates) {
let mut clients = new_test_client_cache(&dyncfgs);
let shard_id = ShardId::new();

async fn open_and_write(
Expand Down
8 changes: 5 additions & 3 deletions src/persist-client/src/internal/state_versions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1167,16 +1167,18 @@ impl<T: Timestamp + Lattice + Codec64> ReferencedBlobValidator<T> {

#[cfg(test)]
mod tests {
use mz_dyncfg::ConfigUpdates;

use crate::tests::new_test_client;

use super::*;

/// Regression test for (part of) #17752, where an interrupted
/// `bin/environmentd --reset` resulted in panic in persist usage code.
#[mz_ore::test(tokio::test)]
#[mz_persist_proc::test(tokio::test)]
#[cfg_attr(miri, ignore)] // unsupported operation: returning ready events from epoll_wait is not yet implemented
async fn fetch_all_live_states_regression_uninitialized() {
let client = new_test_client().await;
async fn fetch_all_live_states_regression_uninitialized(dyncfgs: ConfigUpdates) {
let client = new_test_client(&dyncfgs).await;
let state_versions = StateVersions::new(
client.cfg.clone(),
Arc::clone(&client.consensus),
Expand Down
7 changes: 4 additions & 3 deletions src/persist-client/src/internal/watch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,7 @@ mod tests {
use futures::FutureExt;
use futures_task::noop_waker;
use mz_build_info::DUMMY_BUILD_INFO;
use mz_dyncfg::ConfigUpdates;
use mz_ore::cast::CastFrom;
use mz_ore::metrics::MetricsRegistry;
use timely::progress::Antichain;
Expand Down Expand Up @@ -277,14 +278,14 @@ mod tests {
}
}

#[mz_ore::test(tokio::test)]
#[mz_persist_proc::test(tokio::test)]
#[cfg_attr(miri, ignore)] // unsupported operation: returning ready events from epoll_wait is not yet implemented
async fn state_watch_listen_snapshot() {
async fn state_watch_listen_snapshot(dyncfgs: ConfigUpdates) {
mz_ore::test::init_logging();
let waker = noop_waker();
let mut cx = Context::from_waker(&waker);

let client = new_test_client().await;
let client = new_test_client(&dyncfgs).await;
// Override the listen poll so that it's useless.
client.cfg.set_config(
&NEXT_LISTEN_BATCH_RETRYER_INITIAL_BACKOFF,
Expand Down
Loading

0 comments on commit e22c40a

Please sign in to comment.