Skip to content

Commit

Permalink
Change: refine the RaftEntry trait
Browse files Browse the repository at this point in the history
- The `RaftEntry` trait now requires `AsRef<LogIdOf<C>>` and
  `AsMut<LogIdOf<C>>`, providing a more standard API for accessing the
  log ID of a log entry. As a result, the `RaftEntry: RaftLogId`
  requirement is no longer needed.

- A new method, `new_normal()`, has been added to the `RaftEntry` trait
  to replace the `FromAppData` trait.

- Additional utility methods for working with entries are now provided
  in the `RaftEntryExt` trait.

- Part of #1278

Upgrade tips:

1. **For applications using a custom `RaftEntry` implementation** (e.g.,
   declared with `declare_raft_types!(MyTypes: Entry = MyEntry)`):
   - Update the `RaftEntry` implementation for your custom entry type
     (`MyEntry`) by adding the `new_normal()` method.
   - Implement `AsRef<LogId>` and `AsMut<LogId>` for your custom entry
     type.

2. **For applications using the default `Entry` provided by OpenRaft**:
   - No changes are required.
  • Loading branch information
drmingdrmer committed Jan 8, 2025
1 parent d4b4b02 commit 2abaffb
Show file tree
Hide file tree
Showing 22 changed files with 152 additions and 107 deletions.
6 changes: 3 additions & 3 deletions examples/memstore/src/log_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,10 @@ use std::ops::RangeBounds;
use std::sync::Arc;

use openraft::alias::VoteOf;
use openraft::entry::RaftEntryExt;
use openraft::storage::IOFlushed;
use openraft::LogId;
use openraft::LogState;
use openraft::RaftLogId;
use openraft::RaftTypeConfig;
use openraft::StorageError;
use tokio::sync::Mutex;
Expand Down Expand Up @@ -60,7 +60,7 @@ impl<C: RaftTypeConfig> LogStoreInner<C> {
}

async fn get_log_state(&mut self) -> Result<LogState<C>, StorageError<C>> {
let last = self.log.iter().next_back().map(|(_, ent)| ent.get_log_id().clone());
let last = self.log.iter().next_back().map(|(_, ent)| ent.to_log_id());

let last_purged = self.last_purged_log_id.clone();

Expand Down Expand Up @@ -97,7 +97,7 @@ impl<C: RaftTypeConfig> LogStoreInner<C> {
where I: IntoIterator<Item = C::Entry> {
// Simple implementation that calls the flush-before-return `append_to_log`.
for entry in entries {
self.log.insert(entry.get_log_id().index, entry);
self.log.insert(entry.to_log_id().index, entry);
}
callback.io_completed(Ok(()));

Expand Down
4 changes: 2 additions & 2 deletions examples/rocksstore/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,13 @@ use std::sync::Arc;

use log_store::RocksLogStore;
use openraft::alias::SnapshotDataOf;
use openraft::entry::RaftEntryExt;
use openraft::storage::RaftStateMachine;
use openraft::storage::Snapshot;
use openraft::AnyError;
use openraft::Entry;
use openraft::EntryPayload;
use openraft::LogId;
use openraft::RaftLogId;
use openraft::RaftSnapshotBuilder;
use openraft::RaftTypeConfig;
use openraft::SnapshotMeta;
Expand Down Expand Up @@ -179,7 +179,7 @@ impl RaftStateMachine<TypeConfig> for RocksStateMachine {
for entry in entries_iter {
tracing::debug!(%entry.log_id, "replicate to sm");

sm.last_applied_log = Some(*entry.get_log_id());
sm.last_applied_log = Some(entry.to_log_id());

match entry.payload {
EntryPayload::Blank => res.push(RocksResponse { value: None }),
Expand Down
10 changes: 5 additions & 5 deletions examples/rocksstore/src/log_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,11 @@ use meta::StoreMeta;
use openraft::alias::EntryOf;
use openraft::alias::LogIdOf;
use openraft::alias::VoteOf;
use openraft::entry::RaftEntryExt;
use openraft::storage::IOFlushed;
use openraft::storage::RaftLogStorage;
use openraft::LogState;
use openraft::OptionalSend;
use openraft::RaftLogId;
use openraft::RaftLogReader;
use openraft::RaftTypeConfig;
use openraft::StorageError;
Expand Down Expand Up @@ -103,7 +103,7 @@ where C: RaftTypeConfig

let entry: EntryOf<C> = serde_json::from_slice(&val).map_err(read_logs_err)?;

assert_eq!(id, entry.get_log_id().index);
assert_eq!(id, entry.index());

res.push(entry);
}
Expand All @@ -128,7 +128,7 @@ where C: RaftTypeConfig
Some(res) => {
let (_log_index, entry_bytes) = res.map_err(read_logs_err)?;
let ent = serde_json::from_slice::<EntryOf<C>>(&entry_bytes).map_err(read_logs_err)?;
Some(ent.get_log_id().clone())
Some(ent.to_log_id())
}
};

Expand Down Expand Up @@ -158,8 +158,8 @@ where C: RaftTypeConfig
async fn append<I>(&mut self, entries: I, callback: IOFlushed<C>) -> Result<(), StorageError<C>>
where I: IntoIterator<Item = EntryOf<C>> + Send {
for entry in entries {
let id = id_to_bin(entry.get_log_id().index);
assert_eq!(bin_to_id(&id), entry.get_log_id().index);
let id = id_to_bin(entry.index());
assert_eq!(bin_to_id(&id), entry.index());
self.db
.put_cf(
self.cf_logs(),
Expand Down
7 changes: 3 additions & 4 deletions openraft/src/core/raft_core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,8 @@ use crate::engine::Condition;
use crate::engine::Engine;
use crate::engine::ReplicationProgress;
use crate::engine::Respond;
use crate::entry::FromAppData;
use crate::entry::RaftEntry;
use crate::entry::RaftEntryExt;
use crate::error::AllowNextRevertError;
use crate::error::ClientWriteError;
use crate::error::Fatal;
Expand All @@ -55,7 +55,6 @@ use crate::error::QuorumNotEnough;
use crate::error::RPCError;
use crate::error::Timeout;
use crate::log_id::LogIdOptionExt;
use crate::log_id::RaftLogId;
use crate::metrics::HeartbeatMetrics;
use crate::metrics::RaftDataMetrics;
use crate::metrics::RaftMetrics;
Expand Down Expand Up @@ -1246,7 +1245,7 @@ where
self.handle_check_is_leader_request(tx).await;
}
RaftMsg::ClientWriteRequest { app_data, tx } => {
self.write_entry(C::Entry::from_app_data(app_data), Some(tx));
self.write_entry(C::Entry::new_normal(Default::default(), app_data), Some(tx));
}
RaftMsg::Initialize { members, tx } => {
tracing::info!(
Expand Down Expand Up @@ -1746,7 +1745,7 @@ where
committed_vote: vote,
entries,
} => {
let last_log_id = entries.last().unwrap().get_log_id();
let last_log_id = entries.last().unwrap().log_id();
tracing::debug!("AppendInputEntries: {}", DisplaySlice::<_>(&entries),);

let io_id = IOId::new_log_io(vote, Some(last_log_id.clone()));
Expand Down
4 changes: 2 additions & 2 deletions openraft/src/core/sm/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ use crate::core::ApplyResult;
use crate::core::ApplyingEntry;
use crate::display_ext::DisplayOptionExt;
use crate::display_ext::DisplaySliceExt;
use crate::entry::RaftEntryExt;
use crate::entry::RaftPayload;
use crate::storage::RaftStateMachine;
use crate::storage::Snapshot;
Expand All @@ -23,7 +24,6 @@ use crate::type_config::alias::LogIdOf;
use crate::type_config::alias::MpscUnboundedReceiverOf;
use crate::type_config::alias::MpscUnboundedSenderOf;
use crate::type_config::TypeConfigExt;
use crate::RaftLogId;
use crate::RaftLogReader;
use crate::RaftSnapshotBuilder;
use crate::RaftTypeConfig;
Expand Down Expand Up @@ -186,7 +186,7 @@ where
#[allow(clippy::needless_collect)]
let applying_entries = entries
.iter()
.map(|e| ApplyingEntry::new(e.get_log_id().clone(), e.get_membership().cloned()))
.map(|e| ApplyingEntry::new(e.to_log_id(), e.get_membership().cloned()))
.collect::<Vec<_>>();

let n_entries = end - since;
Expand Down
2 changes: 1 addition & 1 deletion openraft/src/engine/engine_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ use crate::engine::Condition;
use crate::engine::EngineOutput;
use crate::engine::Respond;
use crate::entry::RaftEntry;
use crate::entry::RaftEntryExt;
use crate::entry::RaftPayload;
use crate::error::ForwardToLeader;
use crate::error::Infallible;
Expand Down Expand Up @@ -57,7 +58,6 @@ use crate::vote::RaftVote;
use crate::LogId;
use crate::LogIdOptionExt;
use crate::Membership;
use crate::RaftLogId;
use crate::RaftTypeConfig;

/// Raft protocol algorithm.
Expand Down
17 changes: 7 additions & 10 deletions openraft/src/engine/handler/following_handler/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use crate::engine::Command;
use crate::engine::Condition;
use crate::engine::EngineConfig;
use crate::engine::EngineOutput;
use crate::entry::RaftEntryExt;
use crate::entry::RaftPayload;
use crate::error::RejectAppendEntries;
use crate::raft_state::IOId;
Expand All @@ -20,7 +21,6 @@ use crate::vote::committed::CommittedVote;
use crate::EffectiveMembership;
use crate::LogId;
use crate::LogIdOptionExt;
use crate::RaftLogId;
use crate::RaftState;
use crate::RaftTypeConfig;
use crate::StoredMembership;
Expand Down Expand Up @@ -69,10 +69,10 @@ where C: RaftTypeConfig
);

if let Some(x) = entries.first() {
debug_assert!(x.get_log_id().index == prev_log_id.next_index());
debug_assert!(x.index() == prev_log_id.next_index());
}

let last_log_id = entries.last().map(|x| x.get_log_id().clone());
let last_log_id = entries.last().map(|x| x.to_log_id());
let last_log_id = std::cmp::max(prev_log_id, last_log_id);

let prev_accepted = self.state.accept_io(IOId::new_log_io(self.leader_vote.clone(), last_log_id.clone()));
Expand All @@ -84,7 +84,7 @@ where C: RaftTypeConfig
// the entries after it has to be deleted first.
// Raft requires log ids are in total order by (term,index).
// Otherwise the log id with max index makes committed entry invisible in election.
self.truncate_logs(entries[since].get_log_id().index);
self.truncate_logs(entries[since].index());

let entries = entries.split_off(since);
self.do_append_entries(entries);
Expand Down Expand Up @@ -143,11 +143,8 @@ where C: RaftTypeConfig
#[tracing::instrument(level = "debug", skip(self, entries))]
pub(crate) fn do_append_entries(&mut self, entries: Vec<C::Entry>) {
debug_assert!(!entries.is_empty());
debug_assert_eq!(
entries[0].get_log_id().index,
self.state.log_ids.last().cloned().next_index(),
);
debug_assert!(Some(entries[0].get_log_id()) > self.state.log_ids.last());
debug_assert_eq!(entries[0].index(), self.state.log_ids.last().cloned().next_index(),);
debug_assert!(Some(entries[0].log_id()) > self.state.log_ids.last());

self.state.extend_log_ids(&entries);
self.append_membership(entries.iter());
Expand Down Expand Up @@ -343,7 +340,7 @@ where C: RaftTypeConfig
// Find the last 2 membership config entries: the committed and the effective.
for ent in entries.rev() {
if let Some(m) = ent.get_membership() {
memberships.insert(0, StoredMembership::new(Some(ent.get_log_id().clone()), m.clone()));
memberships.insert(0, StoredMembership::new(Some(ent.to_log_id()), m.clone()));
if memberships.len() == 2 {
break;
}
Expand Down
4 changes: 2 additions & 2 deletions openraft/src/engine/handler/leader_handler/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use crate::engine::handler::replication_handler::ReplicationHandler;
use crate::engine::Command;
use crate::engine::EngineConfig;
use crate::engine::EngineOutput;
use crate::entry::RaftEntryExt;
use crate::entry::RaftPayload;
use crate::proposer::Leader;
use crate::proposer::LeaderQuorumSet;
Expand All @@ -10,7 +11,6 @@ use crate::raft_state::IOId;
use crate::raft_state::LogStateReader;
use crate::replication::ReplicationSessionId;
use crate::type_config::alias::LogIdOf;
use crate::RaftLogId;
use crate::RaftState;
use crate::RaftTypeConfig;

Expand Down Expand Up @@ -67,7 +67,7 @@ where C: RaftTypeConfig
membership_entry.is_none(),
"only one membership entry is allowed in a batch"
);
membership_entry = Some((entry.get_log_id().clone(), m.clone()));
membership_entry = Some((entry.to_log_id(), m.clone()));
}
}

Expand Down
12 changes: 6 additions & 6 deletions openraft/src/engine/log_id_list.rs
Original file line number Diff line number Diff line change
Expand Up @@ -131,13 +131,13 @@ where C: RaftTypeConfig
/// Extends a list of `log_id` that are proposed by a same leader.
///
/// The log ids in the input has to be continuous.
pub(crate) fn extend_from_same_leader<'a, LID: RaftLogId<C> + 'a>(&mut self, new_ids: &[LID]) {
pub(crate) fn extend_from_same_leader<'a, LID: AsRef<LogIdOf<C>> + 'a>(&mut self, new_ids: &[LID]) {
if let Some(first) = new_ids.first() {
let first_id = first.get_log_id();
let first_id = first.as_ref();
self.append(first_id.clone());

if let Some(last) = new_ids.last() {
let last_id = last.get_log_id();
let last_id = last.as_ref();
assert_eq!(last_id.leader_id, first_id.leader_id);

if last_id != first_id {
Expand All @@ -150,11 +150,11 @@ where C: RaftTypeConfig
/// Extends a list of `log_id`.
// leader_id: Copy is feature gated
#[allow(clippy::clone_on_copy)]
pub(crate) fn extend<'a, LID: RaftLogId<C> + 'a>(&mut self, new_ids: &[LID]) {
pub(crate) fn extend<'a, LID: AsRef<LogIdOf<C>> + 'a>(&mut self, new_ids: &[LID]) {
let mut prev = self.last().map(|x| x.leader_id.clone());

for x in new_ids.iter() {
let log_id = x.get_log_id();
let log_id = x.as_ref();

if prev.as_ref() != Some(&log_id.leader_id) {
self.append(log_id.clone());
Expand All @@ -164,7 +164,7 @@ where C: RaftTypeConfig
}

if let Some(last) = new_ids.last() {
let log_id = last.get_log_id();
let log_id = last.as_ref();

if self.last() != Some(log_id) {
self.append(log_id.clone());
Expand Down
38 changes: 20 additions & 18 deletions openraft/src/entry/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
use std::fmt;
use std::fmt::Debug;

use crate::log_id::RaftLogId;
use crate::LogId;
use crate::Membership;
use crate::RaftTypeConfig;
Expand All @@ -12,10 +11,13 @@ pub mod payload;
mod traits;

pub use payload::EntryPayload;
pub use traits::FromAppData;
pub use traits::RaftEntry;
pub use traits::RaftEntryExt;
pub use traits::RaftPayload;

use crate::type_config::alias::AppDataOf;
use crate::type_config::alias::LogIdOf;

/// A Raft log entry.
#[cfg_attr(feature = "serde", derive(serde::Deserialize, serde::Serialize), serde(bound = ""))]
pub struct Entry<C>
Expand Down Expand Up @@ -97,43 +99,43 @@ where C: RaftTypeConfig
}
}

impl<C> RaftLogId<C> for Entry<C>
impl<C> AsRef<LogIdOf<C>> for Entry<C>
where C: RaftTypeConfig
{
fn get_log_id(&self) -> &LogId<C> {
fn as_ref(&self) -> &LogIdOf<C> {
&self.log_id
}
}

fn set_log_id(&mut self, log_id: &LogId<C>) {
self.log_id = log_id.clone();
impl<C> AsMut<LogIdOf<C>> for Entry<C>
where C: RaftTypeConfig
{
fn as_mut(&mut self) -> &mut LogIdOf<C> {
&mut self.log_id
}
}

impl<C> RaftEntry<C> for Entry<C>
where C: RaftTypeConfig
{
fn new_blank(log_id: LogId<C>) -> Self {
fn new_blank(log_id: LogIdOf<C>) -> Self {
Self {
log_id,
payload: EntryPayload::Blank,
}
}

fn new_membership(log_id: LogId<C>, m: Membership<C>) -> Self {
Self {
fn new_normal(log_id: LogIdOf<C>, data: AppDataOf<C>) -> Self {
Entry {
log_id,
payload: EntryPayload::Membership(m),
payload: EntryPayload::Normal(data),
}
}
}

impl<C> FromAppData<C::D> for Entry<C>
where C: RaftTypeConfig
{
fn from_app_data(d: C::D) -> Self {
Entry {
log_id: LogId::default(),
payload: EntryPayload::Normal(d),
fn new_membership(log_id: LogIdOf<C>, m: Membership<C>) -> Self {
Self {
log_id,
payload: EntryPayload::Membership(m),
}
}
}
Loading

0 comments on commit 2abaffb

Please sign in to comment.