Skip to content

Commit

Permalink
Feature: Add Raft::external_state_machine_request() to run a functi…
Browse files Browse the repository at this point in the history
…on inside state machine

Add `Raft::external_state_machine_request()` in a fire-and-forget manner,
and `Raft::with_state_machine()` blocks waiting on the response.

If the input StateMachine is a different type from the one in
`RaftCore`, `with_state_machine()` an error, while
`external_state_machine_request()` silently ignores it.

Other changes: move OptionalSerde, OptionalSync, OptionalSync from
`openraft::` to `openraft::base::`
  • Loading branch information
drmingdrmer committed Jul 29, 2024
1 parent 590d943 commit 6c7527f
Show file tree
Hide file tree
Showing 16 changed files with 391 additions and 63 deletions.
59 changes: 59 additions & 0 deletions openraft/src/base/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
//! Basic types used in the Raft implementation.
pub use serde_able::OptionalSerde;
pub use threaded::BoxAny;
pub use threaded::BoxAsyncOnceMut;
pub use threaded::BoxFuture;
pub use threaded::BoxOnce;
pub use threaded::OptionalSend;
pub use threaded::OptionalSync;

#[cfg(not(feature = "singlethreaded"))]
mod threaded {
use std::any::Any;
use std::future::Future;
use std::pin::Pin;

pub trait OptionalSend: Send {}
impl<T: Send + ?Sized> OptionalSend for T {}

pub trait OptionalSync: Sync {}
impl<T: Sync + ?Sized> OptionalSync for T {}

pub type BoxFuture<'a, T = ()> = Pin<Box<dyn Future<Output = T> + Send + 'a>>;
pub type BoxAsyncOnceMut<'a, A, T = ()> = Box<dyn FnOnce(&mut A) -> BoxFuture<T> + Send + 'a>;
pub type BoxOnce<'a, A, T = ()> = Box<dyn FnOnce(&A) -> T + Send + 'a>;
pub type BoxAny = Box<dyn Any + Send>;
}

#[cfg(feature = "singlethreaded")]
mod threaded {
use std::any::Any;
use std::future::Future;
use std::pin::Pin;

pub trait OptionalSend {}
impl<T: ?Sized> OptionalSend for T {}

pub trait OptionalSync {}
impl<T: ?Sized> OptionalSync for T {}

pub type BoxFuture<'a, T = ()> = Pin<Box<dyn Future<Output = T> + 'a>>;
pub type BoxAsyncOnceMut<'a, A, T = ()> = Box<dyn FnOnce(&mut A) -> BoxFuture<T> + 'a>;
pub type BoxOnce<'a, A, T = ()> = Box<dyn FnOnce(&A) -> T + 'a>;
pub type BoxAny = Box<dyn Any>;
}

#[cfg(not(feature = "serde"))]
mod serde_able {
#[doc(hidden)]
pub trait OptionalSerde {}
impl<T> OptionalSerde for T {}
}

#[cfg(feature = "serde")]
mod serde_able {
#[doc(hidden)]
pub trait OptionalSerde: serde::Serialize + for<'a> serde::Deserialize<'a> {}
impl<T> OptionalSerde for T where T: serde::Serialize + for<'a> serde::Deserialize<'a> {}
}
6 changes: 6 additions & 0 deletions openraft/src/core/raft_core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1214,6 +1214,12 @@ where
ExternalCommand::PurgeLog { upto } => {
self.engine.trigger_purge_log(upto);
}
ExternalCommand::StateMachineCommand { sm_cmd } => {
let res = self.sm_handle.send(sm_cmd);
if let Err(e) = res {
tracing::error!(error = display(e), "error sending sm::Command to sm::Worker");
}
}
}
}
};
Expand Down
8 changes: 8 additions & 0 deletions openraft/src/core/raft_msg/external_command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
use std::fmt;

use crate::core::raft_msg::ResultSender;
use crate::core::sm;
use crate::RaftTypeConfig;
use crate::Snapshot;

Expand Down Expand Up @@ -31,6 +32,10 @@ pub(crate) enum ExternalCommand<C: RaftTypeConfig> {
///
/// [`max_in_snapshot_log_to_keep`]: `crate::Config::max_in_snapshot_log_to_keep`
PurgeLog { upto: u64 },

/// Send a [`sm::Command`] to [`sm::worker::Worker`].
/// This command is run in the sm task.
StateMachineCommand { sm_cmd: sm::Command<C> },
}

impl<C> fmt::Debug for ExternalCommand<C>
Expand Down Expand Up @@ -61,6 +66,9 @@ where C: RaftTypeConfig
ExternalCommand::PurgeLog { upto } => {
write!(f, "PurgeLog[..={}]", upto)
}
ExternalCommand::StateMachineCommand { sm_cmd } => {
write!(f, "StateMachineCommand: {}", sm_cmd)
}
}
}
}
5 changes: 3 additions & 2 deletions openraft/src/core/raft_msg/mod.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
use std::collections::BTreeMap;
use std::fmt;

use crate::base::BoxOnce;
use crate::core::raft_msg::external_command::ExternalCommand;
use crate::error::CheckIsLeaderError;
use crate::error::Infallible;
use crate::error::InitializeError;
use crate::raft::AppendEntriesRequest;
use crate::raft::AppendEntriesResponse;
use crate::raft::BoxCoreFn;
use crate::raft::SnapshotResponse;
use crate::raft::VoteRequest;
use crate::raft::VoteResponse;
Expand All @@ -16,6 +16,7 @@ use crate::type_config::alias::OneshotSenderOf;
use crate::type_config::alias::ResponderOf;
use crate::type_config::alias::SnapshotDataOf;
use crate::ChangeMembers;
use crate::RaftState;
use crate::RaftTypeConfig;
use crate::Snapshot;
use crate::Vote;
Expand Down Expand Up @@ -91,7 +92,7 @@ where C: RaftTypeConfig
},

ExternalCoreRequest {
req: BoxCoreFn<C>,
req: BoxOnce<'static, RaftState<C>>,
},

ExternalCommand {
Expand Down
16 changes: 16 additions & 0 deletions openraft/src/core/sm/command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use std::fmt;
use std::fmt::Debug;
use std::fmt::Formatter;

use crate::base::BoxAny;
use crate::core::raft_msg::ResultSender;
use crate::error::Infallible;
use crate::raft_state::IOId;
Expand Down Expand Up @@ -41,6 +42,17 @@ where C: RaftTypeConfig
/// The last log id to apply, inclusive.
last: LogIdOf<C>,
},

/// Apply a custom function to the state machine.
///
/// To erase the type parameter `SM`, it is a
/// `Box<dyn FnOnce(&mut SM) -> Box<dyn Future<Output = ()>> + Send + 'static>`
/// wrapped in a `Box<dyn Any>`
Func {
func: BoxAny,
/// The SM type user specified, for debug purpose.
input_sm_type: &'static str,
},
}

impl<C> Command<C>
Expand Down Expand Up @@ -75,6 +87,7 @@ where C: RaftTypeConfig
Command::BeginReceivingSnapshot { .. } => None,
Command::InstallFullSnapshot { io_id, .. } => Some(*io_id),
Command::Apply { .. } => None,
Command::Func { .. } => None,
}
}
}
Expand All @@ -93,6 +106,7 @@ where C: RaftTypeConfig
write!(f, "BeginReceivingSnapshot")
}
Command::Apply { first, last } => write!(f, "Apply: [{},{}]", first, last),
Command::Func { .. } => write!(f, "Func"),
}
}
}
Expand All @@ -111,6 +125,7 @@ where C: RaftTypeConfig
write!(f, "BeginReceivingSnapshot")
}
Command::Apply { first, last } => write!(f, "Apply: [{},{}]", first, last),
Command::Func { .. } => write!(f, "Func"),
}
}
}
Expand Down Expand Up @@ -141,6 +156,7 @@ where C: RaftTypeConfig
last: last2,
},
) => first == first2 && last == last2,
(Command::Func { .. }, Command::Func { .. }) => false,
_ => false,
}
}
Expand Down
15 changes: 15 additions & 0 deletions openraft/src/core/sm/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use tracing_futures::Instrument;
use crate::async_runtime::MpscUnboundedReceiver;
use crate::async_runtime::MpscUnboundedSender;
use crate::async_runtime::OneshotSender;
use crate::base::BoxAsyncOnceMut;
use crate::core::notification::Notification;
use crate::core::raft_msg::ResultSender;
use crate::core::sm::handle::Handle;
Expand Down Expand Up @@ -142,6 +143,20 @@ where
let res = CommandResult::new(Ok(Response::Apply(resp)));
let _ = self.resp_tx.send(Notification::sm(res));
}
Command::Func { func, input_sm_type } => {
tracing::debug!("{}: run user defined Func", func_name!());

let res: Result<Box<BoxAsyncOnceMut<'static, SM>>, _> = func.downcast();
if let Ok(f) = res {
f(&mut self.state_machine).await;
} else {
tracing::warn!(
"User-defined SM function uses incorrect state machine type, expected: {}, got: {}",
std::any::type_name::<SM>(),
input_sm_type
);
};
}
};
}
}
Expand Down
2 changes: 2 additions & 0 deletions openraft/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
pub mod decompose;
pub mod into_ok;
mod invalid_sm;
mod replication_closed;
mod streaming_error;

Expand All @@ -13,6 +14,7 @@ use std::time::Duration;

use anyerror::AnyError;

pub use self::invalid_sm::InvalidStateMachineType;
pub use self::replication_closed::ReplicationClosed;
pub use self::streaming_error::StreamingError;
use crate::network::RPCTypes;
Expand Down
30 changes: 30 additions & 0 deletions openraft/src/error/invalid_sm.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
#[derive(Debug, Clone, PartialEq, Eq, thiserror::Error)]
#[error(
"User-defined function on the state machine failed to run; \
It may have used a different type \
of state machine from the one in RaftCore (`{actual_type}`)"
)]

pub struct InvalidStateMachineType {
pub actual_type: &'static str,
}

impl InvalidStateMachineType {
pub(crate) fn new<SM>() -> Self {
Self {
actual_type: std::any::type_name::<SM>(),
}
}
}

#[cfg(test)]
mod tests {
#[test]
fn test_invalid_state_machine_type_to_string() {
let err = super::InvalidStateMachineType::new::<u32>();
assert_eq!(
err.to_string(),
"User-defined function on the state machine failed to run; It may have used a different type of state machine from the one in RaftCore (`u32`)"
);
}
}
42 changes: 4 additions & 38 deletions openraft/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ pub(crate) mod proposer;
pub(crate) mod raft_state;
pub(crate) mod utime;

pub mod base;
#[cfg(feature = "compat")]
pub mod compat;
pub mod docs;
Expand All @@ -74,6 +75,9 @@ pub use type_config::async_runtime;
pub use type_config::async_runtime::impls::TokioRuntime;
pub use type_config::AsyncRuntime;

pub use crate::base::OptionalSend;
pub use crate::base::OptionalSerde;
pub use crate::base::OptionalSync;
pub use crate::change_members::ChangeMembers;
pub use crate::config::Config;
pub use crate::config::ConfigError;
Expand Down Expand Up @@ -124,44 +128,6 @@ pub use crate::vote::CommittedLeaderId;
pub use crate::vote::LeaderId;
pub use crate::vote::Vote;

#[cfg(feature = "serde")]
#[doc(hidden)]
pub trait OptionalSerde: serde::Serialize + for<'a> serde::Deserialize<'a> {}

#[cfg(feature = "serde")]
impl<T> OptionalSerde for T where T: serde::Serialize + for<'a> serde::Deserialize<'a> {}

#[cfg(not(feature = "serde"))]
#[doc(hidden)]
pub trait OptionalSerde {}

#[cfg(not(feature = "serde"))]
impl<T> OptionalSerde for T {}

#[cfg(feature = "singlethreaded")]
pub trait OptionalSend {}

#[cfg(feature = "singlethreaded")]
pub trait OptionalSync {}

#[cfg(feature = "singlethreaded")]
impl<T: ?Sized> OptionalSend for T {}

#[cfg(feature = "singlethreaded")]
impl<T: ?Sized> OptionalSync for T {}

#[cfg(not(feature = "singlethreaded"))]
pub trait OptionalSend: Send {}

#[cfg(not(feature = "singlethreaded"))]
pub trait OptionalSync: Sync {}

#[cfg(not(feature = "singlethreaded"))]
impl<T: Send + ?Sized> OptionalSend for T {}

#[cfg(not(feature = "singlethreaded"))]
impl<T: Sync + ?Sized> OptionalSync for T {}

/// A trait defining application specific data.
///
/// The intention of this trait is that applications which are using this crate will be able to
Expand Down
9 changes: 9 additions & 0 deletions openraft/src/raft/core_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,3 +13,12 @@ where C: RaftTypeConfig
/// The RaftCore task has finished. The return value of the task is stored.
Done(Result<Infallible, Fatal<C>>),
}

impl<C> CoreState<C>
where C: RaftTypeConfig
{
/// Returns `true` if the RaftCore task is still running.
pub(in crate::raft) fn is_running(&self) -> bool {
matches!(self, CoreState::Running(_))
}
}
15 changes: 0 additions & 15 deletions openraft/src/raft/external_request.rs

This file was deleted.

Loading

0 comments on commit 6c7527f

Please sign in to comment.