From c39ca0728feb6bcc4a5bd3d4b71ce451ab31df4d Mon Sep 17 00:00:00 2001 From: Ahmed Farghal Date: Wed, 8 Jan 2025 12:30:07 +0000 Subject: [PATCH] [Mechanical] BifrostAdmin doesn't need the extra level of indirection --- crates/bifrost/src/bifrost.rs | 2 +- crates/bifrost/src/bifrost_admin.rs | 56 +++++++++++------------------ 2 files changed, 22 insertions(+), 36 deletions(-) diff --git a/crates/bifrost/src/bifrost.rs b/crates/bifrost/src/bifrost.rs index b0da0b240..f144e0faa 100644 --- a/crates/bifrost/src/bifrost.rs +++ b/crates/bifrost/src/bifrost.rs @@ -121,7 +121,7 @@ impl Bifrost { /// Admin operations of bifrost pub fn admin(&self) -> BifrostAdmin<'_> { - BifrostAdmin::new(self) + BifrostAdmin::new(&self.inner) } /// Appends a single record to a log. The log id must exist, otherwise the diff --git a/crates/bifrost/src/bifrost_admin.rs b/crates/bifrost/src/bifrost_admin.rs index 9a197969b..3013e7a2d 100644 --- a/crates/bifrost/src/bifrost_admin.rs +++ b/crates/bifrost/src/bifrost_admin.rs @@ -20,14 +20,15 @@ use restate_types::logs::{LogId, Lsn, TailState}; use restate_types::metadata_store::keys::BIFROST_CONFIG_KEY; use restate_types::Version; +use crate::bifrost::BifrostInner; use crate::error::AdminError; use crate::loglet_wrapper::LogletWrapper; -use crate::{Bifrost, Error, Result}; +use crate::{Error, Result}; /// Bifrost's Admin API #[derive(Clone, Copy)] pub struct BifrostAdmin<'a> { - bifrost: &'a Bifrost, + inner: &'a Arc, } #[derive(Debug)] @@ -39,16 +40,16 @@ pub struct SealedSegment { } impl<'a> BifrostAdmin<'a> { - pub fn new(bifrost: &'a Bifrost) -> Self { - Self { bifrost } + pub(crate) fn new(inner: &'a Arc) -> Self { + Self { inner } } /// Trim the log prefix up to and including the `trim_point`. /// Set `trim_point` to the value returned from `find_tail()` or `Lsn::MAX` to /// trim all records of the log. #[instrument(level = "debug", skip(self), err)] pub async fn trim(&self, log_id: LogId, trim_point: Lsn) -> Result<()> { - self.bifrost.inner.fail_if_shutting_down()?; - self.bifrost.inner.trim(log_id, trim_point).await + self.inner.fail_if_shutting_down()?; + self.inner.trim(log_id, trim_point).await } /// Seals a loglet under a set of conditions. @@ -64,10 +65,10 @@ impl<'a> BifrostAdmin<'a> { log_id: LogId, segment_index: Option, ) -> Result<()> { - self.bifrost.inner.fail_if_shutting_down()?; + self.inner.fail_if_shutting_down()?; let logs = Metadata::with_current(|m| m.logs_snapshot()); let provider_config = &logs.configuration().default_provider; - let provider = self.bifrost.inner.provider_for(provider_config.kind())?; + let provider = self.inner.provider_for(provider_config.kind())?; // if this is a new log, we don't need to seal and we can immediately write to metadata // store, otherwise, we need to seal first. if logs.chain(&log_id).is_none() && segment_index.is_none() { @@ -123,7 +124,7 @@ impl<'a> BifrostAdmin<'a> { provider: ProviderKind, params: LogletParams, ) -> Result { - self.bifrost.inner.fail_if_shutting_down()?; + self.inner.fail_if_shutting_down()?; let metadata = Metadata::current(); let _ = metadata .wait_for_version(MetadataKind::Logs, min_version) @@ -154,14 +155,14 @@ impl<'a> BifrostAdmin<'a> { } pub async fn writeable_loglet(&self, log_id: LogId) -> Result { - self.bifrost.inner.writeable_loglet(log_id).await + self.inner.writeable_loglet(log_id).await } #[instrument(level = "debug", skip(self), err)] pub async fn seal(&self, log_id: LogId, segment_index: SegmentIndex) -> Result { - self.bifrost.inner.fail_if_shutting_down()?; + self.inner.fail_if_shutting_down()?; // first find the tail segment for this log. - let loglet = self.bifrost.inner.writeable_loglet(log_id).await?; + let loglet = self.inner.writeable_loglet(log_id).await?; if segment_index != loglet.segment_index() { // Not the same segment. Bail! @@ -216,14 +217,13 @@ impl<'a> BifrostAdmin<'a> { provider: ProviderKind, params: LogletParams, ) -> Result<()> { - self.bifrost.inner.fail_if_shutting_down()?; + self.inner.fail_if_shutting_down()?; let retry_policy = Configuration::pinned() .common .network_error_retry_policy .clone(); let logs = retry_on_network_error(retry_policy, || { - self.bifrost - .inner + self.inner .metadata_writer .metadata_store_client() .read_modify_write(BIFROST_CONFIG_KEY.clone(), |logs: Option| { @@ -250,11 +250,7 @@ impl<'a> BifrostAdmin<'a> { .await .map_err(|e| e.transpose())?; - self.bifrost - .inner - .metadata_writer - .update(Arc::new(logs)) - .await?; + self.inner.metadata_writer.update(Arc::new(logs)).await?; Ok(()) } @@ -266,14 +262,13 @@ impl<'a> BifrostAdmin<'a> { provider: ProviderKind, params: LogletParams, ) -> Result<()> { - self.bifrost.inner.fail_if_shutting_down()?; + self.inner.fail_if_shutting_down()?; let retry_policy = Configuration::pinned() .common .network_error_retry_policy .clone(); let logs = retry_on_network_error(retry_policy, || { - self.bifrost - .inner + self.inner .metadata_writer .metadata_store_client() .read_modify_write::<_, _, Error>( @@ -294,11 +289,7 @@ impl<'a> BifrostAdmin<'a> { .await .map_err(|e| e.transpose())?; - self.bifrost - .inner - .metadata_writer - .update(Arc::new(logs)) - .await?; + self.inner.metadata_writer.update(Arc::new(logs)).await?; Ok(()) } @@ -311,8 +302,7 @@ impl<'a> BifrostAdmin<'a> { .clone(); let logs = retry_on_network_error(retry_policy, || { - self.bifrost - .inner + self.inner .metadata_writer .metadata_store_client() .get_or_insert(BIFROST_CONFIG_KEY.clone(), || { @@ -322,11 +312,7 @@ impl<'a> BifrostAdmin<'a> { }) .await?; - self.bifrost - .inner - .metadata_writer - .update(Arc::new(logs)) - .await?; + self.inner.metadata_writer.update(Arc::new(logs)).await?; Ok(()) } }