diff --git a/cluster_benchmark/tests/benchmark/store.rs b/cluster_benchmark/tests/benchmark/store.rs index a9e949c37..c61a4ede6 100644 --- a/cluster_benchmark/tests/benchmark/store.rs +++ b/cluster_benchmark/tests/benchmark/store.rs @@ -10,6 +10,7 @@ use std::sync::Arc; use openraft::alias::LogIdOf; use openraft::alias::SnapshotDataOf; +use openraft::entry::RaftEntryExt; use openraft::storage::IOFlushed; use openraft::storage::LogState; use openraft::storage::RaftLogReader; @@ -20,7 +21,6 @@ use openraft::storage::Snapshot; use openraft::Entry; use openraft::EntryPayload; use openraft::OptionalSend; -use openraft::RaftLogId; use openraft::SnapshotMeta; use openraft::StorageError; use openraft::StoredMembership; @@ -188,7 +188,7 @@ impl RaftLogStorage for Arc { let last = match last_serialized { None => None, - Some(ent) => Some(*ent.get_log_id()), + Some(ent) => Some(ent.to_log_id()), }; let last_purged = self.last_purged_log_id.read().await.clone(); @@ -237,7 +237,7 @@ impl RaftLogStorage for Arc { where I: IntoIterator> + Send { { let mut log = self.log.write().await; - log.extend(entries.into_iter().map(|entry| (entry.get_log_id().index(), entry))); + log.extend(entries.into_iter().map(|entry| (entry.index(), entry))); } callback.io_completed(Ok(())); Ok(()) diff --git a/examples/memstore/src/log_store.rs b/examples/memstore/src/log_store.rs index 05e548ec8..470a25528 100644 --- a/examples/memstore/src/log_store.rs +++ b/examples/memstore/src/log_store.rs @@ -8,9 +8,9 @@ use std::sync::Arc; use openraft::alias::LogIdOf; use openraft::alias::VoteOf; +use openraft::entry::RaftEntryExt; use openraft::storage::IOFlushed; use openraft::LogState; -use openraft::RaftLogId; use openraft::RaftTypeConfig; use openraft::StorageError; use tokio::sync::Mutex; @@ -60,7 +60,7 @@ impl LogStoreInner { } async fn get_log_state(&mut self) -> Result, StorageError> { - let last = self.log.iter().next_back().map(|(_, ent)| ent.get_log_id().clone()); + let last = self.log.iter().next_back().map(|(_, ent)| ent.to_log_id()); let last_purged = self.last_purged_log_id.clone(); @@ -97,7 +97,7 @@ impl LogStoreInner { where I: IntoIterator { // Simple implementation that calls the flush-before-return `append_to_log`. for entry in entries { - self.log.insert(entry.get_log_id().index(), entry); + self.log.insert(entry.to_log_id().index, entry); } callback.io_completed(Ok(())); diff --git a/examples/rocksstore/src/lib.rs b/examples/rocksstore/src/lib.rs index 186543c19..541ad46bb 100644 --- a/examples/rocksstore/src/lib.rs +++ b/examples/rocksstore/src/lib.rs @@ -18,13 +18,13 @@ use std::sync::Arc; use log_store::RocksLogStore; use openraft::alias::SnapshotDataOf; +use openraft::entry::RaftEntryExt; use openraft::storage::RaftStateMachine; use openraft::storage::Snapshot; use openraft::AnyError; use openraft::Entry; use openraft::EntryPayload; use openraft::LogId; -use openraft::RaftLogId; use openraft::RaftSnapshotBuilder; use openraft::RaftTypeConfig; use openraft::SnapshotMeta; @@ -179,7 +179,7 @@ impl RaftStateMachine for RocksStateMachine { for entry in entries_iter { tracing::debug!(%entry.log_id, "replicate to sm"); - sm.last_applied_log = Some(*entry.get_log_id()); + sm.last_applied_log = Some(entry.to_log_id()); match entry.payload { EntryPayload::Blank => res.push(RocksResponse { value: None }), diff --git a/examples/rocksstore/src/log_store.rs b/examples/rocksstore/src/log_store.rs index 4d7b5ea95..506adecde 100644 --- a/examples/rocksstore/src/log_store.rs +++ b/examples/rocksstore/src/log_store.rs @@ -11,11 +11,11 @@ use meta::StoreMeta; use openraft::alias::EntryOf; use openraft::alias::LogIdOf; use openraft::alias::VoteOf; +use openraft::entry::RaftEntryExt; use openraft::storage::IOFlushed; use openraft::storage::RaftLogStorage; use openraft::LogState; use openraft::OptionalSend; -use openraft::RaftLogId; use openraft::RaftLogReader; use openraft::RaftTypeConfig; use openraft::StorageError; @@ -103,7 +103,7 @@ where C: RaftTypeConfig let entry: EntryOf = serde_json::from_slice(&val).map_err(read_logs_err)?; - assert_eq!(id, entry.get_log_id().index()); + assert_eq!(id, entry.index()); res.push(entry); } @@ -128,7 +128,7 @@ where C: RaftTypeConfig Some(res) => { let (_log_index, entry_bytes) = res.map_err(read_logs_err)?; let ent = serde_json::from_slice::>(&entry_bytes).map_err(read_logs_err)?; - Some(ent.get_log_id().clone()) + Some(ent.to_log_id()) } }; @@ -158,8 +158,8 @@ where C: RaftTypeConfig async fn append(&mut self, entries: I, callback: IOFlushed) -> Result<(), StorageError> where I: IntoIterator> + Send { for entry in entries { - let id = id_to_bin(entry.get_log_id().index()); - assert_eq!(bin_to_id(&id), entry.get_log_id().index()); + let id = id_to_bin(entry.index()); + assert_eq!(bin_to_id(&id), entry.index()); self.db .put_cf( self.cf_logs(), diff --git a/openraft/src/core/raft_core.rs b/openraft/src/core/raft_core.rs index c0528f0b0..00b0a50f5 100644 --- a/openraft/src/core/raft_core.rs +++ b/openraft/src/core/raft_core.rs @@ -43,8 +43,8 @@ use crate::engine::Condition; use crate::engine::Engine; use crate::engine::ReplicationProgress; use crate::engine::Respond; -use crate::entry::FromAppData; use crate::entry::RaftEntry; +use crate::entry::RaftEntryExt; use crate::error::AllowNextRevertError; use crate::error::ClientWriteError; use crate::error::Fatal; @@ -55,7 +55,6 @@ use crate::error::QuorumNotEnough; use crate::error::RPCError; use crate::error::Timeout; use crate::log_id::LogIdOptionExt; -use crate::log_id::RaftLogId; use crate::metrics::HeartbeatMetrics; use crate::metrics::RaftDataMetrics; use crate::metrics::RaftMetrics; @@ -1246,7 +1245,7 @@ where self.handle_check_is_leader_request(tx).await; } RaftMsg::ClientWriteRequest { app_data, tx } => { - self.write_entry(C::Entry::from_app_data(app_data), Some(tx)); + self.write_entry(C::Entry::new_normal(Default::default(), app_data), Some(tx)); } RaftMsg::Initialize { members, tx } => { tracing::info!( @@ -1746,7 +1745,7 @@ where committed_vote: vote, entries, } => { - let last_log_id = entries.last().unwrap().get_log_id(); + let last_log_id = entries.last().unwrap().log_id(); tracing::debug!("AppendInputEntries: {}", DisplaySlice::<_>(&entries),); let io_id = IOId::new_log_io(vote, Some(last_log_id.clone())); diff --git a/openraft/src/core/sm/worker.rs b/openraft/src/core/sm/worker.rs index 750621e26..75f968815 100644 --- a/openraft/src/core/sm/worker.rs +++ b/openraft/src/core/sm/worker.rs @@ -15,6 +15,7 @@ use crate::core::ApplyResult; use crate::core::ApplyingEntry; use crate::display_ext::DisplayOptionExt; use crate::display_ext::DisplaySliceExt; +use crate::entry::RaftEntryExt; use crate::entry::RaftPayload; use crate::storage::RaftStateMachine; use crate::storage::Snapshot; @@ -23,7 +24,6 @@ use crate::type_config::alias::LogIdOf; use crate::type_config::alias::MpscUnboundedReceiverOf; use crate::type_config::alias::MpscUnboundedSenderOf; use crate::type_config::TypeConfigExt; -use crate::RaftLogId; use crate::RaftLogReader; use crate::RaftSnapshotBuilder; use crate::RaftTypeConfig; @@ -186,7 +186,7 @@ where #[allow(clippy::needless_collect)] let applying_entries = entries .iter() - .map(|e| ApplyingEntry::new(e.get_log_id().clone(), e.get_membership().cloned())) + .map(|e| ApplyingEntry::new(e.to_log_id(), e.get_membership().cloned())) .collect::>(); let n_entries = end - since; diff --git a/openraft/src/engine/engine_impl.rs b/openraft/src/engine/engine_impl.rs index 9d5a0622f..957047510 100644 --- a/openraft/src/engine/engine_impl.rs +++ b/openraft/src/engine/engine_impl.rs @@ -23,6 +23,7 @@ use crate::engine::Condition; use crate::engine::EngineOutput; use crate::engine::Respond; use crate::entry::RaftEntry; +use crate::entry::RaftEntryExt; use crate::entry::RaftPayload; use crate::error::ForwardToLeader; use crate::error::Infallible; @@ -57,7 +58,6 @@ use crate::vote::RaftTerm; use crate::vote::RaftVote; use crate::LogIdOptionExt; use crate::Membership; -use crate::RaftLogId; use crate::RaftTypeConfig; /// Raft protocol algorithm. diff --git a/openraft/src/engine/handler/following_handler/mod.rs b/openraft/src/engine/handler/following_handler/mod.rs index 245a1eeff..85b9999c5 100644 --- a/openraft/src/engine/handler/following_handler/mod.rs +++ b/openraft/src/engine/handler/following_handler/mod.rs @@ -11,6 +11,7 @@ use crate::engine::Command; use crate::engine::Condition; use crate::engine::EngineConfig; use crate::engine::EngineOutput; +use crate::entry::RaftEntryExt; use crate::entry::RaftPayload; use crate::error::RejectAppendEntries; use crate::raft_state::IOId; @@ -20,7 +21,6 @@ use crate::type_config::alias::LogIdOf; use crate::vote::committed::CommittedVote; use crate::EffectiveMembership; use crate::LogIdOptionExt; -use crate::RaftLogId; use crate::RaftState; use crate::RaftTypeConfig; use crate::StoredMembership; @@ -69,10 +69,10 @@ where C: RaftTypeConfig ); if let Some(x) = entries.first() { - debug_assert!(x.get_log_id().index() == prev_log_id.next_index()); + debug_assert!(x.index() == prev_log_id.next_index()); } - let last_log_id = entries.last().map(|x| x.get_log_id().clone()); + let last_log_id = entries.last().map(|x| x.to_log_id()); let last_log_id = std::cmp::max(prev_log_id, last_log_id); let prev_accepted = self.state.accept_io(IOId::new_log_io(self.leader_vote.clone(), last_log_id.clone())); @@ -84,7 +84,7 @@ where C: RaftTypeConfig // the entries after it has to be deleted first. // Raft requires log ids are in total order by (term,index). // Otherwise the log id with max index makes committed entry invisible in election. - self.truncate_logs(entries[since].get_log_id().index()); + self.truncate_logs(entries[since].index()); let entries = entries.split_off(since); self.do_append_entries(entries); @@ -143,11 +143,8 @@ where C: RaftTypeConfig #[tracing::instrument(level = "debug", skip(self, entries))] pub(crate) fn do_append_entries(&mut self, entries: Vec) { debug_assert!(!entries.is_empty()); - debug_assert_eq!( - entries[0].get_log_id().index(), - self.state.log_ids.last().cloned().next_index(), - ); - debug_assert!(Some(entries[0].get_log_id()) > self.state.log_ids.last()); + debug_assert_eq!(entries[0].index(), self.state.log_ids.last().cloned().next_index(),); + debug_assert!(Some(entries[0].log_id()) > self.state.log_ids.last()); self.state.extend_log_ids(&entries); self.append_membership(entries.iter()); @@ -343,7 +340,7 @@ where C: RaftTypeConfig // Find the last 2 membership config entries: the committed and the effective. for ent in entries.rev() { if let Some(m) = ent.get_membership() { - memberships.insert(0, StoredMembership::new(Some(ent.get_log_id().clone()), m.clone())); + memberships.insert(0, StoredMembership::new(Some(ent.to_log_id()), m.clone())); if memberships.len() == 2 { break; } diff --git a/openraft/src/engine/handler/leader_handler/mod.rs b/openraft/src/engine/handler/leader_handler/mod.rs index 85563ebe4..4c4018407 100644 --- a/openraft/src/engine/handler/leader_handler/mod.rs +++ b/openraft/src/engine/handler/leader_handler/mod.rs @@ -2,6 +2,7 @@ use crate::engine::handler::replication_handler::ReplicationHandler; use crate::engine::Command; use crate::engine::EngineConfig; use crate::engine::EngineOutput; +use crate::entry::RaftEntryExt; use crate::entry::RaftPayload; use crate::proposer::Leader; use crate::proposer::LeaderQuorumSet; @@ -10,7 +11,6 @@ use crate::raft_state::IOId; use crate::raft_state::LogStateReader; use crate::replication::ReplicationSessionId; use crate::type_config::alias::LogIdOf; -use crate::RaftLogId; use crate::RaftState; use crate::RaftTypeConfig; @@ -67,7 +67,7 @@ where C: RaftTypeConfig membership_entry.is_none(), "only one membership entry is allowed in a batch" ); - membership_entry = Some((entry.get_log_id().clone(), m.clone())); + membership_entry = Some((entry.to_log_id(), m.clone())); } } diff --git a/openraft/src/engine/log_id_list.rs b/openraft/src/engine/log_id_list.rs index 01bd2bb32..5b601018e 100644 --- a/openraft/src/engine/log_id_list.rs +++ b/openraft/src/engine/log_id_list.rs @@ -131,13 +131,13 @@ where C: RaftTypeConfig /// Extends a list of `log_id` that are proposed by a same leader. /// /// The log ids in the input has to be continuous. - pub(crate) fn extend_from_same_leader<'a, LID: RaftLogId + 'a>(&mut self, new_ids: &[LID]) { + pub(crate) fn extend_from_same_leader<'a, LID: AsRef> + 'a>(&mut self, new_ids: &[LID]) { if let Some(first) = new_ids.first() { - let first_id = first.get_log_id(); + let first_id = first.as_ref(); self.append(first_id.clone()); if let Some(last) = new_ids.last() { - let last_id = last.get_log_id(); + let last_id = last.as_ref(); assert_eq!(last_id.leader_id(), first_id.leader_id()); if last_id != first_id { @@ -150,11 +150,11 @@ where C: RaftTypeConfig /// Extends a list of `log_id`. // leader_id: Copy is feature gated #[allow(clippy::clone_on_copy)] - pub(crate) fn extend<'a, LID: RaftLogId + 'a>(&mut self, new_ids: &[LID]) { + pub(crate) fn extend<'a, LID: AsRef> + 'a>(&mut self, new_ids: &[LID]) { let mut prev = self.last().map(|x| x.leader_id().clone()); for x in new_ids.iter() { - let log_id = x.get_log_id(); + let log_id = x.as_ref(); if prev.as_ref() != Some(log_id.leader_id()) { self.append(log_id.clone()); @@ -164,7 +164,7 @@ where C: RaftTypeConfig } if let Some(last) = new_ids.last() { - let log_id = last.get_log_id(); + let log_id = last.as_ref(); if self.last() != Some(log_id) { self.append(log_id.clone()); diff --git a/openraft/src/entry/mod.rs b/openraft/src/entry/mod.rs index 812540621..0c482ebe5 100644 --- a/openraft/src/entry/mod.rs +++ b/openraft/src/entry/mod.rs @@ -3,7 +3,6 @@ use std::fmt; use std::fmt::Debug; -use crate::log_id::RaftLogId; use crate::Membership; use crate::RaftTypeConfig; @@ -11,10 +10,11 @@ pub mod payload; mod traits; pub use payload::EntryPayload; -pub use traits::FromAppData; pub use traits::RaftEntry; +pub use traits::RaftEntryExt; pub use traits::RaftPayload; +use crate::type_config::alias::AppDataOf; use crate::type_config::alias::LogIdOf; /// A Raft log entry. @@ -98,15 +98,19 @@ where C: RaftTypeConfig } } -impl RaftLogId for Entry +impl AsRef> for Entry where C: RaftTypeConfig { - fn get_log_id(&self) -> &LogIdOf { + fn as_ref(&self) -> &LogIdOf { &self.log_id } +} - fn set_log_id(&mut self, log_id: &LogIdOf) { - self.log_id = log_id.clone(); +impl AsMut> for Entry +where C: RaftTypeConfig +{ + fn as_mut(&mut self) -> &mut LogIdOf { + &mut self.log_id } } @@ -120,21 +124,17 @@ where C: RaftTypeConfig } } - fn new_membership(log_id: LogIdOf, m: Membership) -> Self { - Self { + fn new_normal(log_id: LogIdOf, data: AppDataOf) -> Self { + Entry { log_id, - payload: EntryPayload::Membership(m), + payload: EntryPayload::Normal(data), } } -} -impl FromAppData for Entry -where C: RaftTypeConfig -{ - fn from_app_data(d: C::D) -> Self { - Entry { - log_id: LogIdOf::::default(), - payload: EntryPayload::Normal(d), + fn new_membership(log_id: LogIdOf, m: Membership) -> Self { + Self { + log_id, + payload: EntryPayload::Membership(m), } } } diff --git a/openraft/src/entry/traits.rs b/openraft/src/entry/traits.rs index 36ca96b5a..964609108 100644 --- a/openraft/src/entry/traits.rs +++ b/openraft/src/entry/traits.rs @@ -3,6 +3,7 @@ use std::fmt::Display; use crate::base::OptionalFeatures; use crate::log_id::RaftLogId; +use crate::type_config::alias::CommittedLeaderIdOf; use crate::type_config::alias::LogIdOf; use crate::Membership; use crate::RaftTypeConfig; @@ -23,23 +24,54 @@ pub trait RaftEntry where C: RaftTypeConfig, Self: OptionalFeatures + Debug + Display, - Self: RaftPayload + RaftLogId, + Self: RaftPayload + AsRef> + AsMut>, { /// Create a new blank log entry. /// /// The returned instance must return `true` for `Self::is_blank()`. fn new_blank(log_id: LogIdOf) -> Self; + /// Create a new normal log entry that contains application data. + fn new_normal(log_id: LogIdOf, data: C::D) -> Self; + /// Create a new membership log entry. /// /// The returned instance must return `Some()` for `Self::get_membership()`. fn new_membership(log_id: LogIdOf, m: Membership) -> Self; } -/// Build a raft log entry from app data. -/// -/// A concrete Entry should implement this trait to let openraft create an entry when needed. -pub trait FromAppData { - /// Build a raft log entry from app data. - fn from_app_data(t: T) -> Self; +pub trait RaftEntryExt: RaftEntry +where C: RaftTypeConfig +{ + fn log_id(&self) -> &LogIdOf { + AsRef::>::as_ref(self) + } + + fn to_log_id(&self) -> LogIdOf { + self.log_id().clone() + } + + fn committed_leader_id(&self) -> &CommittedLeaderIdOf { + AsRef::>::as_ref(self).leader_id() + } + + fn to_committed_leader_id(&self) -> CommittedLeaderIdOf { + self.committed_leader_id().clone() + } + + fn index(&self) -> u64 { + AsRef::>::as_ref(self).index + } + + fn set_log_id(&mut self, new: &LogIdOf) { + let log_id = AsMut::>::as_mut(self); + *log_id = new.clone(); + } +} + +impl RaftEntryExt for T +where + C: RaftTypeConfig, + T: RaftEntry, +{ } diff --git a/openraft/src/log_id/mod.rs b/openraft/src/log_id/mod.rs index b390edbb5..a736302df 100644 --- a/openraft/src/log_id/mod.rs +++ b/openraft/src/log_id/mod.rs @@ -62,6 +62,22 @@ where C: RaftTypeConfig } } +impl AsRef> for LogId +where C: RaftTypeConfig +{ + fn as_ref(&self) -> &LogId { + self + } +} + +impl AsMut> for LogId +where C: RaftTypeConfig +{ + fn as_mut(&mut self) -> &mut LogId { + self + } +} + impl LogId where C: RaftTypeConfig { diff --git a/openraft/src/proposer/leader.rs b/openraft/src/proposer/leader.rs index 1c43aa304..228340b00 100644 --- a/openraft/src/proposer/leader.rs +++ b/openraft/src/proposer/leader.rs @@ -12,7 +12,6 @@ use crate::type_config::TypeConfigExt; use crate::vote::committed::CommittedVote; use crate::vote::raft_vote::RaftVoteExt; use crate::LogIdOptionExt; -use crate::RaftLogId; use crate::RaftTypeConfig; /// Leading state data. @@ -162,7 +161,7 @@ where /// Assign log ids to the entries. /// /// This method update the `self.last_log_id`. - pub(crate) fn assign_log_ids<'a, LID: RaftLogId + 'a>( + pub(crate) fn assign_log_ids<'a, LID: AsMut> + 'a>( &mut self, entries: impl IntoIterator, ) { @@ -174,7 +173,7 @@ where let mut last = first.clone(); for entry in entries { - entry.set_log_id(&last); + *entry.as_mut() = last.clone(); tracing::debug!("assign log id: {}", last); last.index += 1; } @@ -228,13 +227,13 @@ mod tests { use crate::engine::testing::log_id; use crate::engine::testing::UTConfig; use crate::entry::RaftEntry; + use crate::entry::RaftEntryExt; use crate::progress::Progress; use crate::proposer::Leader; use crate::testing::blank_ent; use crate::type_config::TypeConfigExt; use crate::vote::raft_vote::RaftVoteExt; use crate::Entry; - use crate::RaftLogId; use crate::Vote; #[test] @@ -297,7 +296,7 @@ mod tests { leader.assign_log_ids(&mut entries); assert_eq!( - entries[0].get_log_id(), + entries[0].log_id(), &log_id(2, 2, 4), "entry log id assigned following last-log-id" ); @@ -312,7 +311,7 @@ mod tests { let mut entries: Vec> = vec![blank_ent(1, 1, 1)]; leading.assign_log_ids(&mut entries); - assert_eq!(entries[0].get_log_id(), &log_id(0, 0, 0),); + assert_eq!(entries[0].log_id(), &log_id(0, 0, 0),); assert_eq!(Some(log_id(0, 0, 0)), leading.last_log_id); } @@ -336,9 +335,9 @@ mod tests { let mut entries: Vec> = vec![blank_ent(1, 1, 1), blank_ent(1, 1, 1), blank_ent(1, 1, 1)]; leading.assign_log_ids(&mut entries); - assert_eq!(entries[0].get_log_id(), &log_id(2, 2, 9)); - assert_eq!(entries[1].get_log_id(), &log_id(2, 2, 10)); - assert_eq!(entries[2].get_log_id(), &log_id(2, 2, 11)); + assert_eq!(entries[0].log_id(), &log_id(2, 2, 9)); + assert_eq!(entries[1].log_id(), &log_id(2, 2, 10)); + assert_eq!(entries[2].log_id(), &log_id(2, 2, 11)); assert_eq!(Some(log_id(2, 2, 11)), leading.last_log_id); } diff --git a/openraft/src/raft_state/mod.rs b/openraft/src/raft_state/mod.rs index 7cb650cd4..2f301921d 100644 --- a/openraft/src/raft_state/mod.rs +++ b/openraft/src/raft_state/mod.rs @@ -6,7 +6,6 @@ use validit::Validate; use crate::engine::LogIdList; use crate::error::ForwardToLeader; -use crate::log_id::RaftLogId; use crate::storage::SnapshotMeta; use crate::utime::Leased; use crate::LogIdOptionExt; @@ -256,11 +255,11 @@ where C: RaftTypeConfig /// Append a list of `log_id`. /// /// The log ids in the input has to be continuous. - pub(crate) fn extend_log_ids_from_same_leader<'a, LID: RaftLogId + 'a>(&mut self, new_log_ids: &[LID]) { + pub(crate) fn extend_log_ids_from_same_leader<'a, LID: AsRef> + 'a>(&mut self, new_log_ids: &[LID]) { self.log_ids.extend_from_same_leader(new_log_ids) } - pub(crate) fn extend_log_ids<'a, LID: RaftLogId + 'a>(&mut self, new_log_id: &[LID]) { + pub(crate) fn extend_log_ids<'a, LID: AsRef> + 'a>(&mut self, new_log_id: &[LID]) { self.log_ids.extend(new_log_id) } @@ -301,11 +300,11 @@ where C: RaftTypeConfig /// Find the first entry in the input that does not exist on local raft-log, /// by comparing the log id. pub(crate) fn first_conflicting_index(&self, entries: &[Ent]) -> usize - where Ent: RaftLogId { + where Ent: AsRef> { let l = entries.len(); for (i, ent) in entries.iter().enumerate() { - let log_id = ent.get_log_id(); + let log_id = ent.as_ref(); if !self.has_log_id(log_id) { tracing::debug!( diff --git a/openraft/src/replication/mod.rs b/openraft/src/replication/mod.rs index 5ea47efae..575d4a2fa 100644 --- a/openraft/src/replication/mod.rs +++ b/openraft/src/replication/mod.rs @@ -27,6 +27,7 @@ use crate::core::notification::Notification; use crate::core::sm::handle::SnapshotReader; use crate::display_ext::DisplayInstantExt; use crate::display_ext::DisplayOptionExt; +use crate::entry::RaftEntryExt; use crate::error::HigherVote; use crate::error::PayloadTooLarge; use crate::error::RPCError; @@ -59,7 +60,6 @@ use crate::type_config::alias::VoteOf; use crate::type_config::async_runtime::mutex::Mutex; use crate::type_config::TypeConfigExt; use crate::vote::raft_vote::RaftVoteExt; -use crate::RaftLogId; use crate::RaftNetworkFactory; use crate::RaftTypeConfig; use crate::StorageError; @@ -398,8 +398,8 @@ where // limited_get_log_entries will return logs smaller than the range [start, end). let logs = self.log_reader.limited_get_log_entries(start, end).await?; - let first = logs.first().map(|x| x.get_log_id()).unwrap(); - let last = logs.last().map(|x| x.get_log_id().clone()).unwrap(); + let first = logs.first().map(|x| x.log_id()).unwrap(); + let last = logs.last().map(|x| x.to_log_id()).unwrap(); debug_assert!( !logs.is_empty() && logs.len() <= (end - start) as usize, diff --git a/openraft/src/storage/helper.rs b/openraft/src/storage/helper.rs index 2547ccccd..06fb2580d 100644 --- a/openraft/src/storage/helper.rs +++ b/openraft/src/storage/helper.rs @@ -7,6 +7,7 @@ use validit::Valid; use crate::display_ext::DisplayOptionExt; use crate::engine::LogIdList; +use crate::entry::RaftEntryExt; use crate::entry::RaftPayload; use crate::log_id::RaftLogId; use crate::raft_state::IOState; @@ -193,8 +194,8 @@ where let chunk_end = std::cmp::min(end, start + chunk_size); let entries = log_reader.try_get_log_entries(start..chunk_end).await?; - let first = entries.first().map(|x| x.get_log_id().index()); - let last = entries.last().map(|x| x.get_log_id().index()); + let first = entries.first().map(|x| x.index()); + let last = entries.last().map(|x| x.index()); let make_err = || { let err = AnyError::error(format!( @@ -299,7 +300,7 @@ where for ent in entries.iter().rev() { if let Some(mem) = ent.get_membership() { - let em = StoredMembership::new(Some(ent.get_log_id().clone()), mem.clone()); + let em = StoredMembership::new(Some(ent.to_log_id()), mem.clone()); res.insert(0, em); if res.len() == 2 { return Ok(res); diff --git a/openraft/src/storage/log_reader_ext.rs b/openraft/src/storage/log_reader_ext.rs index 9bc9645e1..a205e74b5 100644 --- a/openraft/src/storage/log_reader_ext.rs +++ b/openraft/src/storage/log_reader_ext.rs @@ -1,8 +1,8 @@ use anyerror::AnyError; use openraft_macros::add_async_trait; +use crate::entry::RaftEntryExt; use crate::type_config::alias::LogIdOf; -use crate::RaftLogId; use crate::RaftLogReader; use crate::RaftTypeConfig; use crate::StorageError; @@ -30,7 +30,7 @@ where C: RaftTypeConfig )); } - Ok(entries[0].get_log_id().clone()) + Ok(entries[0].to_log_id()) } } diff --git a/openraft/src/storage/v2/raft_log_storage_ext.rs b/openraft/src/storage/v2/raft_log_storage_ext.rs index c13d4cd07..56f280980 100644 --- a/openraft/src/storage/v2/raft_log_storage_ext.rs +++ b/openraft/src/storage/v2/raft_log_storage_ext.rs @@ -3,7 +3,7 @@ use openraft_macros::add_async_trait; use crate::async_runtime::MpscUnboundedReceiver; use crate::async_runtime::MpscUnboundedSender; use crate::core::notification::Notification; -use crate::log_id::RaftLogId; +use crate::entry::RaftEntryExt; use crate::raft_state::io_state::io_id::IOId; use crate::storage::IOFlushed; use crate::storage::RaftLogStorage; @@ -31,7 +31,7 @@ where C: RaftTypeConfig { let entries = entries.into_iter().collect::>(); - let last_log_id = entries.last().unwrap().get_log_id().clone(); + let last_log_id = entries.last().unwrap().to_log_id(); let (tx, mut rx) = C::mpsc_unbounded(); diff --git a/openraft/src/testing/log/suite.rs b/openraft/src/testing/log/suite.rs index e4170f071..808872377 100644 --- a/openraft/src/testing/log/suite.rs +++ b/openraft/src/testing/log/suite.rs @@ -11,7 +11,7 @@ use crate::async_runtime::MpscUnboundedReceiver; use crate::async_runtime::MpscUnboundedSender; use crate::core::notification::Notification; use crate::entry::RaftEntry; -use crate::log_id::RaftLogId; +use crate::entry::RaftEntryExt; use crate::membership::EffectiveMembership; use crate::raft_state::io_state::io_id::IOId; use crate::raft_state::LogStateReader; @@ -845,8 +845,8 @@ where let logs = store.try_get_log_entries(5..7).await?; assert_eq!(logs.len(), 2); - assert_eq!(*logs[0].get_log_id(), log_id_0(1, 5)); - assert_eq!(*logs[1].get_log_id(), log_id_0(1, 6)); + assert_eq!(logs[0].to_log_id(), log_id_0(1, 5)); + assert_eq!(logs[1].to_log_id(), log_id_0(1, 6)); } Ok(()) @@ -867,7 +867,7 @@ where assert!(!logs.is_empty()); assert!(logs.len() <= 2); - assert_eq!(*logs[0].get_log_id(), log_id_0(1, 5)); + assert_eq!(logs[0].to_log_id(), log_id_0(1, 5)); } Ok(()) @@ -883,13 +883,13 @@ where C::sleep(Duration::from_millis(1_000)).await; let ent = store.try_get_log_entry(3).await?; - assert_eq!(Some(log_id_0(1, 3)), ent.map(|x| x.get_log_id().clone())); + assert_eq!(Some(log_id_0(1, 3)), ent.map(|x| x.to_log_id())); let ent = store.try_get_log_entry(0).await?; - assert_eq!(None, ent.map(|x| x.get_log_id().clone())); + assert_eq!(None, ent.map(|x| x.to_log_id())); let ent = store.try_get_log_entry(11).await?; - assert_eq!(None, ent.map(|x| x.get_log_id().clone())); + assert_eq!(None, ent.map(|x| x.to_log_id())); Ok(()) } @@ -1072,7 +1072,7 @@ where let logs = store.try_get_log_entries(0..100).await?; assert_eq!(logs.len(), 10); - assert_eq!(logs[0].get_log_id().index(), 1); + assert_eq!(logs[0].index(), 1); assert_eq!( LogState { @@ -1097,7 +1097,7 @@ where let logs = store.try_get_log_entries(0..100).await?; assert_eq!(logs.len(), 5); - assert_eq!(logs[0].get_log_id().index(), 6); + assert_eq!(logs[0].index(), 6); assert_eq!( LogState { @@ -1189,7 +1189,7 @@ where let last = store.try_get_log_entries(0..).await?.into_iter().last().unwrap(); assert_eq!(l, 11, "expected 11 entries to exist in the log"); - assert_eq!(*last.get_log_id(), log_id_0(2, 11), "unexpected log id"); + assert_eq!(last.to_log_id(), log_id_0(2, 11), "unexpected log id"); Ok(()) } @@ -1441,7 +1441,7 @@ where { let entries = entries.into_iter().collect::>(); - let last_log_id = entries.last().unwrap().get_log_id().clone(); + let last_log_id = entries.last().unwrap().to_log_id(); let (tx, mut rx) = C::mpsc_unbounded(); diff --git a/openraft/src/type_config.rs b/openraft/src/type_config.rs index c95f4df53..79d049bee 100644 --- a/openraft/src/type_config.rs +++ b/openraft/src/type_config.rs @@ -13,7 +13,6 @@ pub use async_runtime::MpscUnbounded; pub use async_runtime::OneshotSender; pub use util::TypeConfigExt; -use crate::entry::FromAppData; use crate::entry::RaftEntry; use crate::raft::responder::Responder; use crate::vote::raft_vote::RaftVote; @@ -89,7 +88,7 @@ pub trait RaftTypeConfig: type Vote: RaftVote; /// Raft log entry, which can be built from an AppData. - type Entry: RaftEntry + FromAppData; + type Entry: RaftEntry; /// Snapshot data for exposing a snapshot for reading & writing. /// @@ -129,6 +128,8 @@ pub mod alias { pub type DOf = ::D; pub type ROf = ::R; + pub type AppDataOf = ::D; + pub type AppResponseOf = ::R; pub type NodeIdOf = ::NodeId; pub type NodeOf = ::Node; pub type TermOf = ::Term; diff --git a/stores/memstore/src/lib.rs b/stores/memstore/src/lib.rs index 074bbb7dd..15e01fde3 100644 --- a/stores/memstore/src/lib.rs +++ b/stores/memstore/src/lib.rs @@ -15,6 +15,7 @@ use std::sync::Arc; use std::sync::Mutex; use openraft::alias::SnapshotDataOf; +use openraft::entry::RaftEntryExt; use openraft::storage::IOFlushed; use openraft::storage::LogState; use openraft::storage::RaftLogReader; @@ -26,7 +27,6 @@ use openraft::Entry; use openraft::EntryPayload; use openraft::LogId; use openraft::OptionalSend; -use openraft::RaftLogId; use openraft::SnapshotMeta; use openraft::StorageError; use openraft::StoredMembership; @@ -336,7 +336,7 @@ impl RaftLogStorage for Arc { Some(serialized) => { let ent: Entry = serde_json::from_str(serialized).map_err(|e| StorageError::read_logs(&e))?; - Some(*ent.get_log_id()) + Some(ent.to_log_id()) } }; @@ -392,9 +392,8 @@ impl RaftLogStorage for Arc { where I: IntoIterator> + OptionalSend { let mut log = self.log.write().await; for entry in entries { - let s = - serde_json::to_string(&entry).map_err(|e| StorageError::write_log_entry(*entry.get_log_id(), &e))?; - log.insert(entry.log_id.index(), s); + let s = serde_json::to_string(&entry).map_err(|e| StorageError::write_log_entry(entry.to_log_id(), &e))?; + log.insert(entry.log_id.index, s); } callback.io_completed(Ok(())); diff --git a/tests/tests/fixtures/mod.rs b/tests/tests/fixtures/mod.rs index 35e046a23..d61d71207 100644 --- a/tests/tests/fixtures/mod.rs +++ b/tests/tests/fixtures/mod.rs @@ -52,7 +52,6 @@ use openraft::LogIdOptionExt; use openraft::OptionalSend; use openraft::RPCTypes; use openraft::Raft; -use openraft::RaftLogId; use openraft::RaftLogReader; use openraft::RaftMetrics; use openraft::RaftState; @@ -186,6 +185,7 @@ impl fmt::Display for Direction { use openraft::alias::LogIdOf; use openraft::alias::VoteOf; +use openraft::entry::RaftEntryExt; use openraft::network::v2::RaftNetworkV2; use openraft::vote::RaftLeaderId; use openraft::vote::RaftLeaderIdExt; @@ -1045,7 +1045,7 @@ impl RaftNetworkV2 for RaftRouterNetwork { rpc.entries.truncate(quota as usize); *x = Some(0); if let Some(last) = rpc.entries.last() { - Some(Some(*last.get_log_id())) + Some(Some(last.to_log_id())) } else { Some(rpc.prev_log_id) }