diff --git a/crates/node/src/lib.rs b/crates/node/src/lib.rs index 20d5992304..112e5d71da 100644 --- a/crates/node/src/lib.rs +++ b/crates/node/src/lib.rs @@ -37,7 +37,7 @@ use restate_core::{spawn_metadata_manager, MetadataBuilder, MetadataManager, Tas #[cfg(feature = "replicated-loglet")] use restate_log_server::LogServerService; use restate_metadata_store::{ - BoxedMetadataStoreService, MetadataStoreClient, MetadataStoreService, + BoxedMetadataStoreService, MetadataStoreClient, MetadataStoreService, ReadModifyWriteError, }; use restate_types::config::{CommonOptions, Configuration}; use restate_types::errors::GenericError; @@ -590,24 +590,28 @@ async fn provision_cluster_metadata( .await?; retry_on_network_error(common_opts.network_error_retry_policy.clone(), || { - write_initial_value_dont_fail_if_it_exists( + write_initial_logs_dont_fail_if_it_exists( metadata_store_client, - PARTITION_TABLE_KEY.clone(), - &initial_partition_table, + BIFROST_CONFIG_KEY.clone(), + initial_logs.clone(), ) }) .await - .context("failed provisioning the initial partition table")?; + .context("failed provisioning the initial logs")?; + // NOTE: + // The partition table metadata must be initialized only after the bifrost (logs) metadata. + // Otherwise, the logs controller may begin creating logs with incorrect configurations + // initialized by bifrost. retry_on_network_error(common_opts.network_error_retry_policy.clone(), || { write_initial_value_dont_fail_if_it_exists( metadata_store_client, - BIFROST_CONFIG_KEY.clone(), - &initial_logs, + PARTITION_TABLE_KEY.clone(), + &initial_partition_table, ) }) .await - .context("failed provisioning the initial logs")?; + .context("failed provisioning the initial partition table")?; Ok(result) } @@ -674,6 +678,37 @@ async fn write_initial_value_dont_fail_if_it_exists Result<(), ReadWriteError> { + let value = metadata_store_client + .read_modify_write(key, |current| match current { + None => Ok(initial_value.clone()), + Some(current_value) => { + if current_value.configuration() == initial_value.configuration() { + Err(ReadModifyWriteError::FailedOperation(AlreadyInitialized)) + } else if current_value.version() == Version::MIN && current_value.num_logs() == 0 { + let builder = initial_value.clone().into_builder(); + // make sure version is incremented to MIN + 1 + Ok(builder.build()) + } else { + Err(ReadModifyWriteError::FailedOperation(AlreadyInitialized)) + } + } + }) + .await; + + match value { + Ok(_) | Err(ReadModifyWriteError::FailedOperation(_)) => Ok(()), + Err(ReadModifyWriteError::ReadWrite(err)) => Err(err), + } +} + +#[derive(Debug)] +struct AlreadyInitialized; + #[cfg(not(feature = "replicated-loglet"))] fn warn_if_log_store_left_artifacts(config: &Configuration) { if config.log_server.data_dir().exists() {