From af7051dd1644c7c16b8a4003b15aa0fd32772245 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=BC=A0=E7=82=8E=E6=B3=BC?= Date: Tue, 28 Jan 2025 11:44:58 +0800 Subject: [PATCH] Refactor: Introduce `RefLogId` as a reference to a log ID Existing `LogIdOf` provides a minimal storage implementation for a log ID with essential properties. In contrast, `RefLogId` offers the same information as `LogIdOf` while adding additional system-defined properties. For example, in the future, `LogIdOf` defined by the application will not need to implement `Ord`. However, `RefLogId`, used internally, will provide a system-defined `Ord` implementation. This change updates internal components to return `RefLogId` or accept it as an argument where possible, enabling more flexibility and consistency in handling log IDs. Change: refine the `RaftEntry` trait - The `RaftEntry` trait now requires `AsRef>` and `AsMut>`, providing a more standard API for accessing the log ID of a log entry. As a result, the `RaftEntry: RaftLogId` requirement is no longer needed. - A new method, `new_normal()`, has been added to the `RaftEntry` trait to replace the `FromAppData` trait. - Additional utility methods for working with entries are now provided in the `RaftEntryExt` trait. - Part of #1278 Upgrade tips: 1. **For applications using a custom `RaftEntry` implementation** (e.g., declared with `declare_raft_types!(MyTypes: Entry = MyEntry)`): - Update the `RaftEntry` implementation for your custom entry type (`MyEntry`) by adding the `new_normal()` method. - Implement `AsRef` and `AsMut` for your custom entry type. 2. **For applications using the default `Entry` provided by OpenRaft**: - No changes are required. empty: try to remove log id from RaftTypeConfig --- cluster_benchmark/tests/benchmark/store.rs | 1 - examples/raft-kv-memstore-grpc/build.rs | 17 ++ .../proto/api_service.proto | 6 +- .../proto/internal_service.proto | 77 ++++++++- .../proto/management_service.proto | 6 +- .../src/grpc/internal_service.rs | 23 +-- examples/raft-kv-memstore-grpc/src/lib.rs | 68 ++++++++ .../raft-kv-memstore-grpc/src/network/mod.rs | 13 +- .../src/pb_impl/impl_entry.rs | 50 ++++++ .../src/pb_impl/impl_membership.rs | 35 +++++ .../raft-kv-memstore-grpc/src/pb_impl/mod.rs | 2 + .../raft-kv-memstore-grpc/src/store/mod.rs | 32 ++-- examples/utils/declare_types.rs | 2 +- openraft/src/core/raft_core.rs | 26 +--- openraft/src/engine/engine_impl.rs | 3 +- .../engine/handler/following_handler/mod.rs | 10 +- .../src/engine/handler/leader_handler/mod.rs | 3 +- .../src/engine/handler/log_handler/mod.rs | 5 +- .../engine/handler/snapshot_handler/mod.rs | 2 +- .../src/engine/handler/vote_handler/mod.rs | 3 +- openraft/src/engine/log_id_list.rs | 114 ++++++++------ openraft/src/engine/testing.rs | 2 +- openraft/src/engine/tests/log_id_list_test.rs | 14 +- openraft/src/entry/mod.rs | 45 +----- openraft/src/entry/payload.rs | 4 - openraft/src/entry/raft_entry_ext.rs | 20 +++ openraft/src/entry/traits.rs | 65 +++++--- openraft/src/impls/mod.rs | 2 + openraft/src/lib.rs | 1 - openraft/src/log_id/log_id_option_ext.rs | 27 +--- openraft/src/log_id/mod.rs | 20 ++- openraft/src/log_id/option_raft_log_id_ext.rs | 51 ++++++ openraft/src/log_id/option_ref_log_id_ext.rs | 20 +++ openraft/src/log_id/raft_log_id.rs | 43 ++++-- openraft/src/log_id/raft_log_id_ext.rs | 32 ++++ openraft/src/log_id/ref_log_id.rs | 60 +++++++ openraft/src/log_id_range.rs | 6 +- .../src/membership/effective_membership.rs | 5 +- openraft/src/progress/entry/tests.rs | 5 + openraft/src/progress/inflight/mod.rs | 3 +- openraft/src/proposer/leader.rs | 36 +++-- openraft/src/raft_state/log_state_reader.rs | 20 ++- openraft/src/raft_state/mod.rs | 30 ++-- .../tests/forward_to_leader_test.rs | 15 +- .../raft_state/tests/log_state_reader_test.rs | 25 ++- openraft/src/replication/mod.rs | 6 +- openraft/src/summary.rs | 3 +- openraft/src/testing/common.rs | 18 +-- openraft/src/testing/log/suite.rs | 146 +++++++++--------- openraft/src/type_config.rs | 3 +- stores/memstore/src/lib.rs | 2 +- 51 files changed, 840 insertions(+), 387 deletions(-) create mode 100644 examples/raft-kv-memstore-grpc/src/pb_impl/impl_entry.rs create mode 100644 examples/raft-kv-memstore-grpc/src/pb_impl/impl_membership.rs create mode 100644 openraft/src/entry/raft_entry_ext.rs create mode 100644 openraft/src/log_id/option_raft_log_id_ext.rs create mode 100644 openraft/src/log_id/option_ref_log_id_ext.rs create mode 100644 openraft/src/log_id/raft_log_id_ext.rs create mode 100644 openraft/src/log_id/ref_log_id.rs diff --git a/cluster_benchmark/tests/benchmark/store.rs b/cluster_benchmark/tests/benchmark/store.rs index b16a2eb97..d5962de69 100644 --- a/cluster_benchmark/tests/benchmark/store.rs +++ b/cluster_benchmark/tests/benchmark/store.rs @@ -21,7 +21,6 @@ use openraft::storage::Snapshot; use openraft::Entry; use openraft::EntryPayload; use openraft::OptionalSend; -use openraft::RaftLogId; use openraft::SnapshotMeta; use openraft::StorageError; use openraft::StoredMembership; diff --git a/examples/raft-kv-memstore-grpc/build.rs b/examples/raft-kv-memstore-grpc/build.rs index 36a5cdbc1..6db896526 100644 --- a/examples/raft-kv-memstore-grpc/build.rs +++ b/examples/raft-kv-memstore-grpc/build.rs @@ -11,6 +11,7 @@ fn main() -> Result<(), Box> { // TODO: remove serde tonic_build::configure() + .btree_map(["."]) .type_attribute("openraftpb.Node", "#[derive(Eq, serde::Serialize, serde::Deserialize)]") .type_attribute( "openraftpb.SetRequest", @@ -25,6 +26,22 @@ fn main() -> Result<(), Box> { "#[derive(Eq, serde::Serialize, serde::Deserialize)]", ) .type_attribute("openraftpb.Vote", "#[derive(Eq, serde::Serialize, serde::Deserialize)]") + .type_attribute( + "google.protobuf.Empty", + "#[derive(Eq, serde::Serialize, serde::Deserialize)]", + ) + .type_attribute( + "openraftpb.NodeIdSet", + "#[derive(Eq, serde::Serialize, serde::Deserialize)]", + ) + .type_attribute( + "openraftpb.Membership", + "#[derive(Eq, serde::Serialize, serde::Deserialize)]", + ) + .type_attribute( + "openraftpb.Entry", + "#[derive(Eq, serde::Serialize, serde::Deserialize)]", + ) .compile_protos_with_config(config, &proto_files, &["proto"])?; Ok(()) } diff --git a/examples/raft-kv-memstore-grpc/proto/api_service.proto b/examples/raft-kv-memstore-grpc/proto/api_service.proto index e549473fe..88f97f3ff 100644 --- a/examples/raft-kv-memstore-grpc/proto/api_service.proto +++ b/examples/raft-kv-memstore-grpc/proto/api_service.proto @@ -1,4 +1,5 @@ syntax = "proto3"; +import "internal_service.proto"; package openraftpb; // ApiService provides the key-value store API operations @@ -20,8 +21,3 @@ message Response { optional string value = 1; // Retrieved value } -// SetRequest represents a key-value pair to be stored -message SetRequest { - string key = 1; // Key to store - string value = 2; // Value to associate with the key -} diff --git a/examples/raft-kv-memstore-grpc/proto/internal_service.proto b/examples/raft-kv-memstore-grpc/proto/internal_service.proto index 9414a7c16..e605a589f 100644 --- a/examples/raft-kv-memstore-grpc/proto/internal_service.proto +++ b/examples/raft-kv-memstore-grpc/proto/internal_service.proto @@ -1,6 +1,19 @@ syntax = "proto3"; +import "google/protobuf/empty.proto"; package openraftpb; +// SetRequest represents a key-value pair to be stored +message SetRequest { + string key = 1; // Key to store + string value = 2; // Value to associate with the key +} + +// Node represents a single node in the Raft cluster +message Node { + string rpc_addr = 1; // RPC address for node communication + uint64 node_id = 2; // Unique identifier for the node +} + // LeaderId represents the leader identifier in Raft message LeaderId { uint64 term = 1; @@ -13,6 +26,33 @@ message Vote { bool committed = 2; } +message Entry { + uint64 term = 1; + uint64 index = 2; + + // Optional Application data + SetRequest app_data = 12; + + // Optional Membership config + Membership membership = 13; +} + + +// NodeIds is a set of NodeIds +message NodeIdSet { + map node_ids = 1; +} + +// Membership config +message Membership { + // Joint(includes more than one NodeIdSet) or uniform(one NodeIdSet) config. + repeated NodeIdSet configs = 1; + + // All of the nodes in the cluster, including voters and learners. + // A node id that is included in `configs` is a voter, otherwise it is a learner. + map nodes = 2; +} + // LogId represents the log identifier in Raft message LogId { uint64 term = 1; @@ -32,23 +72,48 @@ message VoteResponse { LogId last_log_id = 3; } +message AppendEntriesRequest { + // The leader's vote, used to identify the leader, and must be committed + Vote vote = 1; + + // The previous log id the leader has sent to the follower + LogId prev_log_id = 2; + + // The entries to be appended to the follower's log + repeated Entry entries = 3; + + // The leader's last committed log id + LogId leader_commit = 4; +} + +message AppendEntriesResponse { + // If not None, the follower rejected the AppendEntries request due to having a higher vote. + // All other fields are valid only when this field is None + Vote rejected_by = 1; + + // The follower accepts this AppendEntries request's vote, but the prev_log_id conflicts with + // the follower's log. The leader should retry with a smaller prev_log_id that matches the + // follower's log. All subsequent fields are valid only when this field is false + bool conflict = 2; + + // The last log id the follower accepted from this request. + // If None, all input entries were accepted and persisted. + // Otherwise, only entries up to and including this id were accepted + LogId last_log_id = 3; +} + // InternalService handles internal Raft cluster communication service InternalService { // Vote handles vote requests between Raft nodes during leader election rpc Vote(VoteRequest) returns (VoteResponse) {} // AppendEntries handles call related to append entries RPC - rpc AppendEntries(RaftRequestBytes) returns (RaftReplyBytes) {} + rpc AppendEntries(AppendEntriesRequest) returns (AppendEntriesResponse) {} // Snapshot handles install snapshot RPC rpc Snapshot(stream SnapshotRequest) returns (RaftReplyBytes) {} } -// RaftRequestBytes encapsulates binary Raft request data -message RaftRequestBytes { - bytes value = 1; // Serialized Raft request data -} - // RaftReplyBytes encapsulates binary Raft response data message RaftReplyBytes { bytes value = 1; // Serialized Raft response data diff --git a/examples/raft-kv-memstore-grpc/proto/management_service.proto b/examples/raft-kv-memstore-grpc/proto/management_service.proto index a612cdcd9..891108d91 100644 --- a/examples/raft-kv-memstore-grpc/proto/management_service.proto +++ b/examples/raft-kv-memstore-grpc/proto/management_service.proto @@ -1,4 +1,5 @@ syntax = "proto3"; +import "internal_service.proto"; package openraftpb; // ManagementService handles Raft cluster management operations @@ -21,11 +22,6 @@ message InitRequest { repeated Node nodes = 1; // List of initial cluster nodes } -// Node represents a single node in the Raft cluster -message Node { - string rpc_addr = 1; // RPC address for node communication - uint64 node_id = 2; // Unique identifier for the node -} // AddLearnerRequest specifies parameters for adding a learner node message AddLearnerRequest { diff --git a/examples/raft-kv-memstore-grpc/src/grpc/internal_service.rs b/examples/raft-kv-memstore-grpc/src/grpc/internal_service.rs index 84c2af345..d18d74d5e 100644 --- a/examples/raft-kv-memstore-grpc/src/grpc/internal_service.rs +++ b/examples/raft-kv-memstore-grpc/src/grpc/internal_service.rs @@ -8,9 +8,9 @@ use tonic::Status; use tonic::Streaming; use tracing::debug; +use crate::protobuf as pb; use crate::protobuf::internal_service_server::InternalService; use crate::protobuf::RaftReplyBytes; -use crate::protobuf::RaftRequestBytes; use crate::protobuf::SnapshotRequest; use crate::protobuf::VoteRequest; use crate::protobuf::VoteResponse; @@ -75,15 +75,10 @@ impl InternalService for InternalServiceImpl { /// Nodes vote for candidates based on log completeness and term numbers. async fn vote(&self, request: Request) -> Result, Status> { debug!("Processing vote request"); - let req = request.into_inner(); - // Deserialize the vote request - let vote_req = req.into(); - - // Process the vote request let vote_resp = self .raft_node - .vote(vote_req) + .vote(request.into_inner().into()) .await .map_err(|e| Status::internal(format!("Vote operation failed: {}", e)))?; @@ -103,22 +98,20 @@ impl InternalService for InternalServiceImpl { /// # Protocol Details /// This implements the AppendEntries RPC from the Raft protocol. /// Used for both log replication and as heartbeat mechanism. - async fn append_entries(&self, request: Request) -> Result, Status> { + async fn append_entries( + &self, + request: Request, + ) -> Result, Status> { debug!("Processing append entries request"); - let req = request.into_inner(); - - // Deserialize the append request - let append_req = Self::deserialize_request(&req.value)?; - // Process the append request let append_resp = self .raft_node - .append_entries(append_req) + .append_entries(request.into_inner().into()) .await .map_err(|e| Status::internal(format!("Append entries operation failed: {}", e)))?; debug!("Append entries request processed successfully"); - Self::create_response(append_resp) + Ok(Response::new(append_resp.into())) } /// Handles snapshot installation requests for state transfer using streaming. diff --git a/examples/raft-kv-memstore-grpc/src/lib.rs b/examples/raft-kv-memstore-grpc/src/lib.rs index 3e0a5e58d..d4013db16 100644 --- a/examples/raft-kv-memstore-grpc/src/lib.rs +++ b/examples/raft-kv-memstore-grpc/src/lib.rs @@ -21,6 +21,7 @@ openraft::declare_raft_types!( R = pb::Response, LeaderId = pb::LeaderId, Vote = pb::Vote, + Entry = pb::Entry, Node = pb::Node, SnapshotData = StateMachineData, ); @@ -57,6 +58,73 @@ impl From for VoteResponse { } } +impl From for AppendEntriesRequest { + fn from(proto_req: pb::AppendEntriesRequest) -> Self { + AppendEntriesRequest { + vote: proto_req.vote.unwrap(), + prev_log_id: proto_req.prev_log_id.map(|log_id| log_id.into()), + entries: proto_req.entries, + leader_commit: proto_req.leader_commit.map(|log_id| log_id.into()), + } + } +} + +impl From for pb::AppendEntriesRequest { + fn from(value: AppendEntriesRequest) -> Self { + pb::AppendEntriesRequest { + vote: Some(value.vote), + prev_log_id: value.prev_log_id.map(|log_id| log_id.into()), + entries: value.entries, + leader_commit: value.leader_commit.map(|log_id| log_id.into()), + } + } +} + +impl From for AppendEntriesResponse { + fn from(r: pb::AppendEntriesResponse) -> Self { + if let Some(higher) = r.rejected_by { + return AppendEntriesResponse::HigherVote(higher); + } + + if r.conflict { + return AppendEntriesResponse::Conflict; + } + + if let Some(log_id) = r.last_log_id { + AppendEntriesResponse::PartialSuccess(Some(log_id.into())) + } else { + AppendEntriesResponse::Success + } + } +} + +impl From for pb::AppendEntriesResponse { + fn from(r: AppendEntriesResponse) -> Self { + match r { + AppendEntriesResponse::Success => pb::AppendEntriesResponse { + rejected_by: None, + conflict: false, + last_log_id: None, + }, + AppendEntriesResponse::PartialSuccess(p) => pb::AppendEntriesResponse { + rejected_by: None, + conflict: false, + last_log_id: p.map(|log_id| log_id.into()), + }, + AppendEntriesResponse::Conflict => pb::AppendEntriesResponse { + rejected_by: None, + conflict: true, + last_log_id: None, + }, + AppendEntriesResponse::HigherVote(v) => pb::AppendEntriesResponse { + rejected_by: Some(v), + conflict: false, + last_log_id: None, + }, + } + } +} + impl From for pb::LogId { fn from(log_id: LogId) -> Self { pb::LogId { diff --git a/examples/raft-kv-memstore-grpc/src/network/mod.rs b/examples/raft-kv-memstore-grpc/src/network/mod.rs index b8798951c..88a1d4f84 100644 --- a/examples/raft-kv-memstore-grpc/src/network/mod.rs +++ b/examples/raft-kv-memstore-grpc/src/network/mod.rs @@ -9,7 +9,6 @@ use tonic::transport::Channel; use crate::protobuf as pb; use crate::protobuf::internal_service_client::InternalServiceClient; -use crate::protobuf::RaftRequestBytes; use crate::protobuf::SnapshotRequest; use crate::protobuf::VoteRequest as PbVoteRequest; use crate::protobuf::VoteResponse as PbVoteResponse; @@ -65,12 +64,12 @@ impl RaftNetworkV2 for NetworkConnection { }; let mut client = InternalServiceClient::new(channel); - let value = serialize(&req).map_err(|e| RPCError::Network(NetworkError::new(&e)))?; - let request = RaftRequestBytes { value }; - let response = client.append_entries(request).await.map_err(|e| RPCError::Network(NetworkError::new(&e)))?; - let message = response.into_inner(); - let result = deserialize(&message.value).map_err(|e| RPCError::Network(NetworkError::new(&e)))?; - Ok(result) + let response = client + .append_entries(pb::AppendEntriesRequest::from(req)) + .await + .map_err(|e| RPCError::Network(NetworkError::new(&e)))?; + let response = response.into_inner(); + Ok(AppendEntriesResponse::from(response)) } async fn full_snapshot( diff --git a/examples/raft-kv-memstore-grpc/src/pb_impl/impl_entry.rs b/examples/raft-kv-memstore-grpc/src/pb_impl/impl_entry.rs new file mode 100644 index 000000000..303f43cc7 --- /dev/null +++ b/examples/raft-kv-memstore-grpc/src/pb_impl/impl_entry.rs @@ -0,0 +1,50 @@ +use std::fmt; + +use openraft::alias::LogIdOf; +use openraft::entry::RaftEntry; +use openraft::entry::RaftPayload; +use openraft::EntryPayload; +use openraft::Membership; + +use crate::protobuf as pb; +use crate::TypeConfig; + +impl fmt::Display for pb::Entry { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "Entry{{term={},index={}}}", self.term, self.index) + } +} + +impl RaftPayload for pb::Entry { + fn get_membership(&self) -> Option> { + self.membership.clone().map(Into::into) + } +} + +impl RaftEntry for pb::Entry { + fn new(log_id: LogIdOf, payload: EntryPayload) -> Self { + let mut app_data = None; + let mut membership = None; + match payload { + EntryPayload::Blank => {} + EntryPayload::Normal(data) => app_data = Some(data), + EntryPayload::Membership(m) => membership = Some(m.into()), + } + + Self { + term: log_id.leader_id, + index: log_id.index, + app_data, + membership, + } + } + + fn log_id_parts(&self) -> (&u64, u64) { + (&self.term, self.index) + } + + fn set_log_id(&mut self, new: LogIdOf) { + self.term = new.leader_id; + self.index = new.index; + } +} diff --git a/examples/raft-kv-memstore-grpc/src/pb_impl/impl_membership.rs b/examples/raft-kv-memstore-grpc/src/pb_impl/impl_membership.rs new file mode 100644 index 000000000..a4567cbb8 --- /dev/null +++ b/examples/raft-kv-memstore-grpc/src/pb_impl/impl_membership.rs @@ -0,0 +1,35 @@ +use std::collections::BTreeMap; +use std::collections::BTreeSet; + +use openraft::Membership; + +use crate::pb; +use crate::TypeConfig; + +impl From for Membership { + fn from(value: pb::Membership) -> Self { + let mut configs = vec![]; + for c in value.configs { + let config: BTreeSet = c.node_ids.keys().copied().collect(); + configs.push(config); + } + let nodes = value.nodes; + // TODO: do not unwrap() + Membership::new(configs, nodes).unwrap() + } +} + +impl From> for pb::Membership { + fn from(value: Membership) -> Self { + let mut configs = vec![]; + for c in value.get_joint_config() { + let mut node_ids = BTreeMap::new(); + for nid in c.iter() { + node_ids.insert(*nid, ()); + } + configs.push(pb::NodeIdSet { node_ids }); + } + let nodes = value.nodes().map(|(nid, n)| (*nid, n.clone())).collect(); + pb::Membership { configs, nodes } + } +} diff --git a/examples/raft-kv-memstore-grpc/src/pb_impl/mod.rs b/examples/raft-kv-memstore-grpc/src/pb_impl/mod.rs index 215d5a347..c9fba7a4d 100644 --- a/examples/raft-kv-memstore-grpc/src/pb_impl/mod.rs +++ b/examples/raft-kv-memstore-grpc/src/pb_impl/mod.rs @@ -1,4 +1,6 @@ //! Implements traits for protobuf types +mod impl_entry; mod impl_leader_id; +mod impl_membership; mod impl_vote; diff --git a/examples/raft-kv-memstore-grpc/src/store/mod.rs b/examples/raft-kv-memstore-grpc/src/store/mod.rs index 5e00e6153..0597c2112 100644 --- a/examples/raft-kv-memstore-grpc/src/store/mod.rs +++ b/examples/raft-kv-memstore-grpc/src/store/mod.rs @@ -4,8 +4,8 @@ use std::sync::Arc; use std::sync::Mutex; use bincode; +use openraft::entry::RaftEntry; use openraft::storage::RaftStateMachine; -use openraft::EntryPayload; use openraft::RaftSnapshotBuilder; use serde::Deserialize; use serde::Serialize; @@ -129,21 +129,23 @@ impl RaftStateMachine for Arc { let mut sm = self.state_machine.lock().unwrap(); for entry in entries { - tracing::debug!(%entry.log_id, "replicate to sm"); - - sm.last_applied = Some(entry.log_id); - - match entry.payload { - EntryPayload::Blank => res.push(Response { value: None }), - EntryPayload::Normal(req) => { - sm.data.insert(req.key, req.value.clone()); - res.push(Response { value: Some(req.value) }); - } - EntryPayload::Membership(ref mem) => { - sm.last_membership = StoredMembership::new(Some(entry.log_id), mem.clone()); - res.push(Response { value: None }) - } + let log_id = entry.log_id(); + + tracing::debug!("replicate to sm: {}", log_id); + + sm.last_applied = Some(log_id); + + let value = if let Some(req) = entry.app_data { + sm.data.insert(req.key, req.value.clone()); + Some(req.value) + } else if let Some(mem) = entry.membership { + sm.last_membership = StoredMembership::new(Some(log_id), mem.into()); + None + } else { + None }; + + res.push(Response { value }); } Ok(res) } diff --git a/examples/utils/declare_types.rs b/examples/utils/declare_types.rs index 74d8b800b..c09670e8e 100644 --- a/examples/utils/declare_types.rs +++ b/examples/utils/declare_types.rs @@ -6,7 +6,7 @@ pub type Raft = openraft::Raft; pub type Vote = ::Vote; pub type LeaderId = ::LeaderId; pub type LogId = openraft::LogId; -pub type Entry = openraft::Entry; +pub type Entry = ::Entry; pub type EntryPayload = openraft::EntryPayload; pub type StoredMembership = openraft::StoredMembership; diff --git a/openraft/src/core/raft_core.rs b/openraft/src/core/raft_core.rs index c0528f0b0..3202d897e 100644 --- a/openraft/src/core/raft_core.rs +++ b/openraft/src/core/raft_core.rs @@ -1,4 +1,3 @@ -use std::borrow::Borrow; use std::collections::BTreeMap; use std::fmt; use std::fmt::Debug; @@ -43,7 +42,6 @@ use crate::engine::Condition; use crate::engine::Engine; use crate::engine::ReplicationProgress; use crate::engine::Respond; -use crate::entry::FromAppData; use crate::entry::RaftEntry; use crate::error::AllowNextRevertError; use crate::error::ClientWriteError; @@ -54,8 +52,7 @@ use crate::error::InitializeError; use crate::error::QuorumNotEnough; use crate::error::RPCError; use crate::error::Timeout; -use crate::log_id::LogIdOptionExt; -use crate::log_id::RaftLogId; +use crate::log_id::option_raft_log_id_ext::OptionRaftLogIdExt; use crate::metrics::HeartbeatMetrics; use crate::metrics::RaftDataMetrics; use crate::metrics::RaftMetrics; @@ -542,17 +539,8 @@ where pub fn flush_metrics(&mut self) { let (replication, heartbeat) = if let Some(leader) = self.engine.leader.as_ref() { let replication_prog = &leader.progress; - let replication = Some( - replication_prog - .iter() - .map(|(id, p)| { - ( - id.clone(), - as Borrow>>>::borrow(p).clone(), - ) - }) - .collect(), - ); + let replication = + Some(replication_prog.iter().map(|(id, p)| (id.clone(), p.matching().cloned())).collect()); let clock_prog = &leader.clock_progress; let heartbeat = @@ -842,7 +830,7 @@ where session_id, self.config.clone(), self.engine.state.committed().cloned(), - progress_entry.matching().cloned(), + progress_entry.matching.clone(), network, snapshot_network, self.log_store.get_log_reader().await, @@ -1246,7 +1234,7 @@ where self.handle_check_is_leader_request(tx).await; } RaftMsg::ClientWriteRequest { app_data, tx } => { - self.write_entry(C::Entry::from_app_data(app_data), Some(tx)); + self.write_entry(C::Entry::new_normal(LogIdOf::::default(), app_data), Some(tx)); } RaftMsg::Initialize { members, tx } => { tracing::info!( @@ -1746,10 +1734,10 @@ where committed_vote: vote, entries, } => { - let last_log_id = entries.last().unwrap().get_log_id(); + let last_log_id = entries.last().unwrap().log_id(); tracing::debug!("AppendInputEntries: {}", DisplaySlice::<_>(&entries),); - let io_id = IOId::new_log_io(vote, Some(last_log_id.clone())); + let io_id = IOId::new_log_io(vote, Some(last_log_id)); let notify = Notification::LocalIO { io_id: io_id.clone() }; let callback = IOFlushed::new(notify, self.tx_notification.downgrade()); diff --git a/openraft/src/engine/engine_impl.rs b/openraft/src/engine/engine_impl.rs index 8f7a79db1..465bb188d 100644 --- a/openraft/src/engine/engine_impl.rs +++ b/openraft/src/engine/engine_impl.rs @@ -56,7 +56,6 @@ use crate::vote::RaftTerm; use crate::vote::RaftVote; use crate::LogIdOptionExt; use crate::Membership; -use crate::RaftLogId; use crate::RaftTypeConfig; /// Raft protocol algorithm. @@ -191,7 +190,7 @@ where C: RaftTypeConfig self.check_initialize()?; // The very first log id - entry.set_log_id(&LogIdOf::::default()); + entry.set_log_id(LogIdOf::::default()); let m = entry.get_membership().expect("the only log entry for initializing has to be membership log"); self.check_members_contain_me(&m)?; diff --git a/openraft/src/engine/handler/following_handler/mod.rs b/openraft/src/engine/handler/following_handler/mod.rs index 1b9d919c8..78ef5ad63 100644 --- a/openraft/src/engine/handler/following_handler/mod.rs +++ b/openraft/src/engine/handler/following_handler/mod.rs @@ -11,17 +11,17 @@ use crate::engine::Command; use crate::engine::Condition; use crate::engine::EngineConfig; use crate::engine::EngineOutput; +use crate::entry::raft_entry_ext::RaftEntryExt; use crate::entry::RaftEntry; use crate::entry::RaftPayload; use crate::error::RejectAppendEntries; +use crate::log_id::option_raft_log_id_ext::OptionRaftLogIdExt; use crate::raft_state::IOId; use crate::raft_state::LogStateReader; use crate::storage::Snapshot; use crate::type_config::alias::LogIdOf; use crate::vote::committed::CommittedVote; use crate::EffectiveMembership; -use crate::LogIdOptionExt; -use crate::RaftLogId; use crate::RaftState; use crate::RaftTypeConfig; use crate::StoredMembership; @@ -74,6 +74,8 @@ where C: RaftTypeConfig } let last_log_id = entries.last().map(|ent| ent.log_id()); + // let last_log_id = std::cmp::max_by_key(prev_log_id, last_log_id, |x: &Option>| + // x.ord_by()); let last_log_id = std::cmp::max(prev_log_id, last_log_id); let prev_accepted = self.state.accept_io(IOId::new_log_io(self.leader_vote.clone(), last_log_id.clone())); @@ -145,9 +147,9 @@ where C: RaftTypeConfig pub(crate) fn do_append_entries(&mut self, entries: Vec) { debug_assert!(!entries.is_empty()); debug_assert_eq!(entries[0].index(), self.state.log_ids.last().cloned().next_index(),); - debug_assert!(Some(entries[0].get_log_id()) > self.state.log_ids.last()); + debug_assert!(Some(entries[0].ref_log_id()) > self.state.log_ids.last().to_ref()); - self.state.extend_log_ids(&entries); + self.state.extend_log_ids(entries.iter().map(|x| x.ref_log_id())); self.append_membership(entries.iter()); self.output.push_command(Command::AppendInputEntries { diff --git a/openraft/src/engine/handler/leader_handler/mod.rs b/openraft/src/engine/handler/leader_handler/mod.rs index 9d2dbc38e..e069bde65 100644 --- a/openraft/src/engine/handler/leader_handler/mod.rs +++ b/openraft/src/engine/handler/leader_handler/mod.rs @@ -2,6 +2,7 @@ use crate::engine::handler::replication_handler::ReplicationHandler; use crate::engine::Command; use crate::engine::EngineConfig; use crate::engine::EngineOutput; +use crate::entry::raft_entry_ext::RaftEntryExt; use crate::entry::RaftEntry; use crate::entry::RaftPayload; use crate::proposer::Leader; @@ -58,7 +59,7 @@ where C: RaftTypeConfig self.leader.assign_log_ids(&mut entries); - self.state.extend_log_ids_from_same_leader(&entries); + self.state.extend_log_ids_from_same_leader(entries.iter().map(|x| x.ref_log_id())); let mut membership_entry = None; for entry in entries.iter() { diff --git a/openraft/src/engine/handler/log_handler/mod.rs b/openraft/src/engine/handler/log_handler/mod.rs index 79db5c05b..69fbb1eb6 100644 --- a/openraft/src/engine/handler/log_handler/mod.rs +++ b/openraft/src/engine/handler/log_handler/mod.rs @@ -2,6 +2,7 @@ use crate::display_ext::DisplayOptionExt; use crate::engine::Command; use crate::engine::EngineConfig; use crate::engine::EngineOutput; +use crate::log_id::option_ref_log_id_ext::OptionRefLogIdExt; use crate::raft_state::LogStateReader; use crate::type_config::alias::LogIdOf; use crate::LogIdOptionExt; @@ -100,7 +101,7 @@ where C: RaftTypeConfig return None; } - let log_id = self.state.log_ids.get(purge_end - 1); + let log_id = self.state.log_ids.ref_at(purge_end - 1); debug_assert!( log_id.is_some(), "log id not found at {}, engine.state:{:?}", @@ -108,6 +109,6 @@ where C: RaftTypeConfig st ); - log_id + log_id.to_log_id() } } diff --git a/openraft/src/engine/handler/snapshot_handler/mod.rs b/openraft/src/engine/handler/snapshot_handler/mod.rs index 9aecfbf59..d03452276 100644 --- a/openraft/src/engine/handler/snapshot_handler/mod.rs +++ b/openraft/src/engine/handler/snapshot_handler/mod.rs @@ -53,7 +53,7 @@ where C: RaftTypeConfig pub(crate) fn update_snapshot(&mut self, meta: SnapshotMeta) -> bool { tracing::info!("update_snapshot: {:?}", meta); - if meta.last_log_id <= self.state.snapshot_last_log_id().cloned() { + if meta.last_log_id.as_ref() <= self.state.snapshot_last_log_id() { tracing::info!( "No need to install a smaller snapshot: current snapshot last_log_id({}), new snapshot last_log_id({})", self.state.snapshot_last_log_id().display(), diff --git a/openraft/src/engine/handler/vote_handler/mod.rs b/openraft/src/engine/handler/vote_handler/mod.rs index 8fcc09fea..376296df6 100644 --- a/openraft/src/engine/handler/vote_handler/mod.rs +++ b/openraft/src/engine/handler/vote_handler/mod.rs @@ -2,6 +2,7 @@ use std::fmt::Debug; use std::time::Duration; use crate::core::raft_msg::ResultSender; +use crate::display_ext::DisplayOptionExt; use crate::engine::handler::leader_handler::LeaderHandler; use crate::engine::handler::replication_handler::ReplicationHandler; use crate::engine::handler::server_state_handler::ServerStateHandler; @@ -164,7 +165,7 @@ where C: RaftTypeConfig "become leader: node-{}, my vote: {}, last-log-id: {}", self.config.id, self.state.vote_ref(), - self.state.last_log_id().cloned().unwrap_or_default() + self.state.last_log_id().display() ); if let Some(l) = self.leader.as_mut() { diff --git a/openraft/src/engine/log_id_list.rs b/openraft/src/engine/log_id_list.rs index 7f525eb90..8aaabc211 100644 --- a/openraft/src/engine/log_id_list.rs +++ b/openraft/src/engine/log_id_list.rs @@ -1,10 +1,13 @@ use std::ops::RangeInclusive; use crate::engine::leader_log_ids::LeaderLogIds; -use crate::log_id::RaftLogId; +use crate::log_id::option_raft_log_id_ext::OptionRaftLogIdExt; +use crate::log_id::raft_log_id::RaftLogId; +use crate::log_id::raft_log_id_ext::RaftLogIdExt; +use crate::log_id::ref_log_id::RefLogId; use crate::storage::RaftLogReaderExt; +use crate::type_config::alias::CommittedLeaderIdOf; use crate::type_config::alias::LogIdOf; -use crate::LogIdOptionExt; use crate::RaftLogReader; use crate::RaftTypeConfig; use crate::StorageError; @@ -80,7 +83,7 @@ where C: RaftTypeConfig // Two adjacent logs with different leader_id, no need to binary search if first.index() + 1 == last.index() { - if res.last().map(|x| x.committed_leader_id()) < Some(first.committed_leader_id()) { + if res.last().committed_leader_id() < Some(first.committed_leader_id()) { res.push(first); } res.push(last); @@ -91,7 +94,7 @@ where C: RaftTypeConfig if first.committed_leader_id() == mid.committed_leader_id() { // Case AAC - if res.last().map(|x| x.committed_leader_id()) < Some(first.committed_leader_id()) { + if res.last().committed_leader_id() < Some(first.committed_leader_id()) { res.push(first); } stack.push((mid, last)); @@ -130,43 +133,46 @@ where C: RaftTypeConfig /// Extends a list of `log_id` that are proposed by a same leader. /// /// The log ids in the input has to be continuous. - pub(crate) fn extend_from_same_leader<'a, LID: RaftLogId + 'a>(&mut self, new_ids: &[LID]) { - if let Some(first) = new_ids.first() { - let first_id = first.get_log_id(); - self.append(first_id.clone()); + pub(crate) fn extend_from_same_leader(&mut self, new_ids: I) + where + LID: RaftLogId, + I: IntoIterator, + ::IntoIter: DoubleEndedIterator, + { + let mut it = new_ids.into_iter(); + if let Some(first) = it.next() { + self.append(first.to_log_id()); - if let Some(last) = new_ids.last() { - let last_id = last.get_log_id(); - assert_eq!(last_id.committed_leader_id(), first_id.committed_leader_id()); + if let Some(last) = it.next_back() { + debug_assert_eq!(last.committed_leader_id(), first.committed_leader_id()); - if last_id != first_id { - self.append(last_id.clone()); + if last != first { + self.append(last.to_log_id()); } } } } /// Extends a list of `log_id`. - // leader_id: Copy is feature gated - #[allow(clippy::clone_on_copy)] - pub(crate) fn extend<'a, LID: RaftLogId + 'a>(&mut self, new_ids: &[LID]) { - let mut prev = self.last().map(|x| x.committed_leader_id().clone()); - - for x in new_ids.iter() { - let log_id = x.get_log_id(); - - if prev.as_ref() != Some(log_id.committed_leader_id()) { - self.append(log_id.clone()); + pub(crate) fn extend(&mut self, new_ids: I) + where + LID: RaftLogId, + I: IntoIterator, + ::IntoIter: ExactSizeIterator, + { + let it = new_ids.into_iter(); + let len = it.len(); - prev = Some(log_id.committed_leader_id().clone()); + for (i, log_id) in it.enumerate() { + if self.last_leader_id() != Some(log_id.committed_leader_id()) { + self.append(log_id.to_log_id()); } - } - if let Some(last) = new_ids.last() { - let log_id = last.get_log_id(); - - if self.last() != Some(log_id) { - self.append(log_id.clone()); + #[allow(clippy::collapsible_if)] + if i == len - 1 { + if self.ref_last() != Some(log_id.to_ref()) { + self.append(log_id.to_log_id()); + } } } } @@ -270,35 +276,45 @@ where C: RaftTypeConfig } Err(i) => { self.key_log_ids = self.key_log_ids.split_off(i - 1); - self.key_log_ids[0].index = upto.index(); + self.key_log_ids[0] = + LogIdOf::::new(self.key_log_ids[0].committed_leader_id().clone(), upto.index()); } } } - /// Get the log id at the specified index. + // This method is only used in tests + #[allow(dead_code)] + pub(crate) fn get(&self, index: u64) -> Option> { + self.ref_at(index).map(|x| x.into_owned()) + } + + /// Get the log id at the specified index in a [`RefLogId`]. /// /// It will return `last_purged_log_id` if index is at the last purged index. - // leader_id: Copy is feature gated #[allow(clippy::clone_on_copy)] - pub(crate) fn get(&self, index: u64) -> Option> { + pub(crate) fn ref_at(&self, index: u64) -> Option> { let res = self.key_log_ids.binary_search_by(|log_id| log_id.index().cmp(&index)); - match res { - Ok(i) => Some(LogIdOf::::new( - self.key_log_ids[i].committed_leader_id().clone(), - index, - )), + // Index of the leadership change point that covers the target index. + // It points to either: + // - The exact matching log entry if found, or + // - The most recent change point before the target index + let change_point = match res { + Ok(i) => i, Err(i) => { + // i - 1 is the last one that is smaller than the input. if i == 0 || i == self.key_log_ids.len() { - None + return None; } else { - Some(LogIdOf::::new( - self.key_log_ids[i - 1].committed_leader_id().clone(), - index, - )) + i - 1 } } - } + }; + + Some(RefLogId::new( + self.key_log_ids[change_point].committed_leader_id(), + index, + )) } pub(crate) fn first(&self) -> Option<&LogIdOf> { @@ -309,6 +325,14 @@ where C: RaftTypeConfig self.key_log_ids.last() } + pub(crate) fn ref_last(&self) -> Option> { + self.last().map(|x| x.to_ref()) + } + + pub(crate) fn last_leader_id(&self) -> Option<&CommittedLeaderIdOf> { + self.last().map(|x| x.committed_leader_id()) + } + // This method will only be used under feature tokio-rt #[cfg_attr(not(feature = "tokio-rt"), allow(dead_code))] pub(crate) fn key_log_ids(&self) -> &[LogIdOf] { diff --git a/openraft/src/engine/testing.rs b/openraft/src/engine/testing.rs index 79e89bb88..0277087b0 100644 --- a/openraft/src/engine/testing.rs +++ b/openraft/src/engine/testing.rs @@ -42,7 +42,7 @@ where N: Node + Ord type Term = u64; type LeaderId = crate::impls::leader_id_adv::LeaderId; type Vote = crate::impls::Vote; - type Entry = crate::Entry; + type Entry = crate::impls::Entry; type SnapshotData = Cursor>; type AsyncRuntime = TokioRuntime; type Responder = crate::impls::OneshotResponder; diff --git a/openraft/src/engine/tests/log_id_list_test.rs b/openraft/src/engine/tests/log_id_list_test.rs index 13244cac3..ab1aa084c 100644 --- a/openraft/src/engine/tests/log_id_list_test.rs +++ b/openraft/src/engine/tests/log_id_list_test.rs @@ -9,13 +9,13 @@ fn test_log_id_list_extend_from_same_leader() -> anyhow::Result<()> { // Extend one log id to an empty LogIdList: Just store it directly - ids.extend_from_same_leader(&[log_id(1, 1, 2)]); + ids.extend_from_same_leader([log_id(1, 1, 2)]); assert_eq!(vec![log_id(1, 1, 2)], ids.key_log_ids()); // Extend two log ids that are adjacent to the last stored one. // It should append only one log id as the new ending log id. - ids.extend_from_same_leader(&[ + ids.extend_from_same_leader([ log_id(1, 1, 3), // log_id(1, 1, 4), ]); @@ -31,7 +31,7 @@ fn test_log_id_list_extend_from_same_leader() -> anyhow::Result<()> { // Extend 3 log id with new leader id. // It should just store every log id for each leader, plus one last-log-id. - ids.extend_from_same_leader(&[ + ids.extend_from_same_leader([ log_id(2, 1, 5), // log_id(2, 1, 6), log_id(2, 1, 7), @@ -55,13 +55,13 @@ fn test_log_id_list_extend() -> anyhow::Result<()> { // Extend one log id to an empty LogIdList: Just store it directly - ids.extend(&[log_id(1, 1, 2)]); + ids.extend([log_id(1, 1, 2)]); assert_eq!(vec![log_id(1, 1, 2)], ids.key_log_ids()); // Extend two log ids that are adjacent to the last stored one. // It should append only one log id as the new ending log id. - ids.extend(&[ + ids.extend([ log_id(1, 1, 3), // log_id(1, 1, 4), ]); @@ -77,7 +77,7 @@ fn test_log_id_list_extend() -> anyhow::Result<()> { // Extend 3 log id with different leader id. // Last two has the same leader id. - ids.extend(&[ + ids.extend([ log_id(1, 1, 5), // log_id(2, 1, 6), log_id(2, 1, 7), @@ -95,7 +95,7 @@ fn test_log_id_list_extend() -> anyhow::Result<()> { // Extend 3 log id with different leader id. // Last two have different leader id. - ids.extend(&[ + ids.extend([ log_id(2, 1, 8), // log_id(2, 1, 9), log_id(3, 1, 10), diff --git a/openraft/src/entry/mod.rs b/openraft/src/entry/mod.rs index 25284bddd..96310b0f6 100644 --- a/openraft/src/entry/mod.rs +++ b/openraft/src/entry/mod.rs @@ -3,18 +3,18 @@ use std::fmt; use std::fmt::Debug; -use crate::log_id::RaftLogId; use crate::Membership; use crate::RaftTypeConfig; pub mod payload; +pub(crate) mod raft_entry_ext; mod traits; pub use payload::EntryPayload; -pub use traits::FromAppData; pub use traits::RaftEntry; pub use traits::RaftPayload; +use crate::type_config::alias::CommittedLeaderIdOf; use crate::type_config::alias::LogIdOf; /// A Raft log entry. @@ -89,52 +89,23 @@ where C: RaftTypeConfig impl RaftPayload for Entry where C: RaftTypeConfig { - fn is_blank(&self) -> bool { - self.payload.is_blank() - } - fn get_membership(&self) -> Option> { self.payload.get_membership() } } -impl RaftLogId for Entry -where C: RaftTypeConfig -{ - fn get_log_id(&self) -> &LogIdOf { - &self.log_id - } - - fn set_log_id(&mut self, log_id: &LogIdOf) { - self.log_id = log_id.clone(); - } -} - impl RaftEntry for Entry where C: RaftTypeConfig { - fn new_blank(log_id: LogIdOf) -> Self { - Self { - log_id, - payload: EntryPayload::Blank, - } + fn new(log_id: LogIdOf, payload: EntryPayload) -> Self { + Self { log_id, payload } } - fn new_membership(log_id: LogIdOf, m: Membership) -> Self { - Self { - log_id, - payload: EntryPayload::Membership(m), - } + fn log_id_parts(&self) -> (&CommittedLeaderIdOf, u64) { + (&self.log_id.leader_id, self.log_id.index) } -} -impl FromAppData for Entry -where C: RaftTypeConfig -{ - fn from_app_data(d: C::D) -> Self { - Entry { - log_id: LogIdOf::::default(), - payload: EntryPayload::Normal(d), - } + fn set_log_id(&mut self, new: LogIdOf) { + self.log_id = new; } } diff --git a/openraft/src/entry/payload.rs b/openraft/src/entry/payload.rs index 133a5cb21..76a6afefc 100644 --- a/openraft/src/entry/payload.rs +++ b/openraft/src/entry/payload.rs @@ -63,10 +63,6 @@ where C: RaftTypeConfig } impl RaftPayload for EntryPayload { - fn is_blank(&self) -> bool { - matches!(self, EntryPayload::Blank) - } - fn get_membership(&self) -> Option> { if let EntryPayload::Membership(m) = self { Some(m.clone()) diff --git a/openraft/src/entry/raft_entry_ext.rs b/openraft/src/entry/raft_entry_ext.rs new file mode 100644 index 000000000..5d14acd83 --- /dev/null +++ b/openraft/src/entry/raft_entry_ext.rs @@ -0,0 +1,20 @@ +use crate::entry::RaftEntry; +use crate::log_id::ref_log_id::RefLogId; +use crate::RaftTypeConfig; + +pub(crate) trait RaftEntryExt: RaftEntry +where C: RaftTypeConfig +{ + /// Returns a lightweight [`RefLogId`] that contains the log id information. + fn ref_log_id(&self) -> RefLogId<'_, C> { + let (leader_id, index) = self.log_id_parts(); + RefLogId::new(leader_id, index) + } +} + +impl RaftEntryExt for T +where + C: RaftTypeConfig, + T: RaftEntry, +{ +} diff --git a/openraft/src/entry/traits.rs b/openraft/src/entry/traits.rs index ea8418c6f..3077d127d 100644 --- a/openraft/src/entry/traits.rs +++ b/openraft/src/entry/traits.rs @@ -5,8 +5,9 @@ use openraft_macros::since; use crate::base::finalized::Final; use crate::base::OptionalFeatures; -use crate::log_id::RaftLogId; +use crate::type_config::alias::CommittedLeaderIdOf; use crate::type_config::alias::LogIdOf; +use crate::EntryPayload; use crate::Membership; use crate::RaftTypeConfig; @@ -14,10 +15,7 @@ use crate::RaftTypeConfig; pub trait RaftPayload where C: RaftTypeConfig { - /// Return `true` if the entry payload is blank. - fn is_blank(&self) -> bool; - - /// Return `Some(Membership)` if the entry payload is a membership payload. + /// Return `Some(Membership)` if the entry payload contains a membership payload. fn get_membership(&self) -> Option>; } @@ -26,37 +24,64 @@ pub trait RaftEntry where C: RaftTypeConfig, Self: OptionalFeatures + Debug + Display, - Self: RaftPayload + RaftLogId, + Self: RaftPayload, { - /// Create a new blank log entry. + /// Create a new log entry with log id and payload of application data or membership config. + #[since(version = "0.10.0")] + fn new(log_id: LogIdOf, payload: EntryPayload) -> Self; + + /// Returns references to the components of this entry's log ID: the committed leader ID and + /// index. /// - /// The returned instance must return `true` for `Self::is_blank()`. - fn new_blank(log_id: LogIdOf) -> Self; + /// The returned tuple contains: + /// - A reference to the committed leader ID that proposed this log entry. + /// - The index position of this entry in the log. + /// + /// Note: Although these components constitute a `LogId`, this method returns them separately + /// rather than as a reference to `LogId`. This allows implementations to store these + /// components directly without requiring a `LogId` field in their data structure. + #[since(version = "0.10.0")] + fn log_id_parts(&self) -> (&CommittedLeaderIdOf, u64); + + /// Set the log ID of this entry. + #[since(version = "0.10.0")] + fn set_log_id(&mut self, new: LogIdOf); + + /// Create a new blank log entry. + #[since(version = "0.10.0", change = "become a default method")] + fn new_blank(log_id: LogIdOf) -> Self + where Self: Final + Sized { + Self::new(log_id, EntryPayload::Blank) + } + + /// Create a new normal log entry that contains application data. + #[since(version = "0.10.0", change = "become a default method")] + fn new_normal(log_id: LogIdOf, data: C::D) -> Self + where Self: Final + Sized { + Self::new(log_id, EntryPayload::Normal(data)) + } /// Create a new membership log entry. /// /// The returned instance must return `Some()` for `Self::get_membership()`. - fn new_membership(log_id: LogIdOf, m: Membership) -> Self; + #[since(version = "0.10.0", change = "become a default method")] + fn new_membership(log_id: LogIdOf, m: Membership) -> Self + where Self: Final + Sized { + Self::new(log_id, EntryPayload::Membership(m)) + } /// Returns the `LogId` of this entry. #[since(version = "0.10.0")] fn log_id(&self) -> LogIdOf where Self: Final { - self.get_log_id().clone() + let (leader_id, index) = self.log_id_parts(); + LogIdOf::::new(leader_id.clone(), index) } /// Returns the index of this log entry. #[since(version = "0.10.0")] fn index(&self) -> u64 where Self: Final { - self.get_log_id().index() + self.log_id_parts().1 } } - -/// Build a raft log entry from app data. -/// -/// A concrete Entry should implement this trait to let openraft create an entry when needed. -pub trait FromAppData { - /// Build a raft log entry from app data. - fn from_app_data(t: T) -> Self; -} diff --git a/openraft/src/impls/mod.rs b/openraft/src/impls/mod.rs index 514640898..72e01d50f 100644 --- a/openraft/src/impls/mod.rs +++ b/openraft/src/impls/mod.rs @@ -17,6 +17,8 @@ pub mod leader_id_std { pub use crate::vote::leader_id::leader_id_std::LeaderId; } +/// Default implementation of a raft log identity. +pub use crate::log_id::LogId; /// Default [`RaftVote`] implementation for both standard Raft mode and multi-leader-per-term mode. /// /// The difference between the two modes is the implementation of [`RaftLeaderId`]. diff --git a/openraft/src/lib.rs b/openraft/src/lib.rs index d07ebd644..459da2a6c 100644 --- a/openraft/src/lib.rs +++ b/openraft/src/lib.rs @@ -105,7 +105,6 @@ pub use crate::instant::TokioInstant; pub use crate::log_id::LogId; pub use crate::log_id::LogIdOptionExt; pub use crate::log_id::LogIndexOptionExt; -pub use crate::log_id::RaftLogId; pub use crate::membership::EffectiveMembership; pub use crate::membership::Membership; pub use crate::membership::StoredMembership; diff --git a/openraft/src/log_id/log_id_option_ext.rs b/openraft/src/log_id/log_id_option_ext.rs index efe5a3b55..678e55599 100644 --- a/openraft/src/log_id/log_id_option_ext.rs +++ b/openraft/src/log_id/log_id_option_ext.rs @@ -1,8 +1,10 @@ -use crate::type_config::alias::LogIdOf; +use crate::log_id::raft_log_id::RaftLogId; use crate::RaftTypeConfig; /// This helper trait extracts information from an `Option`. -pub trait LogIdOptionExt { +pub trait LogIdOptionExt +where C: RaftTypeConfig +{ /// Returns the log index if it is not a `None`. fn index(&self) -> Option; @@ -12,8 +14,10 @@ pub trait LogIdOptionExt { fn next_index(&self) -> u64; } -impl LogIdOptionExt for Option> -where C: RaftTypeConfig +impl LogIdOptionExt for Option +where + C: RaftTypeConfig, + T: RaftLogId, { fn index(&self) -> Option { self.as_ref().map(|x| x.index()) @@ -26,18 +30,3 @@ where C: RaftTypeConfig } } } - -impl LogIdOptionExt for Option<&LogIdOf> -where C: RaftTypeConfig -{ - fn index(&self) -> Option { - self.map(|x| x.index()) - } - - fn next_index(&self) -> u64 { - match self { - None => 0, - Some(log_id) => log_id.index() + 1, - } - } -} diff --git a/openraft/src/log_id/mod.rs b/openraft/src/log_id/mod.rs index d5b30c9dc..4b32108e1 100644 --- a/openraft/src/log_id/mod.rs +++ b/openraft/src/log_id/mod.rs @@ -3,15 +3,19 @@ mod log_id_option_ext; mod log_index_option_ext; -mod raft_log_id; +pub(crate) mod option_raft_log_id_ext; +pub(crate) mod option_ref_log_id_ext; +pub(crate) mod raft_log_id; +pub(crate) mod raft_log_id_ext; +pub(crate) mod ref_log_id; use std::fmt::Display; use std::fmt::Formatter; pub use log_id_option_ext::LogIdOptionExt; pub use log_index_option_ext::LogIndexOptionExt; -pub use raft_log_id::RaftLogId; +use crate::log_id::raft_log_id::RaftLogId; use crate::type_config::alias::CommittedLeaderIdOf; use crate::RaftTypeConfig; @@ -42,12 +46,16 @@ where impl RaftLogId for LogId where C: RaftTypeConfig { - fn get_log_id(&self) -> &LogId { - self + fn new(leader_id: CommittedLeaderIdOf, index: u64) -> Self { + LogId { leader_id, index } } - fn set_log_id(&mut self, log_id: &LogId) { - *self = log_id.clone() + fn committed_leader_id(&self) -> &CommittedLeaderIdOf { + &self.leader_id + } + + fn index(&self) -> u64 { + self.index } } diff --git a/openraft/src/log_id/option_raft_log_id_ext.rs b/openraft/src/log_id/option_raft_log_id_ext.rs new file mode 100644 index 000000000..93711b6f7 --- /dev/null +++ b/openraft/src/log_id/option_raft_log_id_ext.rs @@ -0,0 +1,51 @@ +use crate::log_id::raft_log_id::RaftLogId; +use crate::log_id::raft_log_id_ext::RaftLogIdExt; +use crate::log_id::ref_log_id::RefLogId; +use crate::type_config::alias::CommittedLeaderIdOf; +use crate::RaftTypeConfig; + +/// This helper trait extracts information from an `Option` where T impls [`RaftLogId`]. +pub(crate) trait OptionRaftLogIdExt +where C: RaftTypeConfig +{ + /// Returns the log index if it is not a `None`. + fn index(&self) -> Option; + + /// Returns the next log index. + /// + /// If self is `None`, it returns 0. + fn next_index(&self) -> u64; + + /// Returns the leader id that proposed this log id. + /// + /// In standard raft, committed leader id is just `term`. + fn committed_leader_id(&self) -> Option<&CommittedLeaderIdOf>; + + /// Converts `&Option` to `Option`. + fn to_ref(&self) -> Option>; +} + +impl OptionRaftLogIdExt for Option +where + C: RaftTypeConfig, + T: RaftLogId, +{ + fn index(&self) -> Option { + self.as_ref().map(|x| x.index()) + } + + fn next_index(&self) -> u64 { + match self { + None => 0, + Some(log_id) => log_id.index() + 1, + } + } + + fn committed_leader_id(&self) -> Option<&CommittedLeaderIdOf> { + self.as_ref().map(|x| x.committed_leader_id()) + } + + fn to_ref(&self) -> Option> { + self.as_ref().map(|x| x.to_ref()) + } +} diff --git a/openraft/src/log_id/option_ref_log_id_ext.rs b/openraft/src/log_id/option_ref_log_id_ext.rs new file mode 100644 index 000000000..d0d113600 --- /dev/null +++ b/openraft/src/log_id/option_ref_log_id_ext.rs @@ -0,0 +1,20 @@ +use crate::log_id::ref_log_id::RefLogId; +use crate::type_config::alias::LogIdOf; +use crate::RaftTypeConfig; + +pub(crate) trait OptionRefLogIdExt +where C: RaftTypeConfig +{ + /// Creates a new owned [`LogId`] from the reference log ID. + /// + /// [`LogId`]: crate::log_id::LogId + fn to_log_id(&self) -> Option>; +} + +impl OptionRefLogIdExt for Option> +where C: RaftTypeConfig +{ + fn to_log_id(&self) -> Option> { + self.as_ref().map(|r| r.into_owned()) + } +} diff --git a/openraft/src/log_id/raft_log_id.rs b/openraft/src/log_id/raft_log_id.rs index 06feb8f8c..f39d7d6a4 100644 --- a/openraft/src/log_id/raft_log_id.rs +++ b/openraft/src/log_id/raft_log_id.rs @@ -1,24 +1,47 @@ +use std::fmt; + use crate::type_config::alias::CommittedLeaderIdOf; -use crate::type_config::alias::LogIdOf; use crate::RaftTypeConfig; -/// Defines API to operate an object that contains a log-id, such as a log entry or a log id. -pub trait RaftLogId -where C: RaftTypeConfig +/// Log id is the globally unique identifier of a log entry. +/// +/// Equal log id means the same log entry. +pub(crate) trait RaftLogId +where + C: RaftTypeConfig, + Self: Eq + Clone + fmt::Debug + fmt::Display, { + /// Creates a log id proposed by a committed leader `leader_id` at the given index. + // This is only used internally + #[allow(dead_code)] + fn new(leader_id: CommittedLeaderIdOf, index: u64) -> Self; + /// Returns a reference to the leader id that proposed this log id. /// /// When a `LeaderId` is committed, some of its data can be discarded. /// For example, a leader id in standard raft is `(term, node_id)`, but a log id does not have /// to store the `node_id`, because in standard raft there is at most one leader that can be /// established. - fn leader_id(&self) -> &CommittedLeaderIdOf { - self.get_log_id().committed_leader_id() + fn committed_leader_id(&self) -> &CommittedLeaderIdOf; + + /// Returns the index of the log id. + fn index(&self) -> u64; +} + +impl RaftLogId for &T +where + C: RaftTypeConfig, + T: RaftLogId, +{ + fn new(_leader_id: CommittedLeaderIdOf, _index: u64) -> Self { + unreachable!("This method should not be called on a reference.") } - /// Return a reference to the log-id it stores. - fn get_log_id(&self) -> &LogIdOf; + fn committed_leader_id(&self) -> &CommittedLeaderIdOf { + T::committed_leader_id(self) + } - /// Update the log id it contains. - fn set_log_id(&mut self, log_id: &LogIdOf); + fn index(&self) -> u64 { + T::index(self) + } } diff --git a/openraft/src/log_id/raft_log_id_ext.rs b/openraft/src/log_id/raft_log_id_ext.rs new file mode 100644 index 000000000..789318fde --- /dev/null +++ b/openraft/src/log_id/raft_log_id_ext.rs @@ -0,0 +1,32 @@ +use crate::log_id::raft_log_id::RaftLogId; +use crate::log_id::ref_log_id::RefLogId; +use crate::type_config::alias::LogIdOf; +use crate::RaftTypeConfig; + +pub(crate) trait RaftLogIdExt +where + C: RaftTypeConfig, + Self: RaftLogId, +{ + /// Creates a new owned [`LogId`] from this log ID implementation. + /// + /// [`LogId`]: crate::log_id::LogId + fn to_log_id(&self) -> LogIdOf { + self.to_ref().into_owned() + } + + /// Creates a reference view of this log ID implementation via a [`RefLogId`]. + fn to_ref(&self) -> RefLogId<'_, C> { + RefLogId { + leader_id: self.committed_leader_id(), + index: self.index(), + } + } +} + +impl RaftLogIdExt for T +where + C: RaftTypeConfig, + T: RaftLogId, +{ +} diff --git a/openraft/src/log_id/ref_log_id.rs b/openraft/src/log_id/ref_log_id.rs new file mode 100644 index 000000000..b5a969139 --- /dev/null +++ b/openraft/src/log_id/ref_log_id.rs @@ -0,0 +1,60 @@ +use std::fmt::Display; +use std::fmt::Formatter; + +use crate::log_id::raft_log_id::RaftLogId; +use crate::type_config::alias::CommittedLeaderIdOf; +use crate::LogId; +use crate::RaftTypeConfig; + +/// A reference to a log id, combining a reference to a committed leader ID and an index. +#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)] +pub(crate) struct RefLogId<'k, C> +where C: RaftTypeConfig +{ + pub(crate) leader_id: &'k CommittedLeaderIdOf, + pub(crate) index: u64, +} + +impl<'l, C> RefLogId<'l, C> +where C: RaftTypeConfig +{ + pub(crate) fn new(leader_id: &'l CommittedLeaderIdOf, index: u64) -> Self { + RefLogId { leader_id, index } + } + + pub(crate) fn committed_leader_id(&self) -> &'l CommittedLeaderIdOf { + self.leader_id + } + + pub(crate) fn index(&self) -> u64 { + self.index + } + + pub(crate) fn into_owned(self) -> LogId { + LogId::::new(self.leader_id.clone(), self.index) + } +} + +impl Display for RefLogId<'_, C> +where C: RaftTypeConfig +{ + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + write!(f, "{}.{}", self.committed_leader_id(), self.index()) + } +} + +impl RaftLogId for RefLogId<'_, C> +where C: RaftTypeConfig +{ + fn new(_leader_id: CommittedLeaderIdOf, _index: u64) -> Self { + unreachable!("RefLogId does not own the leader id, so it cannot be created from it.") + } + + fn committed_leader_id(&self) -> &CommittedLeaderIdOf { + self.leader_id + } + + fn index(&self) -> u64 { + self.index + } +} diff --git a/openraft/src/log_id_range.rs b/openraft/src/log_id_range.rs index acce6d89b..51240501f 100644 --- a/openraft/src/log_id_range.rs +++ b/openraft/src/log_id_range.rs @@ -5,7 +5,6 @@ use std::fmt::Formatter; use validit::Validate; use crate::display_ext::DisplayOptionExt; -use crate::type_config::alias::CommittedLeaderIdOf; use crate::type_config::alias::LogIdOf; use crate::LogIdOptionExt; use crate::RaftTypeConfig; @@ -30,7 +29,7 @@ where C: RaftTypeConfig impl Copy for LogIdRange where C: RaftTypeConfig, - CommittedLeaderIdOf: Copy, + LogIdOf: Copy, { } @@ -70,11 +69,10 @@ mod tests { use crate::engine::testing::UTConfig; use crate::log_id_range::LogIdRange; - use crate::testing; use crate::type_config::alias::LogIdOf; fn log_id(index: u64) -> LogIdOf { - testing::log_id(1, 1, index) + crate::engine::testing::log_id(1, 1, index) } #[test] diff --git a/openraft/src/membership/effective_membership.rs b/openraft/src/membership/effective_membership.rs index 66e00d66a..8fb5994d0 100644 --- a/openraft/src/membership/effective_membership.rs +++ b/openraft/src/membership/effective_membership.rs @@ -4,7 +4,8 @@ use std::fmt::Debug; use std::sync::Arc; use crate::display_ext::DisplayOptionExt; -use crate::log_id::RaftLogId; +use crate::log_id::raft_log_id::RaftLogId; +use crate::log_id::raft_log_id_ext::RaftLogIdExt; use crate::quorum::Joint; use crate::quorum::QuorumSet; use crate::type_config::alias::LogIdOf; @@ -58,7 +59,7 @@ where LID: RaftLogId, { fn from(v: (&LID, Membership)) -> Self { - EffectiveMembership::new(Some(v.0.get_log_id().clone()), v.1) + EffectiveMembership::new(Some(v.0.to_log_id()), v.1) } } diff --git a/openraft/src/progress/entry/tests.rs b/openraft/src/progress/entry/tests.rs index d85107a5a..845428dc1 100644 --- a/openraft/src/progress/entry/tests.rs +++ b/openraft/src/progress/entry/tests.rs @@ -2,6 +2,7 @@ use std::borrow::Borrow; use crate::engine::testing::UTConfig; use crate::engine::EngineConfig; +use crate::log_id::ref_log_id::RefLogId; use crate::progress::entry::ProgressEntry; use crate::progress::inflight::Inflight; use crate::raft_state::LogStateReader; @@ -118,6 +119,10 @@ impl LogStateReader for LogState { } } + fn ref_log_id(&self, _index: u64) -> Option> { + todo!() + } + fn last_log_id(&self) -> Option<&LogIdOf> { self.last.as_ref() } diff --git a/openraft/src/progress/inflight/mod.rs b/openraft/src/progress/inflight/mod.rs index 363137581..dbfc114d3 100644 --- a/openraft/src/progress/inflight/mod.rs +++ b/openraft/src/progress/inflight/mod.rs @@ -9,7 +9,6 @@ use validit::Validate; use crate::display_ext::DisplayOptionExt; use crate::log_id_range::LogIdRange; -use crate::type_config::alias::CommittedLeaderIdOf; use crate::type_config::alias::LogIdOf; use crate::LogIdOptionExt; use crate::RaftTypeConfig; @@ -42,7 +41,7 @@ where C: RaftTypeConfig impl Copy for Inflight where C: RaftTypeConfig, - CommittedLeaderIdOf: Copy, + LogIdOf: Copy, { } diff --git a/openraft/src/proposer/leader.rs b/openraft/src/proposer/leader.rs index 4b59f4e2e..cd88e4790 100644 --- a/openraft/src/proposer/leader.rs +++ b/openraft/src/proposer/leader.rs @@ -2,6 +2,8 @@ use std::fmt; use crate::display_ext::DisplayInstantExt; use crate::engine::leader_log_ids::LeaderLogIds; +use crate::entry::raft_entry_ext::RaftEntryExt; +use crate::entry::RaftEntry; use crate::progress::entry::ProgressEntry; use crate::progress::Progress; use crate::progress::VecProgress; @@ -12,7 +14,6 @@ use crate::type_config::TypeConfigExt; use crate::vote::committed::CommittedVote; use crate::vote::raft_vote::RaftVoteExt; use crate::LogIdOptionExt; -use crate::RaftLogId; use crate::RaftTypeConfig; /// Leading state data. @@ -162,27 +163,32 @@ where /// Assign log ids to the entries. /// /// This method update the `self.last_log_id`. - pub(crate) fn assign_log_ids<'a, LID: RaftLogId + 'a>( - &mut self, - entries: impl IntoIterator, - ) { + pub(crate) fn assign_log_ids<'a, Ent, I>(&mut self, entries: I) + where + Ent: RaftEntry + 'a, + I: IntoIterator, + ::IntoIter: ExactSizeIterator, + { debug_assert!(self.transfer_to.is_none(), "leader is disabled to propose new log"); + let it = entries.into_iter(); + let len = it.len(); + if len == 0 { + return; + } + let committed_leader_id = self.committed_vote.committed_leader_id(); - let first = LogIdOf::::new(committed_leader_id, self.last_log_id().next_index()); - let mut last = first.clone(); + let mut index = self.last_log_id().next_index(); - for entry in entries { - entry.set_log_id(&last); - tracing::debug!("assign log id: {}", last); - last.index += 1; + for entry in it { + entry.set_log_id(LogIdOf::::new(committed_leader_id.clone(), index)); + tracing::debug!("assign log id: {}", entry.ref_log_id()); + index += 1; } - if last.index() > first.index() { - last.index -= 1; - self.last_log_id = Some(last); - } + index -= 1; + self.last_log_id = Some(LogIdOf::::new(committed_leader_id.clone(), index)); } /// Get the last timestamp acknowledged by a quorum. diff --git a/openraft/src/raft_state/log_state_reader.rs b/openraft/src/raft_state/log_state_reader.rs index cb6969f25..1fc04b52b 100644 --- a/openraft/src/raft_state/log_state_reader.rs +++ b/openraft/src/raft_state/log_state_reader.rs @@ -1,5 +1,9 @@ +use crate::log_id::option_raft_log_id_ext::OptionRaftLogIdExt; +use crate::log_id::option_ref_log_id_ext::OptionRefLogIdExt; +use crate::log_id::raft_log_id::RaftLogId; +use crate::log_id::raft_log_id_ext::RaftLogIdExt; +use crate::log_id::ref_log_id::RefLogId; use crate::type_config::alias::LogIdOf; -use crate::LogIdOptionExt; use crate::RaftTypeConfig; /// APIs to get significant log ids reflecting the raft state. @@ -20,26 +24,30 @@ where C: RaftTypeConfig /// Return if a log id exists. /// /// It assumes a committed log will always get positive return value, according to raft spec. - fn has_log_id(&self, log_id: &LogIdOf) -> bool { + fn has_log_id(&self, log_id: impl RaftLogId) -> bool { if log_id.index() < self.committed().next_index() { - debug_assert!(Some(log_id) <= self.committed()); + debug_assert!(Some(log_id.to_ref()) <= self.committed().to_ref()); return true; } // The local log id exists at the index and is same as the input. - if let Some(local) = self.get_log_id(log_id.index()) { - *log_id == local + if let Some(local) = self.ref_log_id(log_id.index()) { + log_id.to_ref() == local } else { false } } + fn get_log_id(&self, index: u64) -> Option> { + self.ref_log_id(index).to_log_id() + } + /// Get the log id at the specified index. /// /// It will return `last_purged_log_id` if index is at the last purged index. /// If the log at the specified index is smaller than `last_purged_log_id`, or greater than /// `last_log_id`, it returns None. - fn get_log_id(&self, index: u64) -> Option>; + fn ref_log_id(&self, index: u64) -> Option>; /// The last known log id in the store. /// diff --git a/openraft/src/raft_state/mod.rs b/openraft/src/raft_state/mod.rs index 6236ed8f3..3984674d5 100644 --- a/openraft/src/raft_state/mod.rs +++ b/openraft/src/raft_state/mod.rs @@ -6,7 +6,7 @@ use validit::Validate; use crate::engine::LogIdList; use crate::error::ForwardToLeader; -use crate::log_id::RaftLogId; +use crate::log_id::raft_log_id::RaftLogId; use crate::storage::SnapshotMeta; use crate::utime::Leased; use crate::LogIdOptionExt; @@ -35,6 +35,9 @@ pub use membership_state::MembershipState; pub(crate) use vote_state_reader::VoteStateReader; use crate::display_ext::DisplayOptionExt; +use crate::entry::raft_entry_ext::RaftEntryExt; +use crate::entry::RaftEntry; +use crate::log_id::ref_log_id::RefLogId; use crate::proposer::Leader; use crate::proposer::LeaderQuorumSet; use crate::type_config::alias::InstantOf; @@ -108,8 +111,8 @@ where C: RaftTypeConfig impl LogStateReader for RaftState where C: RaftTypeConfig { - fn get_log_id(&self, index: u64) -> Option> { - self.log_ids.get(index) + fn ref_log_id(&self, index: u64) -> Option> { + self.log_ids.ref_at(index) } fn last_log_id(&self) -> Option<&LogIdOf> { @@ -254,11 +257,20 @@ where C: RaftTypeConfig /// Append a list of `log_id`. /// /// The log ids in the input has to be continuous. - pub(crate) fn extend_log_ids_from_same_leader<'a, LID: RaftLogId + 'a>(&mut self, new_log_ids: &[LID]) { + pub(crate) fn extend_log_ids_from_same_leader<'a, I>(&mut self, new_log_ids: I) + where + I: IntoIterator>, + ::IntoIter: DoubleEndedIterator, + { self.log_ids.extend_from_same_leader(new_log_ids) } - pub(crate) fn extend_log_ids<'a, LID: RaftLogId + 'a>(&mut self, new_log_id: &[LID]) { + pub(crate) fn extend_log_ids(&mut self, new_log_id: I) + where + LID: RaftLogId, + I: IntoIterator, + ::IntoIter: ExactSizeIterator, + { self.log_ids.extend(new_log_id) } @@ -299,16 +311,16 @@ where C: RaftTypeConfig /// Find the first entry in the input that does not exist on local raft-log, /// by comparing the log id. pub(crate) fn first_conflicting_index(&self, entries: &[Ent]) -> usize - where Ent: RaftLogId { + where Ent: RaftEntry { let l = entries.len(); for (i, ent) in entries.iter().enumerate() { - let log_id = ent.get_log_id(); + let ref_log_id = ent.ref_log_id(); - if !self.has_log_id(log_id) { + if !self.has_log_id(ref_log_id) { tracing::debug!( at = display(i), - entry_log_id = display(log_id), + entry_log_id = display(ref_log_id), "found nonexistent log id" ); return i; diff --git a/openraft/src/raft_state/tests/forward_to_leader_test.rs b/openraft/src/raft_state/tests/forward_to_leader_test.rs index cf5d76b47..0a3b5acf9 100644 --- a/openraft/src/raft_state/tests/forward_to_leader_test.rs +++ b/openraft/src/raft_state/tests/forward_to_leader_test.rs @@ -6,22 +6,29 @@ use maplit::btreeset; use crate::engine::testing::UTConfig; use crate::error::ForwardToLeader; -use crate::testing::log_id; +use crate::type_config::alias::LeaderIdOf; +use crate::type_config::alias::LogIdOf; +use crate::type_config::alias::NodeIdOf; use crate::type_config::TypeConfigExt; use crate::utime::Leased; +use crate::vote::RaftLeaderIdExt; use crate::EffectiveMembership; use crate::Membership; use crate::MembershipState; use crate::RaftState; use crate::Vote; -fn m12() -> Membership { +fn log_id(term: u64, node_id: NodeIdOf>, index: u64) -> LogIdOf> { + LogIdOf::>::new(LeaderIdOf::>::new_committed(term, node_id), index) +} + +fn m12() -> Membership> { Membership::new_with_defaults(vec![btreeset! {1,2}], []) } #[test] fn test_forward_to_leader_vote_not_committed() { - let rs = RaftState:: { + let rs = RaftState::> { vote: Leased::new(UTConfig::<()>::now(), Duration::from_millis(500), Vote::new(1, 2)), membership_state: MembershipState::new( Arc::new(EffectiveMembership::new(Some(log_id(1, 0, 1)), m12())), @@ -35,7 +42,7 @@ fn test_forward_to_leader_vote_not_committed() { #[test] fn test_forward_to_leader_not_a_member() { - let rs = RaftState:: { + let rs = RaftState::> { vote: Leased::new( UTConfig::<()>::now(), Duration::from_millis(500), diff --git a/openraft/src/raft_state/tests/log_state_reader_test.rs b/openraft/src/raft_state/tests/log_state_reader_test.rs index 7a239684f..e32b12c5d 100644 --- a/openraft/src/raft_state/tests/log_state_reader_test.rs +++ b/openraft/src/raft_state/tests/log_state_reader_test.rs @@ -1,12 +1,11 @@ use crate::engine::testing::UTConfig; use crate::engine::LogIdList; use crate::raft_state::LogStateReader; -use crate::testing; use crate::type_config::alias::LogIdOf; use crate::RaftState; fn log_id(term: u64, index: u64) -> LogIdOf { - testing::log_id(term, 0, index) + crate::engine::testing::log_id(term, 0, index) } #[test] @@ -44,7 +43,7 @@ fn test_raft_state_prev_log_id() -> anyhow::Result<()> { fn test_raft_state_has_log_id_empty() -> anyhow::Result<()> { let rs = RaftState::::default(); - assert!(!rs.has_log_id(&log_id(0, 0))); + assert!(!rs.has_log_id(log_id(0, 0))); Ok(()) } @@ -56,9 +55,9 @@ fn test_raft_state_has_log_id_committed_gets_true() -> anyhow::Result<()> { ..Default::default() }; - assert!(rs.has_log_id(&log_id(0, 0))); - assert!(rs.has_log_id(&log_id(2, 1))); - assert!(!rs.has_log_id(&log_id(2, 2))); + assert!(rs.has_log_id(log_id(0, 0))); + assert!(rs.has_log_id(log_id(2, 1))); + assert!(!rs.has_log_id(log_id(2, 2))); Ok(()) } @@ -71,14 +70,14 @@ fn test_raft_state_has_log_id_in_log_id_list() -> anyhow::Result<()> { ..Default::default() }; - assert!(rs.has_log_id(&log_id(0, 0))); - assert!(rs.has_log_id(&log_id(2, 1))); - assert!(rs.has_log_id(&log_id(1, 3))); - assert!(rs.has_log_id(&log_id(3, 4))); + assert!(rs.has_log_id(log_id(0, 0))); + assert!(rs.has_log_id(log_id(2, 1))); + assert!(rs.has_log_id(log_id(1, 3))); + assert!(rs.has_log_id(log_id(3, 4))); - assert!(!rs.has_log_id(&log_id(2, 3))); - assert!(!rs.has_log_id(&log_id(2, 4))); - assert!(!rs.has_log_id(&log_id(3, 5))); + assert!(!rs.has_log_id(log_id(2, 3))); + assert!(!rs.has_log_id(log_id(2, 4))); + assert!(!rs.has_log_id(log_id(3, 5))); Ok(()) } diff --git a/openraft/src/replication/mod.rs b/openraft/src/replication/mod.rs index 70e680599..72c132f1f 100644 --- a/openraft/src/replication/mod.rs +++ b/openraft/src/replication/mod.rs @@ -26,6 +26,7 @@ use crate::core::notification::Notification; use crate::core::sm::handle::SnapshotReader; use crate::display_ext::DisplayInstantExt; use crate::display_ext::DisplayOptionExt; +use crate::entry::raft_entry_ext::RaftEntryExt; use crate::entry::RaftEntry; use crate::error::HigherVote; use crate::error::PayloadTooLarge; @@ -59,7 +60,6 @@ use crate::type_config::alias::VoteOf; use crate::type_config::async_runtime::mutex::Mutex; use crate::type_config::TypeConfigExt; use crate::vote::raft_vote::RaftVoteExt; -use crate::RaftLogId; use crate::RaftNetworkFactory; use crate::RaftTypeConfig; use crate::StorageError; @@ -398,8 +398,8 @@ where // limited_get_log_entries will return logs smaller than the range [start, end). let logs = self.log_reader.limited_get_log_entries(start, end).await?; - let first = logs.first().map(|x| x.get_log_id()).unwrap(); - let last = logs.last().map(|x| x.log_id()).unwrap(); + let first = logs.first().map(|ent| ent.ref_log_id()).unwrap(); + let last = logs.last().map(|ent| ent.log_id()).unwrap(); debug_assert!( !logs.is_empty() && logs.len() <= (end - start) as usize, diff --git a/openraft/src/summary.rs b/openraft/src/summary.rs index 6263d1d85..34d662772 100644 --- a/openraft/src/summary.rs +++ b/openraft/src/summary.rs @@ -81,10 +81,9 @@ mod tests { #[test] fn test_display() { - use crate::engine::testing::UTConfig; use crate::MessageSummary; - let lid = crate::testing::log_id::(1, 2, 3); + let lid = crate::engine::testing::log_id(1, 2, 3); assert_eq!("T1-N2.3", lid.to_string()); assert_eq!("T1-N2.3", lid.summary()); assert_eq!("Some(T1-N2.3)", Some(&lid).summary()); diff --git a/openraft/src/testing/common.rs b/openraft/src/testing/common.rs index d4bc35f71..e555710ee 100644 --- a/openraft/src/testing/common.rs +++ b/openraft/src/testing/common.rs @@ -5,7 +5,6 @@ use std::collections::BTreeSet; use crate::entry::RaftEntry; use crate::type_config::alias::LogIdOf; use crate::vote::RaftLeaderIdExt; -use crate::LogId; use crate::RaftTypeConfig; /// Builds a log id, for testing purposes. @@ -14,10 +13,7 @@ where C: RaftTypeConfig, C::Term: From, { - LogId:: { - leader_id: C::LeaderId::new_committed(term.into(), node_id), - index, - } + LogIdOf::::new(C::LeaderId::new_committed(term.into(), node_id), index) } /// Create a blank log entry for test. @@ -26,22 +22,18 @@ where C: RaftTypeConfig, C::Term: From, { - crate::Entry::::new_blank(log_id(term, node_id, index)) + crate::Entry::::new_blank(log_id::(term, node_id, index)) } /// Create a membership log entry without learner config for test. -pub fn membership_ent( - term: u64, - node_id: C::NodeId, - index: u64, - config: Vec>, -) -> crate::Entry +pub fn membership_ent(term: u64, node_id: C::NodeId, index: u64, config: Vec>) -> crate::Entry where + C: RaftTypeConfig, C::Term: From, C::Node: Default, { crate::Entry::new_membership( - log_id(term, node_id, index), + log_id::(term, node_id, index), crate::Membership::new_with_defaults(config, []), ) } diff --git a/openraft/src/testing/log/suite.rs b/openraft/src/testing/log/suite.rs index 726ddad09..65b089be1 100644 --- a/openraft/src/testing/log/suite.rs +++ b/openraft/src/testing/log/suite.rs @@ -27,7 +27,6 @@ use crate::type_config::alias::VoteOf; use crate::type_config::TypeConfigExt; use crate::vote::raft_vote::RaftVoteExt; use crate::vote::RaftLeaderIdExt; -use crate::LogId; use crate::Membership; use crate::OptionalSend; use crate::RaftLogReader; @@ -491,13 +490,13 @@ where let initial = StorageHelper::new(&mut store, &mut sm).get_initial_state().await?; assert_eq!( - Some(&log_id_0(3, 2)), + Some(&log_id_0::(3, 2)), initial.last_log_id(), "state machine has higher log" ); assert_eq!( initial.committed(), - Some(&log_id_0(3, 1)), + Some(&log_id_0::(3, 1)), "unexpected value for last applied log" ); assert_eq!( @@ -580,7 +579,7 @@ where ]) .await?; - store.purge(log_id_0(1, 2)).await?; + store.purge(log_id_0::(1, 2)).await?; append(&mut store, [membership_ent_0::(1, 3, btreeset! {1,2,3})]).await?; let initial = StorageHelper::new(&mut store, &mut sm).get_initial_state().await?; @@ -604,7 +603,7 @@ where let initial = StorageHelper::new(&mut store, &mut sm).get_initial_state().await?; assert_eq!( - Some(&log_id_0(2, 1)), + Some(&log_id_0::(2, 1)), initial.last_log_id(), "state machine has higher log" ); @@ -621,23 +620,20 @@ where let initial = StorageHelper::new(&mut store, &mut sm).get_initial_state().await?; assert_eq!( - Some(&log_id_0(3, 1)), + Some(&log_id_0::(3, 1)), initial.last_log_id(), "state machine has higher log" ); assert_eq!( initial.last_purged_log_id().cloned(), - Some(log_id_0(3, 1)), + Some(log_id_0::(3, 1)), "state machine has higher log" ); Ok(()) } pub async fn get_initial_state_log_ids(mut store: LS, mut sm: SM) -> Result<(), StorageError> { - let log_id = |t: u64, n: u64, i| LogId:: { - leader_id: C::LeaderId::new_committed(t.into(), n.into()), - index: i, - }; + let log_id = |t: u64, n: u64, i| LogIdOf::::new(C::LeaderId::new_committed(t.into(), n.into()), i); tracing::info!("--- empty store, expect []"); { @@ -798,11 +794,11 @@ where blank_ent_0::(1, 5), ]) .await?; - store.purge(log_id_0(1, 1)).await?; + store.purge(log_id_0::(1, 1)).await?; apply(&mut sm, [blank_ent_0::(1, 2)]).await?; - store.save_committed(Some(log_id_0(1, 4))).await?; + store.save_committed(Some(log_id_0::(1, 4))).await?; let got = store.read_committed().await?; if got.is_none() { tracing::info!("This implementation does not store committed log id, skip test re-applying committed logs"); @@ -811,9 +807,13 @@ where let initial = StorageHelper::new(&mut store, &mut sm).get_initial_state().await?; - assert_eq!(Some(&log_id_0(1, 4)), initial.io_applied(), "last_applied is updated"); assert_eq!( - Some(log_id_0(1, 4)), + Some(&log_id_0::(1, 4)), + initial.io_applied(), + "last_applied is updated" + ); + assert_eq!( + Some(log_id_0::(1, 4)), sm.applied_state().await?.0, "last_applied is updated" ); @@ -844,8 +844,8 @@ where let logs = store.try_get_log_entries(5..7).await?; assert_eq!(logs.len(), 2); - assert_eq!(logs[0].log_id(), log_id_0(1, 5)); - assert_eq!(logs[1].log_id(), log_id_0(1, 6)); + assert_eq!(logs[0].log_id(), log_id_0::(1, 5)); + assert_eq!(logs[1].log_id(), log_id_0::(1, 6)); } Ok(()) @@ -866,7 +866,7 @@ where assert!(!logs.is_empty()); assert!(logs.len() <= 2); - assert_eq!(logs[0].log_id(), log_id_0(1, 5)); + assert_eq!(logs[0].log_id(), log_id_0::(1, 5)); } Ok(()) @@ -875,14 +875,14 @@ where pub async fn try_get_log_entry(mut store: LS, mut sm: SM) -> Result<(), StorageError> { Self::feed_10_logs_vote_self(&mut store).await?; - store.purge(log_id(0, 0, 0)).await?; + store.purge(log_id::(0, 0, 0)).await?; // `purge()` does not have to do the purge at once. // The implementation may choose to do it in the background. C::sleep(Duration::from_millis(1_000)).await; let ent = store.try_get_log_entry(3).await?; - assert_eq!(Some(log_id_0(1, 3)), ent.map(|x| x.log_id())); + assert_eq!(Some(log_id_0::(1, 3)), ent.map(|x| x.log_id())); let ent = store.try_get_log_entry(0).await?; assert_eq!(None, ent.map(|x| x.log_id())); @@ -917,38 +917,38 @@ where let st = store.get_log_state().await?; assert_eq!(None, st.last_purged_log_id); - assert_eq!(Some(log_id_0(1, 2)), st.last_log_id); + assert_eq!(Some(log_id_0::(1, 2)), st.last_log_id); } tracing::info!("--- delete log 0-0"); { - store.purge(log_id_0(0, 0)).await?; + store.purge(log_id_0::(0, 0)).await?; let st = store.get_log_state().await?; - assert_eq!(Some(log_id(0, 0, 0)), st.last_purged_log_id); - assert_eq!(Some(log_id_0(1, 2)), st.last_log_id); + assert_eq!(Some(log_id::(0, 0, 0)), st.last_purged_log_id); + assert_eq!(Some(log_id_0::(1, 2)), st.last_log_id); } tracing::info!("--- delete all log"); { - store.purge(log_id_0(1, 2)).await?; + store.purge(log_id_0::(1, 2)).await?; let st = store.get_log_state().await?; - assert_eq!(Some(log_id_0(1, 2)), st.last_purged_log_id); - assert_eq!(Some(log_id_0(1, 2)), st.last_log_id); + assert_eq!(Some(log_id_0::(1, 2)), st.last_purged_log_id); + assert_eq!(Some(log_id_0::(1, 2)), st.last_log_id); } tracing::info!("--- delete advance last present logs"); { - store.purge(log_id_0(2, 3)).await?; + store.purge(log_id_0::(2, 3)).await?; // `purge()` does not have to do the purge at once. // The implementation may choose to do it in the background. C::sleep(Duration::from_millis(1_000)).await; let st = store.get_log_state().await?; - assert_eq!(Some(log_id_0(2, 3)), st.last_purged_log_id); - assert_eq!(Some(log_id_0(2, 3)), st.last_log_id); + assert_eq!(Some(log_id_0::(2, 3)), st.last_purged_log_id); + assert_eq!(Some(log_id_0::(2, 3)), st.last_log_id); } Ok(()) @@ -957,7 +957,7 @@ where pub async fn get_log_id(mut store: LS, mut sm: SM) -> Result<(), StorageError> { Self::feed_10_logs_vote_self(&mut store).await?; - store.purge(log_id_0(1, 3)).await?; + store.purge(log_id_0::(1, 3)).await?; // `purge()` does not have to do the purge at once. // The implementation may choose to do it in the background. @@ -970,10 +970,10 @@ where assert!(res.is_err()); let res = store.get_log_id(4).await?; - assert_eq!(log_id_0(1, 4), res); + assert_eq!(log_id_0::(1, 4), res); let res = store.get_log_id(10).await?; - assert_eq!(log_id_0(1, 10), res); + assert_eq!(log_id_0::(1, 10), res); let res = store.get_log_id(11).await; assert!(res.is_err()); @@ -995,26 +995,26 @@ where .await?; let last_log_id = store.get_log_state().await?.last_log_id; - assert_eq!(Some(log_id_0(1, 2)), last_log_id); + assert_eq!(Some(log_id_0::(1, 2)), last_log_id); } tracing::info!("--- last id in logs < last applied id in sm, only return the id in logs"); { apply(&mut sm, [blank_ent_0::(1, 3)]).await?; let last_log_id = store.get_log_state().await?.last_log_id; - assert_eq!(Some(log_id_0(1, 2)), last_log_id); + assert_eq!(Some(log_id_0::(1, 2)), last_log_id); } tracing::info!("--- no logs, return default"); { - store.purge(log_id_0(1, 2)).await?; + store.purge(log_id_0::(1, 2)).await?; // `purge()` does not have to do the purge at once. // The implementation may choose to do it in the background. C::sleep(Duration::from_millis(1_000)).await; let last_log_id = store.get_log_state().await?.last_log_id; - assert_eq!(Some(log_id_0(1, 2)), last_log_id); + assert_eq!(Some(log_id_0::(1, 2)), last_log_id); } Ok(()) @@ -1030,10 +1030,10 @@ where apply(&mut sm, [membership_ent_0::(1, 3, btreeset! {1,2})]).await?; let (applied, mem) = sm.applied_state().await?; - assert_eq!(Some(log_id_0(1, 3)), applied); + assert_eq!(Some(log_id_0::(1, 3)), applied); assert_eq!( StoredMembership::new( - Some(log_id_0(1, 3)), + Some(log_id_0::(1, 3)), Membership::new_with_defaults(vec![btreeset! {1,2}], []) ), mem @@ -1045,10 +1045,10 @@ where apply(&mut sm, [blank_ent_0::(1, 5)]).await?; let (applied, mem) = sm.applied_state().await?; - assert_eq!(Some(log_id_0(1, 5)), applied); + assert_eq!(Some(log_id_0::(1, 5)), applied); assert_eq!( StoredMembership::new( - Some(log_id_0(1, 3)), + Some(log_id_0::(1, 3)), Membership::new_with_defaults(vec![btreeset! {1,2}], []) ), mem @@ -1063,7 +1063,7 @@ where Self::feed_10_logs_vote_self(&mut store).await?; - store.purge(log_id_0(0, 0)).await?; + store.purge(log_id_0::(0, 0)).await?; // `purge()` does not have to do the purge at once. // The implementation may choose to do it in the background. @@ -1075,8 +1075,8 @@ where assert_eq!( LogState { - last_purged_log_id: Some(log_id_0(0, 0)), - last_log_id: Some(log_id_0(1, 10)), + last_purged_log_id: Some(log_id_0::(0, 0)), + last_log_id: Some(log_id_0::(1, 10)), }, store.get_log_state().await? ); @@ -1088,7 +1088,7 @@ where Self::feed_10_logs_vote_self(&mut store).await?; - store.purge(log_id_0(1, 5)).await?; + store.purge(log_id_0::(1, 5)).await?; // `purge()` does not have to do the purge at once. // The implementation may choose to do it in the background. @@ -1100,8 +1100,8 @@ where assert_eq!( LogState { - last_purged_log_id: Some(log_id_0(1, 5)), - last_log_id: Some(log_id_0(1, 10)), + last_purged_log_id: Some(log_id_0::(1, 5)), + last_log_id: Some(log_id_0::(1, 10)), }, store.get_log_state().await? ); @@ -1113,7 +1113,7 @@ where Self::feed_10_logs_vote_self(&mut store).await?; - store.purge(log_id_0(1, 20)).await?; + store.purge(log_id_0::(1, 20)).await?; // `purge()` does not have to do the purge at once. // The implementation may choose to do it in the background. @@ -1124,8 +1124,8 @@ where assert_eq!( LogState { - last_purged_log_id: Some(log_id_0(1, 20)), - last_log_id: Some(log_id_0(1, 20)), + last_purged_log_id: Some(log_id_0::(1, 20)), + last_log_id: Some(log_id_0::(1, 20)), }, store.get_log_state().await? ); @@ -1137,7 +1137,7 @@ where Self::feed_10_logs_vote_self(&mut store).await?; - store.truncate(log_id_0(1, 11)).await?; + store.truncate(log_id_0::(1, 11)).await?; let logs = store.try_get_log_entries(0..100).await?; assert_eq!(logs.len(), 11); @@ -1145,7 +1145,7 @@ where assert_eq!( LogState { last_purged_log_id: None, - last_log_id: Some(log_id_0(1, 10)), + last_log_id: Some(log_id_0::(1, 10)), }, store.get_log_state().await? ); @@ -1157,7 +1157,7 @@ where Self::feed_10_logs_vote_self(&mut store).await?; - store.truncate(log_id_0(0, 0)).await?; + store.truncate(log_id_0::(0, 0)).await?; let logs = store.try_get_log_entries(0..100).await?; assert_eq!(logs.len(), 0); @@ -1176,7 +1176,7 @@ where pub async fn append_to_log(mut store: LS, mut sm: SM) -> Result<(), StorageError> { Self::feed_10_logs_vote_self(&mut store).await?; - store.purge(log_id_0(0, 0)).await?; + store.purge(log_id_0::(0, 0)).await?; // `purge()` does not have to do the purge at once. // The implementation may choose to do it in the background. @@ -1188,7 +1188,7 @@ where let last = store.try_get_log_entries(0..).await?.into_iter().last().unwrap(); assert_eq!(l, 11, "expected 11 entries to exist in the log"); - assert_eq!(last.log_id(), log_id_0(2, 11), "unexpected log id"); + assert_eq!(last.log_id(), log_id_0::(2, 11), "unexpected log id"); Ok(()) } @@ -1200,8 +1200,8 @@ where let mut b = sm.get_snapshot_builder().await; let snap = b.build_snapshot().await?; let meta = snap.meta; - assert_eq!(Some(log_id_0(0, 0)), meta.last_log_id); - assert_eq!(&Some(log_id_0(0, 0)), meta.last_membership.log_id()); + assert_eq!(Some(log_id_0::(0, 0)), meta.last_log_id); + assert_eq!(&Some(log_id_0::(0, 0)), meta.last_membership.log_id()); assert_eq!( &Membership::new_with_defaults(vec![btreeset! {1,2}], []), meta.last_membership.membership() @@ -1219,8 +1219,8 @@ where let mut b = sm.get_snapshot_builder().await; let snap = b.build_snapshot().await?; let meta = snap.meta; - assert_eq!(Some(log_id_0(2, 2)), meta.last_log_id); - assert_eq!(&Some(log_id_0(2, 2)), meta.last_membership.log_id()); + assert_eq!(Some(log_id_0::(2, 2)), meta.last_log_id); + assert_eq!(&Some(log_id_0::(2, 2)), meta.last_membership.log_id()); assert_eq!( &Membership::new_with_defaults(vec![btreeset! {3,4}], []), meta.last_membership.membership() @@ -1242,7 +1242,7 @@ where assert_eq!(replies.len(), 1, "expected 1 response"); let (last_applied, _) = sm.applied_state().await?; - assert_eq!(last_applied, Some(log_id_0(0, 0)),); + assert_eq!(last_applied, Some(log_id_0::(0, 0)),); } tracing::info!("--- apply membership entry"); @@ -1253,7 +1253,7 @@ where assert_eq!(replies.len(), 1, "expected 1 response"); let (last_applied, mem) = sm.applied_state().await?; - assert_eq!(last_applied, Some(log_id_0(1, 1)),); + assert_eq!(last_applied, Some(log_id_0::(1, 1)),); assert_eq!( mem.membership(), &Membership::new_with_defaults(vec![btreeset! {1,2}], []) @@ -1265,7 +1265,7 @@ where // { // let entry = { // let mut e = C::Entry::from_app_data(C::D::from(1)); - // e.set_log_id(&log_id_0(2, 2)); + // e.set_log_id(&log_id_0::(2, 2)); // e // }; // @@ -1273,7 +1273,7 @@ where // assert_eq!(replies.len(), 1, "expected 1 response"); // let (last_applied, _) = sm.applied_state().await?; // - // assert_eq!(last_applied, Some(log_id_0(2, 2)),); + // assert_eq!(last_applied, Some(log_id_0::(2, 2)),); // } Ok(()) @@ -1289,7 +1289,7 @@ where assert_eq!(replies.len(), 2); let (last_applied, mem) = sm.applied_state().await?; - assert_eq!(last_applied, Some(log_id_0(1, 1)),); + assert_eq!(last_applied, Some(log_id_0::(1, 1)),); assert_eq!( mem.membership(), &Membership::new_with_defaults(vec![btreeset! {1,2}], []) @@ -1313,9 +1313,9 @@ where // Add a few entries so we have state to snapshot let snapshot_entries = vec![membership_ent_0::(1, 2, btreeset! {1, 2, 3}), blank_ent_0::(3, 3)]; sm_l.apply(snapshot_entries).await?; - let snapshot_last_log_id = Some(log_id_0(3, 3)); + let snapshot_last_log_id = Some(log_id_0::(3, 3)); let snapshot_last_membership = StoredMembership::new( - Some(log_id_0(1, 2)), + Some(log_id_0::(1, 2)), Membership::new_with_defaults(vec![btreeset![1, 2, 3]], []), ); let snapshot_applied_state = (snapshot_last_log_id.clone(), snapshot_last_membership.clone()); @@ -1379,10 +1379,7 @@ where C: RaftTypeConfig, C::NodeId: From, { - LogId { - leader_id: C::LeaderId::new_committed(term.into(), NODE_ID.into()), - index, - } + LogIdOf::::new(C::LeaderId::new_committed(term.into(), NODE_ID.into()), index) } /// Create a blank log entry with node_id 0 for test. @@ -1391,7 +1388,7 @@ where C::Term: From, C::NodeId: From, { - C::Entry::new_blank(log_id(term, 0, index)) + C::Entry::new_blank(log_id::(term, 0, index)) } /// Create a membership entry with node_id 0 for test. @@ -1401,7 +1398,7 @@ where C::NodeId: From, C::Node: Default, { - C::Entry::new_membership(log_id_0(term, index), Membership::new_with_defaults(vec![bs], [])) + C::Entry::new_membership(log_id_0::(term, index), Membership::new_with_defaults(vec![bs], [])) } /// Build a `RaftLogStorage` and `RaftStateMachine` implementation and run a test on it. @@ -1463,8 +1460,5 @@ where C::Term: From, C::NodeId: From, { - LogId { - leader_id: C::LeaderId::new_committed(term.into(), node_id.into()), - index, - } + LogIdOf::::new(C::LeaderId::new_committed(term.into(), node_id.into()), index) } diff --git a/openraft/src/type_config.rs b/openraft/src/type_config.rs index 31846fe8f..1d37d1bbd 100644 --- a/openraft/src/type_config.rs +++ b/openraft/src/type_config.rs @@ -13,7 +13,6 @@ pub use async_runtime::MpscUnbounded; pub use async_runtime::OneshotSender; pub use util::TypeConfigExt; -use crate::entry::FromAppData; use crate::entry::RaftEntry; use crate::raft::responder::Responder; use crate::vote::raft_vote::RaftVote; @@ -89,7 +88,7 @@ pub trait RaftTypeConfig: type Vote: RaftVote; /// Raft log entry, which can be built from an AppData. - type Entry: RaftEntry + FromAppData; + type Entry: RaftEntry; /// Snapshot data for exposing a snapshot for reading & writing. /// diff --git a/stores/memstore/src/lib.rs b/stores/memstore/src/lib.rs index 098a9a8db..f02ef7319 100644 --- a/stores/memstore/src/lib.rs +++ b/stores/memstore/src/lib.rs @@ -393,7 +393,7 @@ impl RaftLogStorage for Arc { let mut log = self.log.write().await; for entry in entries { let s = serde_json::to_string(&entry).map_err(|e| StorageError::write_log_entry(entry.log_id(), &e))?; - log.insert(entry.log_id.index(), s); + log.insert(entry.log_id.index, s); } callback.io_completed(Ok(()));