Fix race condition between provision step and BifrostService::start
Summary:
Instead of racing, make sure that the bifrost service waits until
metadata has been initialized.

To make this work, we ensure that on metadata initialization we also
update the local metadata manager with the newly initialized values.

Fixes #2486
muhamadazmy committed Jan 22, 2025
1 parent 48394f8 commit 9fb2608
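The shape of the fix, in short: provisioning remains the only writer of the initial logs configuration, and BifrostService::start no longer tries to write it as well; it simply waits until the logs metadata reaches Version::MIN, which the provision step guarantees by publishing the freshly written values to the local metadata manager. Below is a minimal, self-contained sketch of that ordering, using a tokio watch channel as a stand-in for restate's Metadata/MetadataWriter machinery; the names are illustrative, not the actual API.

use std::sync::Arc;
use tokio::sync::watch;

// Stand-in for the logs metadata that the provision step produces.
struct Logs {
    version: u32,
}

#[tokio::main]
async fn main() {
    // `None` means "not initialized yet"; the provision step publishes `Some(..)`.
    let (tx, mut rx) = watch::channel::<Option<Arc<Logs>>>(None);

    // Provision step: write the initial value (to the metadata store, in the real
    // system) and then publish it so local observers can see it.
    let provision = tokio::spawn(async move {
        tx.send(Some(Arc::new(Logs { version: 1 }))).ok();
    });

    // Service start: instead of racing to initialize the value itself,
    // wait until somebody has published it.
    let service = tokio::spawn(async move {
        let logs = rx
            .wait_for(|logs| logs.is_some())
            .await
            .expect("provisioner dropped")
            .clone()
            .expect("checked by wait_for");
        println!("bifrost can start: logs metadata v{}", logs.version);
    });

    provision.await.unwrap();
    service.await.unwrap();
}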
Showing 6 changed files with 92 additions and 72 deletions.
24 changes: 5 additions & 19 deletions crates/bifrost/src/bifrost_admin.rs
@@ -293,26 +293,12 @@ impl<'a> BifrostAdmin<'a> {
Ok(())
}

/// Creates empty metadata if none exists for bifrost and publishes it to metadata
/// manager.
pub async fn init_metadata(&self) -> Result<(), Error> {
let retry_policy = Configuration::pinned()
.common
.network_error_retry_policy
.clone();

let logs = retry_on_network_error(retry_policy, || {
self.inner
.metadata_writer
.metadata_store_client()
.get_or_insert(BIFROST_CONFIG_KEY.clone(), || {
debug!("Attempting to initialize logs metadata in metadata store");
Logs::from_configuration(&Configuration::pinned())
})
})
.await?;
/// Wait for metadata initialization
pub async fn wait_metadata_init(&self) -> Result<(), Error> {
Metadata::current()
.wait_for_version(MetadataKind::Logs, Version::MIN)
.await?;

self.inner.metadata_writer.update(Arc::new(logs)).await?;
Ok(())
}
}
9 changes: 2 additions & 7 deletions crates/bifrost/src/service.rs
@@ -80,13 +80,8 @@ impl BifrostService {
///
/// This requires to run within a task_center context.
pub async fn start(self) -> anyhow::Result<()> {
// Make sure we have v1 metadata written to metadata store with the default
// configuration. If metadata is already initialized, this will make sure we have the
// latest version set in metadata manager.

// todo we seem to have a race condition between this call and the provision step which might
// write a different logs configuration
self.bifrost.admin().init_metadata().await?;
// Make sure we wait until metadata is initialized
self.bifrost.admin().wait_metadata_init().await?;

// initialize all enabled providers.
if self.factories.is_empty() {
89 changes: 53 additions & 36 deletions crates/node/src/lib.rs
@@ -16,6 +16,8 @@ mod roles;
use anyhow::Context;
use bytestring::ByteString;
use prost_dto::IntoProst;
use restate_types::net::metadata::MetadataContainer;
use restate_types::retries::RetryPolicy;
use std::num::NonZeroU16;
use tracing::{debug, error, info, trace, warn};

@@ -32,7 +32,7 @@ use restate_core::network::{
GrpcConnector, MessageRouterBuilder, NetworkServerBuilder, Networking,
};
use restate_core::partitions::{spawn_partition_routing_refresher, PartitionRoutingRefresher};
use restate_core::{cancellation_watcher, Metadata, TaskKind};
use restate_core::{cancellation_watcher, Metadata, MetadataWriter, TaskKind};
use restate_core::{spawn_metadata_manager, MetadataBuilder, MetadataManager, TaskCenter};
#[cfg(feature = "replicated-loglet")]
use restate_log_server::LogServerService;
@@ -54,7 +56,7 @@ use restate_types::partition_table::{PartitionReplication, PartitionTable, Parti
use restate_types::protobuf::common::{
AdminStatus, IngressStatus, LogServerStatus, NodeRpcStatus, NodeStatus, WorkerStatus,
};
use restate_types::storage::StorageEncode;
use restate_types::storage::{StorageDecode, StorageEncode};
use restate_types::{GenerationalNodeId, Version, Versioned};

#[derive(Debug, thiserror::Error, CodedError)]
@@ -341,14 +343,14 @@ impl Node {
let health = self.health.clone();
let common_options = config.common.clone();
let connection_manager = self.networking.connection_manager().clone();
let metadata_store_client = self.metadata_store_client.clone();
let metadata_writer = metadata_writer.clone();
async move {
NetworkServer::run(
health,
connection_manager,
self.server_builder,
common_options,
metadata_store_client,
metadata_writer,
)
.await?;
Ok(())
@@ -370,13 +372,13 @@
}

if config.common.allow_bootstrap {
let metadata_writer = metadata_writer.clone();
TaskCenter::spawn(TaskKind::SystemBoot, "auto-provision-cluster", {
let cluster_configuration = ClusterConfiguration::from_configuration(&config);
let metadata_store_client = self.metadata_store_client.clone();
let common_opts = config.common.clone();
async move {
let response = provision_cluster_metadata(
&metadata_store_client,
&metadata_writer,
&common_opts,
&cluster_configuration,
)
@@ -577,39 +579,39 @@ impl ClusterConfiguration {
/// metadata store. In this case, the method does not try to clean the already written metadata
/// up. Instead, the caller can retry to complete the provisioning.
async fn provision_cluster_metadata(
metadata_store_client: &MetadataStoreClient,
metadata_writer: &MetadataWriter,
common_opts: &CommonOptions,
cluster_configuration: &ClusterConfiguration,
) -> anyhow::Result<bool> {
let (initial_nodes_configuration, initial_partition_table, initial_logs) =
generate_initial_metadata(common_opts, cluster_configuration);

let result = retry_on_network_error(common_opts.network_error_retry_policy.clone(), || {
metadata_store_client.provision(&initial_nodes_configuration)
let success = retry_on_network_error(common_opts.network_error_retry_policy.clone(), || {
metadata_writer
.metadata_store_client()
.provision(&initial_nodes_configuration)
})
.await?;

retry_on_network_error(common_opts.network_error_retry_policy.clone(), || {
write_initial_value_dont_fail_if_it_exists(
metadata_store_client,
PARTITION_TABLE_KEY.clone(),
&initial_partition_table,
)
})
write_initial_value(
metadata_writer,
PARTITION_TABLE_KEY.clone(),
initial_partition_table,
common_opts.network_error_retry_policy.clone(),
)
.await
.context("failed provisioning the initial partition table")?;

retry_on_network_error(common_opts.network_error_retry_policy.clone(), || {
write_initial_value_dont_fail_if_it_exists(
metadata_store_client,
BIFROST_CONFIG_KEY.clone(),
&initial_logs,
)
})
write_initial_value(
metadata_writer,
BIFROST_CONFIG_KEY.clone(),
initial_logs,
common_opts.network_error_retry_policy.clone(),
)
.await
.context("failed provisioning the initial logs")?;

Ok(result)
Ok(success)
}

fn create_initial_nodes_configuration(common_opts: &CommonOptions) -> NodesConfiguration {
@@ -656,22 +658,37 @@ fn generate_initial_metadata(
)
}

async fn write_initial_value_dont_fail_if_it_exists<T: Versioned + StorageEncode>(
metadata_store_client: &MetadataStoreClient,
async fn write_initial_value<T>(
metadata_writer: &MetadataWriter,
key: ByteString,
initial_value: &T,
) -> Result<(), WriteError> {
match metadata_store_client
.put(key, initial_value, Precondition::DoesNotExist)
.await
{
Ok(_) => Ok(()),
mut value: T,
retry_policy: RetryPolicy,
) -> Result<(), anyhow::Error>
where
T: Versioned + StorageEncode + StorageDecode + Into<MetadataContainer>,
{
let result = retry_on_network_error(retry_policy.clone(), || {
metadata_writer
.metadata_store_client()
.put(key.clone(), &value, Precondition::DoesNotExist)
})
.await;

match result {
Ok(_) => {}
Err(WriteError::FailedPrecondition(_)) => {
// we might have failed on a previous attempt after writing this value; so let's continue
Ok(())
// update local copy
value = retry_on_network_error(retry_policy, || {
metadata_writer.metadata_store_client().get(key.clone())
})
.await?
.expect("must exist");
}
Err(err) => Err(err),
Err(err) => return Err(err.into()),
}

metadata_writer.update(value).await?;
Ok(())
}

#[cfg(not(feature = "replicated-loglet"))]
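The write_initial_value helper introduced above captures the idempotent-provisioning pattern this commit relies on: attempt a guarded write, treat FailedPrecondition as "someone already provisioned this" and read the existing value back, and in either case publish the result to the local metadata manager so that waiters such as BifrostService::start can unblock. Here is a minimal, self-contained sketch of that pattern, with an in-memory map standing in for the metadata store client and a plain struct standing in for MetadataWriter; the names are illustrative only.

use std::collections::HashMap;

#[derive(Debug)]
enum WriteError {
    FailedPrecondition,
}

// Stand-in for the metadata store client: a map with a "does not exist" guard.
#[derive(Default)]
struct FakeStore {
    values: HashMap<String, String>,
}

impl FakeStore {
    // Analogous to put(key, value, Precondition::DoesNotExist).
    fn put_if_absent(&mut self, key: &str, value: &str) -> Result<(), WriteError> {
        if self.values.contains_key(key) {
            return Err(WriteError::FailedPrecondition);
        }
        self.values.insert(key.to_owned(), value.to_owned());
        Ok(())
    }

    fn get(&self, key: &str) -> Option<String> {
        self.values.get(key).cloned()
    }
}

// Stand-in for the local metadata manager that other services observe.
#[derive(Default)]
struct LocalMetadata {
    published: Option<String>,
}

// Write the initial value unless somebody already did; in that case adopt the
// existing value. Either way, publish the result locally so waiters can proceed.
fn write_initial_value(
    store: &mut FakeStore,
    local: &mut LocalMetadata,
    key: &str,
    mut value: String,
) -> Result<(), WriteError> {
    match store.put_if_absent(key, &value) {
        Ok(()) => {}
        Err(WriteError::FailedPrecondition) => {
            // A concurrent (or earlier, retried) provisioner won; use its value.
            value = store.get(key).expect("value must exist");
        }
    }
    local.published = Some(value);
    Ok(())
}

fn main() {
    let mut store = FakeStore::default();
    let mut local = LocalMetadata::default();

    write_initial_value(&mut store, &mut local, "bifrost_config", "logs-v1".into()).unwrap();
    // A retry (or a second node) hits FailedPrecondition but still publishes locally.
    write_initial_value(&mut store, &mut local, "bifrost_config", "other".into()).unwrap();
    assert_eq!(local.published.as_deref(), Some("logs-v1"));
}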
11 changes: 5 additions & 6 deletions crates/node/src/network_server/grpc_svc_handler.rs
@@ -17,7 +17,6 @@ use futures::stream::BoxStream;
use tokio_stream::StreamExt;
use tonic::{Request, Response, Status, Streaming};

use restate_core::metadata_store::MetadataStoreClient;
use restate_core::network::protobuf::core_node_svc::core_node_svc_server::CoreNodeSvc;
use restate_core::network::{ConnectionManager, ProtocolError, TransportConnect};
use restate_core::protobuf::node_ctl_svc::node_ctl_svc_server::NodeCtlSvc;
@@ -26,7 +25,7 @@ use restate_core::protobuf::node_ctl_svc::{
ProvisionClusterResponse,
};
use restate_core::task_center::TaskCenterMonitoring;
use restate_core::{task_center, Metadata, MetadataKind, TargetVersion};
use restate_core::{task_center, Metadata, MetadataKind, MetadataWriter, TargetVersion};
use restate_types::config::Configuration;
use restate_types::health::Health;
use restate_types::logs::metadata::ProviderConfiguration;
@@ -42,7 +41,7 @@ pub struct NodeCtlSvcHandler {
cluster_name: String,
roles: EnumSet<Role>,
health: Health,
metadata_store_client: MetadataStoreClient,
metadata_writer: MetadataWriter,
}

impl NodeCtlSvcHandler {
@@ -51,14 +50,14 @@ impl NodeCtlSvcHandler {
cluster_name: String,
roles: EnumSet<Role>,
health: Health,
metadata_store_client: MetadataStoreClient,
metadata_writer: MetadataWriter,
) -> Self {
Self {
task_center,
cluster_name,
roles,
health,
metadata_store_client,
metadata_writer,
}
}

@@ -177,7 +176,7 @@ impl NodeCtlSvc for NodeCtlSvcHandler {
}

let newly_provisioned = provision_cluster_metadata(
&self.metadata_store_client,
&self.metadata_writer,
&config.common,
&cluster_configuration,
)
7 changes: 3 additions & 4 deletions crates/node/src/network_server/service.rs
@@ -15,12 +15,11 @@ use tokio::time::MissedTickBehavior;
use tonic::codec::CompressionEncoding;
use tracing::{debug, trace};

use restate_core::metadata_store::MetadataStoreClient;
use restate_core::network::protobuf::core_node_svc::core_node_svc_server::CoreNodeSvcServer;
use restate_core::network::tonic_service_filter::{TonicServiceFilter, WaitForReady};
use restate_core::network::{ConnectionManager, NetworkServerBuilder, TransportConnect};
use restate_core::protobuf::node_ctl_svc::node_ctl_svc_server::NodeCtlSvcServer;
use restate_core::{cancellation_watcher, TaskCenter, TaskKind};
use restate_core::{cancellation_watcher, MetadataWriter, TaskCenter, TaskKind};
use restate_types::config::CommonOptions;
use restate_types::health::Health;
use restate_types::protobuf::common::NodeStatus;
@@ -38,7 +37,7 @@ impl NetworkServer {
connection_manager: ConnectionManager<T>,
mut server_builder: NetworkServerBuilder,
options: CommonOptions,
metadata_store_client: MetadataStoreClient,
metadata_writer: MetadataWriter,
) -> Result<(), anyhow::Error> {
// Configure Metric Exporter
let mut state_builder = NodeCtrlHandlerStateBuilder::default();
@@ -105,7 +104,7 @@ impl NetworkServer {
options.cluster_name().to_owned(),
options.roles,
health,
metadata_store_client,
metadata_writer,
))
.accept_compressed(CompressionEncoding::Gzip)
.send_compressed(CompressionEncoding::Gzip),
24 changes: 24 additions & 0 deletions crates/types/src/net/metadata.rs
@@ -130,3 +130,27 @@ impl MetadataContainer {
}
}
}

impl From<PartitionTable> for MetadataContainer {
fn from(value: PartitionTable) -> Self {
Self::PartitionTable(Arc::new(value))
}
}

impl From<Logs> for MetadataContainer {
fn from(value: Logs) -> Self {
Self::Logs(Arc::new(value))
}
}

impl From<NodesConfiguration> for MetadataContainer {
fn from(value: NodesConfiguration) -> Self {
Self::NodesConfiguration(Arc::new(value))
}
}

impl From<Schema> for MetadataContainer {
fn from(value: Schema) -> Self {
Self::Schema(Arc::new(value))
}
}
