diff --git a/Cargo.lock b/Cargo.lock index 9f73279e855a5..274c95630d04a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5041,6 +5041,7 @@ dependencies = [ "mz-dyncfg", "mz-ore", "mz-persist", + "mz-persist-proc", "mz-persist-types", "mz-postgres-client", "mz-proto", @@ -5070,6 +5071,16 @@ dependencies = [ "workspace-hack", ] +[[package]] +name = "mz-persist-proc" +version = "0.1.0" +dependencies = [ + "proc-macro2", + "quote", + "syn 1.0.107", + "workspace-hack", +] + [[package]] name = "mz-persist-txn" version = "0.0.0" diff --git a/Cargo.toml b/Cargo.toml index 5fead62b11855..7a644afec16f1 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -51,6 +51,7 @@ members = [ "src/persist", "src/persist-cli", "src/persist-client", + "src/persist-proc", "src/persist-txn", "src/persist-types", "src/pgcopy", diff --git a/src/persist-client/Cargo.toml b/src/persist-client/Cargo.toml index bc4c42a80c07e..f3b394f706650 100644 --- a/src/persist-client/Cargo.toml +++ b/src/persist-client/Cargo.toml @@ -42,6 +42,7 @@ mz-build-info = { path = "../build-info" } mz-dyncfg = { path = "../dyncfg" } mz-ore = { path = "../ore", features = ["bytes_", "test", "tracing_"] } mz-persist = { path = "../persist" } +mz-persist-proc = { path = "../persist-proc" } mz-persist-types = { path = "../persist-types" } mz-proto = { path = "../proto" } mz-timely-util = { path = "../timely-util" } diff --git a/src/persist-client/src/batch.rs b/src/persist-client/src/batch.rs index 80157126e4ae2..3edcbe3d919c6 100644 --- a/src/persist-client/src/batch.rs +++ b/src/persist-client/src/batch.rs @@ -1126,6 +1126,8 @@ pub(crate) fn validate_truncate_batch( #[cfg(test)] mod tests { + use mz_dyncfg::ConfigUpdates; + use crate::cache::PersistClientCache; use crate::internal::paths::{BlobKey, PartialBlobKey}; use crate::tests::{all_ok, new_test_client, CodecProduct}; @@ -1339,10 +1341,10 @@ mod tests { } // NB: Most edge cases are exercised in datadriven tests. - #[mz_ore::test(tokio::test)] + #[mz_persist_proc::test(tokio::test)] #[cfg_attr(miri, ignore)] // too slow - async fn rewrite_ts_example() { - let client = new_test_client().await; + async fn rewrite_ts_example(dyncfgs: ConfigUpdates) { + let client = new_test_client(&dyncfgs).await; let (mut write, read) = client .expect_open::(ShardId::new()) .await; diff --git a/src/persist-client/src/critical.rs b/src/persist-client/src/critical.rs index 1f81373655402..a8d79cc07a26d 100644 --- a/src/persist-client/src/critical.rs +++ b/src/persist-client/src/critical.rs @@ -343,6 +343,7 @@ where mod tests { use std::str::FromStr; + use mz_dyncfg::ConfigUpdates; use serde::{Deserialize, Serialize}; use serde_json::json; @@ -384,10 +385,10 @@ mod tests { assert_eq!(container.reader_id, id); } - #[mz_ore::test(tokio::test)] + #[mz_persist_proc::test(tokio::test)] #[cfg_attr(miri, ignore)] // unsupported operation: returning ready events from epoll_wait is not yet implemented - async fn rate_limit() { - let client = crate::tests::new_test_client().await; + async fn rate_limit(dyncfgs: ConfigUpdates) { + let client = crate::tests::new_test_client(&dyncfgs).await; let shard_id = crate::ShardId::new(); @@ -416,10 +417,10 @@ mod tests { } // Verifies that the handle updates its view of the opaque token correctly - #[mz_ore::test(tokio::test)] + #[mz_persist_proc::test(tokio::test)] #[cfg_attr(miri, ignore)] // unsupported operation: returning ready events from epoll_wait is not yet implemented - async fn handle_opaque_token() { - let client = new_test_client().await; + async fn handle_opaque_token(dyncfgs: ConfigUpdates) { + let client = new_test_client(&dyncfgs).await; let shard_id = ShardId::new(); let mut since = client diff --git a/src/persist-client/src/internal/compact.rs b/src/persist-client/src/internal/compact.rs index 23b8c6d8880bb..1815362c55db5 100644 --- a/src/persist-client/src/internal/compact.rs +++ b/src/persist-client/src/internal/compact.rs @@ -817,6 +817,7 @@ impl Timings { #[cfg(test)] mod tests { + use mz_dyncfg::ConfigUpdates; use mz_persist_types::codec_impls::{StringSchema, UnitSchema}; use timely::progress::Antichain; @@ -830,16 +831,16 @@ mod tests { // A regression test for a bug caught during development of #13160 (never // made it to main) where batches written by compaction would always have a // since of the minimum timestamp. - #[mz_ore::test(tokio::test)] + #[mz_persist_proc::test(tokio::test)] #[cfg_attr(miri, ignore)] // unsupported operation: returning ready events from epoll_wait is not yet implemented - async fn regression_minimum_since() { + async fn regression_minimum_since(dyncfgs: ConfigUpdates) { let data = vec![ (("0".to_owned(), "zero".to_owned()), 0, 1), (("0".to_owned(), "zero".to_owned()), 1, -1), (("1".to_owned(), "one".to_owned()), 1, 1), ]; - let cache = new_test_client_cache(); + let cache = new_test_client_cache(&dyncfgs); cache.cfg.set_config(&BLOB_TARGET_SIZE, 100); let (mut write, _) = cache .open(PersistLocation::new_in_mem()) @@ -898,9 +899,9 @@ mod tests { assert_eq!(updates, all_ok(&data, 10)); } - #[mz_ore::test(tokio::test)] + #[mz_persist_proc::test(tokio::test)] #[cfg_attr(miri, ignore)] // unsupported operation: returning ready events from epoll_wait is not yet implemented - async fn compaction_partial_order() { + async fn compaction_partial_order(dyncfgs: ConfigUpdates) { let data = vec![ ( ("0".to_owned(), "zero".to_owned()), @@ -914,7 +915,7 @@ mod tests { ), ]; - let cache = new_test_client_cache(); + let cache = new_test_client_cache(&dyncfgs); cache.cfg.set_config(&BLOB_TARGET_SIZE, 100); let (mut write, _) = cache .open(PersistLocation::new_in_mem()) diff --git a/src/persist-client/src/internal/datadriven.rs b/src/persist-client/src/internal/datadriven.rs index 6e48643f02b71..6ab1b6df9b6c6 100644 --- a/src/persist-client/src/internal/datadriven.rs +++ b/src/persist-client/src/internal/datadriven.rs @@ -146,6 +146,8 @@ impl<'a> DirectiveArgs<'a> { } mod tests { + use mz_dyncfg::ConfigUpdates; + use super::*; #[mz_ore::test] @@ -178,13 +180,13 @@ mod tests { }); } - #[mz_ore::test(tokio::test)] + #[mz_persist_proc::test(tokio::test)] #[cfg_attr(miri, ignore)] // too slow - async fn machine() { + async fn machine(dyncfgs: ConfigUpdates) { use crate::internal::machine::datadriven as machine_dd; ::datadriven::walk_async("tests/machine", |mut f| { - let initial_state_fut = machine_dd::MachineState::new(); + let initial_state_fut = machine_dd::MachineState::new(&dyncfgs); async move { println!("running datadriven file: {}", f.filename); let state = Arc::new(Mutex::new(initial_state_fut.await)); diff --git a/src/persist-client/src/internal/encoding.rs b/src/persist-client/src/internal/encoding.rs index f722cba44f84a..1e13a57fd8a2b 100644 --- a/src/persist-client/src/internal/encoding.rs +++ b/src/persist-client/src/internal/encoding.rs @@ -1366,6 +1366,7 @@ impl RustType for Antichain { mod tests { use bytes::Bytes; use mz_build_info::DUMMY_BUILD_INFO; + use mz_dyncfg::ConfigUpdates; use mz_persist::location::SeqNo; use proptest::prelude::*; @@ -1570,9 +1571,9 @@ mod tests { ); } - #[mz_ore::test(tokio::test)] + #[mz_persist_proc::test(tokio::test)] #[cfg_attr(miri, ignore)] // unsupported operation: returning ready events from epoll_wait is not yet implemented - async fn state_diff_migration_rollups() { + async fn state_diff_migration_rollups(dyncfgs: ConfigUpdates) { let r1_rollup = HollowRollup { key: PartialRollupKey("foo".to_owned()), encoded_size_bytes: None, @@ -1647,7 +1648,7 @@ mod tests { let state: Rollup = rollup.into_rust().unwrap(); let state = state.state; let mut state = state.check_codecs::<(), (), i64>(&shard_id).unwrap(); - let cache = new_test_client_cache(); + let cache = new_test_client_cache(&dyncfgs); let encoded_diff = VersionedData { seqno: SeqNo(5), data: diff_proto.encode_to_vec().into(), diff --git a/src/persist-client/src/internal/machine.rs b/src/persist-client/src/internal/machine.rs index 30eee981bfc8f..16e356022a74e 100644 --- a/src/persist-client/src/internal/machine.rs +++ b/src/persist-client/src/internal/machine.rs @@ -1303,9 +1303,9 @@ pub mod datadriven { } impl MachineState { - pub async fn new() -> Self { + pub async fn new(dyncfgs: &ConfigUpdates) -> Self { let shard_id = ShardId::new(); - let client = new_test_client().await; + let client = new_test_client(dyncfgs).await; // Reset blob_target_size. Individual batch writes and compactions // can override it with an arg. client @@ -2246,24 +2246,25 @@ pub mod datadriven { pub mod tests { use std::sync::Arc; - use crate::cache::StateCache; + use mz_dyncfg::ConfigUpdates; use mz_ore::cast::CastFrom; use mz_ore::task::spawn; use mz_persist::intercept::{InterceptBlob, InterceptHandle}; use mz_persist::location::SeqNo; use timely::progress::Antichain; + use crate::cache::StateCache; use crate::internal::gc::{GarbageCollector, GcReq}; use crate::internal::state::{HandleDebugState, ROLLUP_THRESHOLD}; use crate::tests::new_test_client; use crate::ShardId; - #[mz_ore::test(tokio::test(flavor = "multi_thread"))] + #[mz_persist_proc::test(tokio::test(flavor = "multi_thread"))] #[cfg_attr(miri, ignore)] // error: unsupported operation: integer-to-pointer casts and `ptr::from_exposed_addr` are not supported with `-Zmiri-strict-provenance` - async fn apply_unbatched_cmd_truncate() { + async fn apply_unbatched_cmd_truncate(dyncfgs: ConfigUpdates) { mz_ore::test::init_logging(); - let client = new_test_client().await; + let client = new_test_client(&dyncfgs).await; // set a low rollup threshold so GC/truncation is more aggressive client.cfg.set_config(&ROLLUP_THRESHOLD, 5); let (mut write, _) = client @@ -2314,10 +2315,10 @@ pub mod tests { // A regression test for #14719, where a bug in gc led to an incremental // state invariant being violated which resulted in gc being permanently // wedged for the shard. - #[mz_ore::test(tokio::test(flavor = "multi_thread"))] + #[mz_persist_proc::test(tokio::test(flavor = "multi_thread"))] #[cfg_attr(miri, ignore)] // error: unsupported operation: integer-to-pointer casts and `ptr::from_exposed_addr` are not supported with `-Zmiri-strict-provenance` - async fn regression_gc_skipped_req_and_interrupted() { - let mut client = new_test_client().await; + async fn regression_gc_skipped_req_and_interrupted(dyncfgs: ConfigUpdates) { + let mut client = new_test_client(&dyncfgs).await; let intercept = InterceptHandle::default(); client.blob = Arc::new(InterceptBlob::new( Arc::clone(&client.blob), @@ -2364,10 +2365,10 @@ pub mod tests { // would not fetch the latest state after an upper mismatch. This meant that // a write that could succeed if retried on the latest state would instead // return an UpperMismatch. - #[mz_ore::test(tokio::test(flavor = "multi_thread"))] + #[mz_persist_proc::test(tokio::test(flavor = "multi_thread"))] #[cfg_attr(miri, ignore)] // error: unsupported operation: integer-to-pointer casts and `ptr::from_exposed_addr` are not supported with `-Zmiri-strict-provenance` - async fn regression_update_state_after_upper_mismatch() { - let client = new_test_client().await; + async fn regression_update_state_after_upper_mismatch(dyncfgs: ConfigUpdates) { + let client = new_test_client(&dyncfgs).await; let mut client2 = client.clone(); // The bug can only happen if the two WriteHandles have separate copies diff --git a/src/persist-client/src/internal/state.rs b/src/persist-client/src/internal/state.rs index 098d78a47bf30..7268906ef648c 100644 --- a/src/persist-client/src/internal/state.rs +++ b/src/persist-client/src/internal/state.rs @@ -1737,6 +1737,7 @@ pub(crate) mod tests { use bytes::Bytes; use mz_build_info::DUMMY_BUILD_INFO; + use mz_dyncfg::ConfigUpdates; use mz_ore::now::SYSTEM_TIME; use mz_proto::RustType; use proptest::prelude::*; @@ -2694,10 +2695,10 @@ pub(crate) mod tests { ); } - #[mz_ore::test(tokio::test)] + #[mz_persist_proc::test(tokio::test)] #[cfg_attr(miri, ignore)] // too slow - async fn sneaky_downgrades() { - let mut clients = new_test_client_cache(); + async fn sneaky_downgrades(dyncfgs: ConfigUpdates) { + let mut clients = new_test_client_cache(&dyncfgs); let shard_id = ShardId::new(); async fn open_and_write( diff --git a/src/persist-client/src/internal/state_versions.rs b/src/persist-client/src/internal/state_versions.rs index 8de36874fcaa6..65a69d18b236c 100644 --- a/src/persist-client/src/internal/state_versions.rs +++ b/src/persist-client/src/internal/state_versions.rs @@ -1167,16 +1167,18 @@ impl ReferencedBlobValidator { #[cfg(test)] mod tests { + use mz_dyncfg::ConfigUpdates; + use crate::tests::new_test_client; use super::*; /// Regression test for (part of) #17752, where an interrupted /// `bin/environmentd --reset` resulted in panic in persist usage code. - #[mz_ore::test(tokio::test)] + #[mz_persist_proc::test(tokio::test)] #[cfg_attr(miri, ignore)] // unsupported operation: returning ready events from epoll_wait is not yet implemented - async fn fetch_all_live_states_regression_uninitialized() { - let client = new_test_client().await; + async fn fetch_all_live_states_regression_uninitialized(dyncfgs: ConfigUpdates) { + let client = new_test_client(&dyncfgs).await; let state_versions = StateVersions::new( client.cfg.clone(), Arc::clone(&client.consensus), diff --git a/src/persist-client/src/internal/watch.rs b/src/persist-client/src/internal/watch.rs index 7c2d5c7957089..eaa3696bdd4b9 100644 --- a/src/persist-client/src/internal/watch.rs +++ b/src/persist-client/src/internal/watch.rs @@ -142,6 +142,7 @@ mod tests { use futures::FutureExt; use futures_task::noop_waker; use mz_build_info::DUMMY_BUILD_INFO; + use mz_dyncfg::ConfigUpdates; use mz_ore::cast::CastFrom; use mz_ore::metrics::MetricsRegistry; use timely::progress::Antichain; @@ -277,14 +278,14 @@ mod tests { } } - #[mz_ore::test(tokio::test)] + #[mz_persist_proc::test(tokio::test)] #[cfg_attr(miri, ignore)] // unsupported operation: returning ready events from epoll_wait is not yet implemented - async fn state_watch_listen_snapshot() { + async fn state_watch_listen_snapshot(dyncfgs: ConfigUpdates) { mz_ore::test::init_logging(); let waker = noop_waker(); let mut cx = Context::from_waker(&waker); - let client = new_test_client().await; + let client = new_test_client(&dyncfgs).await; // Override the listen poll so that it's useless. client.cfg.set_config( &NEXT_LISTEN_BATCH_RETRYER_INITIAL_BACKOFF, diff --git a/src/persist-client/src/lib.rs b/src/persist-client/src/lib.rs index 420743dbcc45b..229d75b422fb5 100644 --- a/src/persist-client/src/lib.rs +++ b/src/persist-client/src/lib.rs @@ -666,7 +666,7 @@ mod tests { use differential_dataflow::consolidation::consolidate_updates; use differential_dataflow::lattice::Lattice; use futures_task::noop_waker; - + use mz_dyncfg::ConfigUpdates; use mz_persist::indexed::encoding::BlobTraceBatchPart; use mz_persist::workload::DataGenerator; use mz_persist_types::codec_impls::{StringSchema, VecU8Schema}; @@ -747,20 +747,21 @@ mod tests { } } - pub fn new_test_client_cache() -> PersistClientCache { + pub fn new_test_client_cache(dyncfgs: &ConfigUpdates) -> PersistClientCache { // Configure an aggressively small blob_target_size so we get some // amount of coverage of that in tests. Similarly, for max_outstanding. let mut cache = PersistClientCache::new_no_metrics(); cache.cfg.set_config(&BLOB_TARGET_SIZE, 10); cache.cfg.dynamic.set_batch_builder_max_outstanding_parts(1); + dyncfgs.apply(cache.cfg()); // Enable compaction in tests to ensure we get coverage. cache.cfg.compaction_enabled = true; cache } - pub async fn new_test_client() -> PersistClient { - let cache = new_test_client_cache(); + pub async fn new_test_client(dyncfgs: &ConfigUpdates) -> PersistClient { + let cache = new_test_client_cache(dyncfgs); cache .open(PersistLocation::new_in_mem()) .await @@ -821,16 +822,16 @@ mod tests { (part, updates) } - #[mz_ore::test(tokio::test)] + #[mz_persist_proc::test(tokio::test)] #[cfg_attr(miri, ignore)] // unsupported operation: returning ready events from epoll_wait is not yet implemented - async fn sanity_check() { + async fn sanity_check(dyncfgs: ConfigUpdates) { let data = vec![ (("1".to_owned(), "one".to_owned()), 1, 1), (("2".to_owned(), "two".to_owned()), 2, 1), (("3".to_owned(), "three".to_owned()), 3, 1), ]; - let (mut write, mut read) = new_test_client() + let (mut write, mut read) = new_test_client(&dyncfgs) .await .expect_open::(ShardId::new()) .await; @@ -869,9 +870,9 @@ mod tests { } // Sanity check that the open_reader and open_writer calls work. - #[mz_ore::test(tokio::test)] + #[mz_persist_proc::test(tokio::test)] #[cfg_attr(miri, ignore)] // unsupported operation: returning ready events from epoll_wait is not yet implemented - async fn open_reader_writer() { + async fn open_reader_writer(dyncfgs: ConfigUpdates) { let data = vec![ (("1".to_owned(), "one".to_owned()), 1, 1), (("2".to_owned(), "two".to_owned()), 2, 1), @@ -879,7 +880,7 @@ mod tests { ]; let shard_id = ShardId::new(); - let client = new_test_client().await; + let client = new_test_client(&dyncfgs).await; let mut write1 = client .open_writer::( shard_id, @@ -926,9 +927,9 @@ mod tests { assert_eq!(read1.expect_snapshot_and_fetch(3).await, all_ok(&data, 3)); } - #[mz_ore::test(tokio::test)] + #[mz_persist_proc::test(tokio::test)] #[cfg_attr(miri, ignore)] // too slow - async fn invalid_usage() { + async fn invalid_usage(dyncfgs: ConfigUpdates) { let data = vec![ (("1".to_owned(), "one".to_owned()), 1, 1), (("2".to_owned(), "two".to_owned()), 2, 1), @@ -938,7 +939,7 @@ mod tests { let shard_id0 = "s00000000-0000-0000-0000-000000000000" .parse::() .expect("invalid shard id"); - let mut client = new_test_client().await; + let mut client = new_test_client(&dyncfgs).await; let (mut write0, mut read0) = client .expect_open::(shard_id0) @@ -1185,9 +1186,9 @@ mod tests { } } - #[mz_ore::test(tokio::test)] + #[mz_persist_proc::test(tokio::test)] #[cfg_attr(miri, ignore)] // unsupported operation: returning ready events from epoll_wait is not yet implemented - async fn multiple_shards() { + async fn multiple_shards(dyncfgs: ConfigUpdates) { let data1 = vec![ (("1".to_owned(), "one".to_owned()), 1, 1), (("2".to_owned(), "two".to_owned()), 2, 1), @@ -1195,7 +1196,7 @@ mod tests { let data2 = vec![(("1".to_owned(), ()), 1, 1), (("2".to_owned(), ()), 2, 1)]; - let client = new_test_client().await; + let client = new_test_client(&dyncfgs).await; let (mut write1, mut read1) = client .expect_open::(ShardId::new()) @@ -1226,15 +1227,15 @@ mod tests { ); } - #[mz_ore::test(tokio::test)] + #[mz_persist_proc::test(tokio::test)] #[cfg_attr(miri, ignore)] // unsupported operation: returning ready events from epoll_wait is not yet implemented - async fn fetch_upper() { + async fn fetch_upper(dyncfgs: ConfigUpdates) { let data = vec![ (("1".to_owned(), "one".to_owned()), 1, 1), (("2".to_owned(), "two".to_owned()), 2, 1), ]; - let client = new_test_client().await; + let client = new_test_client(&dyncfgs).await; let shard_id = ShardId::new(); @@ -1258,15 +1259,15 @@ mod tests { assert_eq!(write2.upper(), &Antichain::from_elem(3)); } - #[mz_ore::test(tokio::test)] + #[mz_persist_proc::test(tokio::test)] #[cfg_attr(miri, ignore)] // unsupported operation: returning ready events from epoll_wait is not yet implemented - async fn append_with_invalid_upper() { + async fn append_with_invalid_upper(dyncfgs: ConfigUpdates) { let data = vec![ (("1".to_owned(), "one".to_owned()), 1, 1), (("2".to_owned(), "two".to_owned()), 2, 1), ]; - let client = new_test_client().await; + let client = new_test_client(&dyncfgs).await; let shard_id = ShardId::new(); @@ -1304,14 +1305,14 @@ mod tests { // Make sure that the API structs are Sync + Send, so that they can be used in async tasks. // NOTE: This is a compile-time only test. If it compiles, we're good. #[allow(unused)] - async fn sync_send() { + async fn sync_send(dyncfgs: ConfigUpdates) { mz_ore::test::init_logging(); fn is_send_sync(_x: T) -> bool { true } - let client = new_test_client().await; + let client = new_test_client(&dyncfgs).await; let (write, read) = client .expect_open::(ShardId::new()) @@ -1322,9 +1323,9 @@ mod tests { assert!(is_send_sync(read)); } - #[mz_ore::test(tokio::test)] + #[mz_persist_proc::test(tokio::test)] #[cfg_attr(miri, ignore)] // unsupported operation: returning ready events from epoll_wait is not yet implemented - async fn compare_and_append() { + async fn compare_and_append(dyncfgs: ConfigUpdates) { let data = vec![ (("1".to_owned(), "one".to_owned()), 1, 1), (("2".to_owned(), "two".to_owned()), 2, 1), @@ -1332,7 +1333,7 @@ mod tests { ]; let id = ShardId::new(); - let client = new_test_client().await; + let client = new_test_client(&dyncfgs).await; let (mut write1, mut read) = client.expect_open::(id).await; let (mut write2, _read) = client.expect_open::(id).await; @@ -1379,9 +1380,9 @@ mod tests { assert_eq!(read.expect_snapshot_and_fetch(3).await, all_ok(&data, 3)); } - #[mz_ore::test(tokio::test)] + #[mz_persist_proc::test(tokio::test)] #[cfg_attr(miri, ignore)] // unsupported operation: returning ready events from epoll_wait is not yet implemented - async fn overlapping_append() { + async fn overlapping_append(dyncfgs: ConfigUpdates) { mz_ore::test::init_logging_default("info"); let data = vec![ @@ -1393,7 +1394,7 @@ mod tests { ]; let id = ShardId::new(); - let client = new_test_client().await; + let client = new_test_client(&dyncfgs).await; let (mut write1, mut read) = client.expect_open::(id).await; @@ -1430,9 +1431,9 @@ mod tests { // Appends need to be contiguous for a shard, meaning the lower of an appended batch must not // be in advance of the current shard upper. - #[mz_ore::test(tokio::test)] + #[mz_persist_proc::test(tokio::test)] #[cfg_attr(miri, ignore)] // unsupported operation: returning ready events from epoll_wait is not yet implemented - async fn contiguous_append() { + async fn contiguous_append(dyncfgs: ConfigUpdates) { let data = vec![ (("1".to_owned(), "one".to_owned()), 1, 1), (("2".to_owned(), "two".to_owned()), 2, 1), @@ -1442,7 +1443,7 @@ mod tests { ]; let id = ShardId::new(); - let client = new_test_client().await; + let client = new_test_client(&dyncfgs).await; let (mut write, mut read) = client.expect_open::(id).await; @@ -1478,9 +1479,9 @@ mod tests { // Per-writer appends can be non-contiguous, as long as appends to the shard from all writers // combined are contiguous. - #[mz_ore::test(tokio::test)] + #[mz_persist_proc::test(tokio::test)] #[cfg_attr(miri, ignore)] // unsupported operation: returning ready events from epoll_wait is not yet implemented - async fn noncontiguous_append_per_writer() { + async fn noncontiguous_append_per_writer(dyncfgs: ConfigUpdates) { let data = vec![ (("1".to_owned(), "one".to_owned()), 1, 1), (("2".to_owned(), "two".to_owned()), 2, 1), @@ -1490,7 +1491,7 @@ mod tests { ]; let id = ShardId::new(); - let client = new_test_client().await; + let client = new_test_client(&dyncfgs).await; let (mut write1, mut read) = client.expect_open::(id).await; @@ -1521,9 +1522,9 @@ mod tests { // Compare_and_appends need to be contiguous for a shard, meaning the lower of an appended // batch needs to match the current shard upper. - #[mz_ore::test(tokio::test)] + #[mz_persist_proc::test(tokio::test)] #[cfg_attr(miri, ignore)] // unsupported operation: returning ready events from epoll_wait is not yet implemented - async fn contiguous_compare_and_append() { + async fn contiguous_compare_and_append(dyncfgs: ConfigUpdates) { let data = vec![ (("1".to_owned(), "one".to_owned()), 1, 1), (("2".to_owned(), "two".to_owned()), 2, 1), @@ -1533,7 +1534,7 @@ mod tests { ]; let id = ShardId::new(); - let client = new_test_client().await; + let client = new_test_client(&dyncfgs).await; let (mut write, mut read) = client.expect_open::(id).await; @@ -1568,9 +1569,9 @@ mod tests { // Per-writer compare_and_appends can be non-contiguous, as long as appends to the shard from // all writers combined are contiguous. - #[mz_ore::test(tokio::test)] + #[mz_persist_proc::test(tokio::test)] #[cfg_attr(miri, ignore)] // unsupported operation: returning ready events from epoll_wait is not yet implemented - async fn noncontiguous_compare_and_append_per_writer() { + async fn noncontiguous_compare_and_append_per_writer(dyncfgs: ConfigUpdates) { let data = vec![ (("1".to_owned(), "one".to_owned()), 1, 1), (("2".to_owned(), "two".to_owned()), 2, 1), @@ -1580,7 +1581,7 @@ mod tests { ]; let id = ShardId::new(); - let client = new_test_client().await; + let client = new_test_client(&dyncfgs).await; let (mut write1, mut read) = client.expect_open::(id).await; @@ -1613,14 +1614,14 @@ mod tests { ); } - #[mz_ore::test(tokio::test(flavor = "multi_thread"))] + #[mz_persist_proc::test(tokio::test(flavor = "multi_thread"))] #[cfg_attr(miri, ignore)] // error: unsupported operation: integer-to-pointer casts and `ptr::from_exposed_addr` are not supported with `-Zmiri-strict-provenance` - async fn concurrency() { + async fn concurrency(dyncfgs: ConfigUpdates) { let data = DataGenerator::small(); const NUM_WRITERS: usize = 2; let id = ShardId::new(); - let client = new_test_client().await; + let client = new_test_client(&dyncfgs).await; let mut handles = Vec::>::new(); for idx in 0..NUM_WRITERS { let (data, client) = (data.clone(), client.clone()); @@ -1711,9 +1712,9 @@ mod tests { // Regression test for #12131. Snapshot with as_of >= upper would // immediately return the data currently available instead of waiting for // upper to advance past as_of. - #[mz_ore::test(tokio::test)] + #[mz_persist_proc::test(tokio::test)] #[cfg_attr(miri, ignore)] // unsupported operation: returning ready events from epoll_wait is not yet implemented - async fn regression_blocking_reads() { + async fn regression_blocking_reads(dyncfgs: ConfigUpdates) { let waker = noop_waker(); let mut cx = Context::from_waker(&waker); @@ -1724,7 +1725,7 @@ mod tests { ]; let id = ShardId::new(); - let client = new_test_client().await; + let client = new_test_client(&dyncfgs).await; let (mut write, mut read) = client.expect_open::(id).await; // Grab a listener as_of (aka gt) 1, which is not yet closed out. @@ -1783,12 +1784,12 @@ mod tests { assert_eq!(snap.await, all_ok(&data[..], 3)); } - #[mz_ore::test(tokio::test)] + #[mz_persist_proc::test(tokio::test)] #[cfg_attr(miri, ignore)] // unsupported operation: returning ready events from epoll_wait is not yet implemented - async fn heartbeat_task_shutdown() { + async fn heartbeat_task_shutdown(dyncfgs: ConfigUpdates) { // Verify that the ReadHandle and WriteHandle background heartbeat tasks // shut down cleanly after the handle is expired. - let mut cache = new_test_client_cache(); + let mut cache = new_test_client_cache(&dyncfgs); cache .cfg .set_config(&READER_LEASE_DURATION, Duration::from_millis(1)); @@ -1814,11 +1815,11 @@ mod tests { /// Regression test for 16743, where the nightly tests found that calling /// maybe_heartbeat_writer or maybe_heartbeat_reader on a "tombstone" shard /// would panic. - #[mz_ore::test(tokio::test)] + #[mz_persist_proc::test(tokio::test)] #[cfg_attr(miri, ignore)] // unsupported operation: returning ready events from epoll_wait is not yet implemented - async fn regression_16743_heartbeat_tombstone() { + async fn regression_16743_heartbeat_tombstone(dyncfgs: ConfigUpdates) { const EMPTY: &[(((), ()), u64, i64)] = &[]; - let (mut write, mut read) = new_test_client() + let (mut write, mut read) = new_test_client(&dyncfgs) .await .expect_open::<(), (), u64, i64>(ShardId::new()) .await; diff --git a/src/persist-client/src/read.rs b/src/persist-client/src/read.rs index 54eee86572294..88e188312eaef 100644 --- a/src/persist-client/src/read.rs +++ b/src/persist-client/src/read.rs @@ -1218,6 +1218,7 @@ mod tests { use std::pin; use std::str::FromStr; + use mz_dyncfg::ConfigUpdates; use mz_ore::cast::CastFrom; use mz_ore::metrics::MetricsRegistry; use mz_persist::mem::{MemBlob, MemBlobConfig, MemConsensus}; @@ -1237,16 +1238,16 @@ mod tests { use super::*; // Verifies `Subscribe` can be dropped while holding snapshot batches. - #[mz_ore::test(tokio::test)] + #[mz_persist_proc::test(tokio::test)] #[cfg_attr(miri, ignore)] // unsupported operation: returning ready events from epoll_wait is not yet implemented - async fn drop_unused_subscribe() { + async fn drop_unused_subscribe(dyncfgs: ConfigUpdates) { let data = vec![ (("0".to_owned(), "zero".to_owned()), 0, 1), (("1".to_owned(), "one".to_owned()), 1, 1), (("2".to_owned(), "two".to_owned()), 2, 1), ]; - let (mut write, read) = new_test_client() + let (mut write, read) = new_test_client(&dyncfgs) .await .expect_open::(crate::ShardId::new()) .await; @@ -1267,9 +1268,9 @@ mod tests { } // Verifies that we streaming-consolidate away identical key-values in the same batch. - #[mz_ore::test(tokio::test)] + #[mz_persist_proc::test(tokio::test)] #[cfg_attr(miri, ignore)] // unsupported operation: returning ready events from epoll_wait is not yet implemented - async fn streaming_consolidate() { + async fn streaming_consolidate(dyncfgs: ConfigUpdates) { let data = &[ // Identical records should sum together... (("k".to_owned(), "v".to_owned()), 0, 1), @@ -1281,7 +1282,7 @@ mod tests { ]; let (mut write, read) = { - let client = new_test_client().await; + let client = new_test_client(&dyncfgs).await; client.cfg.set_config(&BLOB_TARGET_SIZE, 1000); // So our batch stays together! client .expect_open::(crate::ShardId::new()) @@ -1316,9 +1317,9 @@ mod tests { ) } - #[mz_ore::test(tokio::test)] + #[mz_persist_proc::test(tokio::test)] #[cfg_attr(miri, ignore)] // unsupported operation: returning ready events from epoll_wait is not yet implemented - async fn snapshot_and_stream() { + async fn snapshot_and_stream(dyncfgs: ConfigUpdates) { let data = &mut [ (("k1".to_owned(), "v1".to_owned()), 0, 1), (("k2".to_owned(), "v2".to_owned()), 1, 1), @@ -1328,7 +1329,7 @@ mod tests { ]; let (mut write, mut read) = { - let client = new_test_client().await; + let client = new_test_client(&dyncfgs).await; client.cfg.set_config(&BLOB_TARGET_SIZE, 0); // split batches across multiple parts client .expect_open::(crate::ShardId::new()) @@ -1355,9 +1356,9 @@ mod tests { } // Verifies the semantics of `SeqNo` leases + checks dropping `LeasedBatchPart` semantics. - #[mz_ore::test(tokio::test)] + #[mz_persist_proc::test(tokio::test)] #[cfg_attr(miri, ignore)] // https://github.com/MaterializeInc/materialize/issues/19983 - async fn seqno_leases() { + async fn seqno_leases(dyncfgs: ConfigUpdates) { let mut data = vec![]; for i in 0..20 { data.push(((i.to_string(), i.to_string()), i, 1)) @@ -1365,7 +1366,7 @@ mod tests { let shard_id = ShardId::new(); - let client = new_test_client().await; + let client = new_test_client(&dyncfgs).await; let (mut write, read) = client .expect_open::(shard_id) .await; diff --git a/src/persist-client/src/usage.rs b/src/persist-client/src/usage.rs index 6ab53c92f0b2c..e21d3826232de 100644 --- a/src/persist-client/src/usage.rs +++ b/src/persist-client/src/usage.rs @@ -736,21 +736,22 @@ impl std::fmt::Display for HumanBytes { #[cfg(test)] mod tests { - use crate::batch::{BLOB_TARGET_SIZE, INLINE_UPDATE_THRESHOLD_BYTES}; use bytes::Bytes; + use mz_dyncfg::ConfigUpdates; use mz_persist::location::SeqNo; use semver::Version; use timely::progress::Antichain; + use crate::batch::{BLOB_TARGET_SIZE, INLINE_UPDATE_THRESHOLD_BYTES}; use crate::internal::paths::{PartialRollupKey, RollupId}; use crate::tests::new_test_client; use crate::ShardId; use super::*; - #[mz_ore::test(tokio::test)] + #[mz_persist_proc::test(tokio::test)] #[cfg_attr(miri, ignore)] // unsupported operation: returning ready events from epoll_wait is not yet implemented - async fn size() { + async fn size(dyncfgs: ConfigUpdates) { let data = vec![ (("1".to_owned(), "one".to_owned()), 1, 1), (("2".to_owned(), "two".to_owned()), 2, 1), @@ -758,7 +759,7 @@ mod tests { (("4".to_owned(), "four".to_owned()), 4, 1), ]; - let client = new_test_client().await; + let client = new_test_client(&dyncfgs).await; let inline_writes_enabled = INLINE_UPDATE_THRESHOLD_BYTES.get(&client.cfg) > 0; let build_version = client.cfg.build_version.clone(); let shard_id_one = ShardId::new(); @@ -861,9 +862,9 @@ mod tests { /// This is just a sanity check for the overall flow of computing ShardUsage. /// The edge cases are exercised in separate tests. - #[mz_ore::test(tokio::test)] + #[mz_persist_proc::test(tokio::test)] #[cfg_attr(miri, ignore)] // unsupported operation: returning ready events from epoll_wait is not yet implemented - async fn usage_sanity() { + async fn usage_sanity(dyncfgs: ConfigUpdates) { let data = vec![ (("1".to_owned(), "one".to_owned()), 1, 1), (("2".to_owned(), "two".to_owned()), 2, 1), @@ -872,7 +873,7 @@ mod tests { ]; let shard_id = ShardId::new(); - let mut client = new_test_client().await; + let mut client = new_test_client(&dyncfgs).await; let inline_writes_enabled = INLINE_UPDATE_THRESHOLD_BYTES.get(&client.cfg) > 0; let (mut write0, _) = client @@ -930,9 +931,9 @@ mod tests { } } - #[mz_ore::test(tokio::test)] + #[mz_persist_proc::test(tokio::test)] #[cfg_attr(miri, ignore)] // unsupported operation: returning ready events from epoll_wait is not yet implemented - async fn usage_referenced() { + async fn usage_referenced(dyncfgs: ConfigUpdates) { mz_ore::test::init_logging(); let data = vec![ @@ -943,7 +944,7 @@ mod tests { ]; let shard_id = ShardId::new(); - let mut client = new_test_client().await; + let mut client = new_test_client(&dyncfgs).await; // make our bookkeeping simple by skipping compaction blobs writes client.cfg.compaction_enabled = false; // make things interesting and create multiple parts per batch @@ -1246,10 +1247,10 @@ mod tests { /// This also tests a (hypothesized) race that's possible in prod where an /// initial rollup is written for a shard, but the initial CaS hasn't yet /// succeeded. - #[mz_ore::test(tokio::test)] + #[mz_persist_proc::test(tokio::test)] #[cfg_attr(miri, ignore)] // unsupported operation: returning ready events from epoll_wait is not yet implemented - async fn usage_regression_shard_in_blob_not_consensus() { - let client = new_test_client().await; + async fn usage_regression_shard_in_blob_not_consensus(dyncfgs: ConfigUpdates) { + let client = new_test_client(&dyncfgs).await; let shard_id = ShardId::new(); // Somewhat unsatisfying, we manually construct a rollup blob key. diff --git a/src/persist-client/src/write.rs b/src/persist-client/src/write.rs index 7c86a75840853..d4167a85918e8 100644 --- a/src/persist-client/src/write.rs +++ b/src/persist-client/src/write.rs @@ -801,28 +801,29 @@ mod tests { use std::str::FromStr; use std::sync::mpsc; - use crate::cache::PersistClientCache; use differential_dataflow::consolidation::consolidate_updates; use futures_util::FutureExt; + use mz_dyncfg::ConfigUpdates; use mz_ore::collections::CollectionExt; use mz_ore::task; use serde_json::json; + use crate::cache::PersistClientCache; use crate::tests::{all_ok, new_test_client}; use crate::{PersistLocation, ShardId}; use super::*; - #[mz_ore::test(tokio::test)] + #[mz_persist_proc::test(tokio::test)] #[cfg_attr(miri, ignore)] // unsupported operation: returning ready events from epoll_wait is not yet implemented - async fn empty_batches() { + async fn empty_batches(dyncfgs: ConfigUpdates) { let data = vec![ (("1".to_owned(), "one".to_owned()), 1, 1), (("2".to_owned(), "two".to_owned()), 2, 1), (("3".to_owned(), "three".to_owned()), 3, 1), ]; - let (mut write, _) = new_test_client() + let (mut write, _) = new_test_client(&dyncfgs) .await .expect_open::(ShardId::new()) .await; @@ -853,9 +854,9 @@ mod tests { assert_eq!(count_after, count_before); } - #[mz_ore::test(tokio::test)] + #[mz_persist_proc::test(tokio::test)] #[cfg_attr(miri, ignore)] // unsupported operation: returning ready events from epoll_wait is not yet implemented - async fn compare_and_append_batch_multi() { + async fn compare_and_append_batch_multi(dyncfgs: ConfigUpdates) { let data0 = vec![ (("1".to_owned(), "one".to_owned()), 1, 1), (("2".to_owned(), "two".to_owned()), 2, 1), @@ -867,7 +868,7 @@ mod tests { (("3".to_owned(), "three".to_owned()), 3, 1), ]; - let (mut write, mut read) = new_test_client() + let (mut write, mut read) = new_test_client(&dyncfgs) .await .expect_open::(ShardId::new()) .await; @@ -930,16 +931,16 @@ mod tests { assert_eq!(container.writer_id, id); } - #[mz_ore::test(tokio::test)] + #[mz_persist_proc::test(tokio::test)] #[cfg_attr(miri, ignore)] // unsupported operation: returning ready events from epoll_wait is not yet implemented - async fn hollow_batch_roundtrip() { + async fn hollow_batch_roundtrip(dyncfgs: ConfigUpdates) { let data = vec![ (("1".to_owned(), "one".to_owned()), 1, 1), (("2".to_owned(), "two".to_owned()), 2, 1), (("3".to_owned(), "three".to_owned()), 3, 1), ]; - let (mut write, mut read) = new_test_client() + let (mut write, mut read) = new_test_client(&dyncfgs) .await .expect_open::(ShardId::new()) .await; @@ -966,10 +967,10 @@ mod tests { assert_eq!(actual, all_ok(&expected, 3)); } - #[mz_ore::test(tokio::test)] + #[mz_persist_proc::test(tokio::test)] #[cfg_attr(miri, ignore)] // unsupported operation: returning ready events from epoll_wait is not yet implemented - async fn wait_for_upper_past() { - let client = new_test_client().await; + async fn wait_for_upper_past(dyncfgs: ConfigUpdates) { + let client = new_test_client(&dyncfgs).await; let (mut write, _) = client.expect_open::<(), (), u64, i64>(ShardId::new()).await; let five = Antichain::from_elem(5); diff --git a/src/persist-proc/Cargo.toml b/src/persist-proc/Cargo.toml new file mode 100644 index 0000000000000..a8424147d5a81 --- /dev/null +++ b/src/persist-proc/Cargo.toml @@ -0,0 +1,24 @@ +[package] +name = "mz-persist-proc" +version = "0.1.0" +edition.workspace = true +rust-version.workspace = true +license = "Apache-2.0" + +[lints] +workspace = true + +[lib] +proc-macro = true + +[dependencies] +proc-macro2 = "1.0" +quote = { version = "1.0" } +syn = { version = "1.0", features = ["extra-traits", "full"] } +workspace-hack = { version = "0.0.0", path = "../workspace-hack", optional = true } + +[features] +default = ["workspace-hack"] + +[package.metadata.cargo-udeps.ignore] +normal = ["workspace-hack"] diff --git a/src/persist-proc/src/lib.rs b/src/persist-proc/src/lib.rs new file mode 100644 index 0000000000000..9af9f2d5cbd56 --- /dev/null +++ b/src/persist-proc/src/lib.rs @@ -0,0 +1,96 @@ +// Copyright Materialize, Inc. and contributors. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License in the LICENSE file at the +// root of this repository, or online at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! Internal utility proc-macros for persist. +//! +//! Note: This is separate from the `mz_persist_client` crate because +//! `proc-macro` crates are only allowed to export procedural macros and nothing +//! else. + +use proc_macro::TokenStream; +use proc_macro2::TokenStream as TokenStream2; +use quote::quote; +use syn::{parse_macro_input, ItemFn, ReturnType}; + +/// Persist wrapper around the `test` macro. +/// +/// The wrapper automatically runs the test with various interesting +/// configurations. +#[proc_macro_attribute] +pub fn test(attr: TokenStream, item: TokenStream) -> TokenStream { + test_impl(attr, item) +} + +/// Implementation for the `#[test]` macro. +fn test_impl(attr: TokenStream, item: TokenStream) -> TokenStream { + let args = TokenStream2::from(attr); + let item = parse_macro_input!(item as ItemFn); + + let attrs = &item.attrs; + let async_ = &item.sig.asyncness; + let await_ = if async_.is_some() { + quote! {.await} + } else { + quote! {} + }; + let inputs = &item.sig.inputs; + let body = &item.block; + let test_name = &item.sig.ident; + + // Note that Rust does not allow us to have a test function with + // #[should_panic] that has a non-unit return value. + let ret = match &item.sig.output { + ReturnType::Default => quote! {}, + ReturnType::Type(_, type_) => quote! {-> #type_}, + }; + + quote! { + #[::mz_ore::test( + #args + )] + #(#attrs)* + #async_ fn #test_name() #ret { + #async_ fn test_impl(#inputs) #ret { + #body + } + + let dyncfgs = [ + { + // Inline writes disabled + let mut x = ::mz_dyncfg::ConfigUpdates::default(); + x.add_dynamic("persist_inline_update_threshold_bytes", ::mz_dyncfg::ConfigVal::Usize(0)); + x.add_dynamic("persist_inline_update_max_bytes", ::mz_dyncfg::ConfigVal::Usize(0)); + x + }, + { + // Inline writes enabled + let mut x = ::mz_dyncfg::ConfigUpdates::default(); + x.add_dynamic("persist_inline_update_threshold_bytes", ::mz_dyncfg::ConfigVal::Usize(4 * 1024)); + x.add_dynamic("persist_inline_update_max_bytes", ::mz_dyncfg::ConfigVal::Usize(1024 * 1024)); + x + }, + ]; + + for (idx, dyncfgs) in dyncfgs.into_iter().enumerate() { + let debug = dyncfgs.updates.iter().map(|(name, val)| { + format!(" {}={:?}", name, val.val.clone().unwrap()) + }).collect::(); + eprintln!("mz_persist_proc::test {}{}", idx, debug); + test_impl(dyncfgs)#await_ + } + } + } + .into() +}