From 3df43b6c23dd505f6dc44d51b6a4cd96ad31dccf Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=BC=A0=E7=82=8E=E6=B3=BC?= Date: Mon, 27 Jan 2025 17:19:40 +0800 Subject: [PATCH] Refactor: Introduce `RefLogId` as a reference to a log ID Existing `LogIdOf` provides a minimal storage implementation for a log ID with essential properties. In contrast, `RefLogId` offers the same information as `LogIdOf` while adding additional system-defined properties. For example, in the future, `LogIdOf` defined by the application will not need to implement `Ord`. However, `RefLogId`, used internally, will provide a system-defined `Ord` implementation. This change updates internal components to return `RefLogId` or accept it as an argument where possible, enabling more flexibility and consistency in handling log IDs. Change: refine the `RaftEntry` trait - The `RaftEntry` trait now requires `AsRef>` and `AsMut>`, providing a more standard API for accessing the log ID of a log entry. As a result, the `RaftEntry: RaftLogId` requirement is no longer needed. - A new method, `new_normal()`, has been added to the `RaftEntry` trait to replace the `FromAppData` trait. - Additional utility methods for working with entries are now provided in the `RaftEntryExt` trait. - Part of #1278 Upgrade tips: 1. **For applications using a custom `RaftEntry` implementation** (e.g., declared with `declare_raft_types!(MyTypes: Entry = MyEntry)`): - Update the `RaftEntry` implementation for your custom entry type (`MyEntry`) by adding the `new_normal()` method. - Implement `AsRef` and `AsMut` for your custom entry type. 2. **For applications using the default `Entry` provided by OpenRaft**: - No changes are required. empty: try to remove log id from RaftTypeConfig --- cluster_benchmark/tests/benchmark/store.rs | 6 +- examples/memstore/src/log_store.rs | 6 +- examples/raft-kv-memstore-grpc/build.rs | 17 ++ .../proto/api_service.proto | 6 +- .../proto/internal_service.proto | 77 ++++++++- .../proto/management_service.proto | 6 +- .../src/grpc/internal_service.rs | 23 +-- examples/raft-kv-memstore-grpc/src/lib.rs | 68 ++++++++ .../raft-kv-memstore-grpc/src/network/mod.rs | 13 +- .../src/pb_impl/impl_entry.rs | 50 ++++++ .../src/pb_impl/impl_membership.rs | 35 ++++ .../raft-kv-memstore-grpc/src/pb_impl/mod.rs | 2 + .../raft-kv-memstore-grpc/src/store/mod.rs | 32 ++-- examples/rocksstore/src/lib.rs | 4 +- examples/rocksstore/src/log_store.rs | 10 +- examples/utils/declare_types.rs | 2 +- openraft/src/base/finalized.rs | 38 +++++ openraft/src/base/mod.rs | 2 + openraft/src/core/raft_core.rs | 26 +-- openraft/src/core/sm/worker.rs | 8 +- openraft/src/engine/engine_impl.rs | 3 +- .../engine/handler/following_handler/mod.rs | 24 +-- .../src/engine/handler/leader_handler/mod.rs | 7 +- .../src/engine/handler/log_handler/mod.rs | 5 +- .../engine/handler/snapshot_handler/mod.rs | 2 +- .../src/engine/handler/vote_handler/mod.rs | 3 +- openraft/src/engine/log_id_list.rs | 114 ++++++++----- openraft/src/engine/testing.rs | 2 +- openraft/src/engine/tests/log_id_list_test.rs | 14 +- openraft/src/entry/mod.rs | 45 +---- openraft/src/entry/payload.rs | 4 - openraft/src/entry/raft_entry_ext.rs | 20 +++ openraft/src/entry/traits.rs | 76 +++++++-- openraft/src/impls/mod.rs | 2 + openraft/src/lib.rs | 1 - openraft/src/log_id/log_id_option_ext.rs | 27 +-- openraft/src/log_id/mod.rs | 20 ++- openraft/src/log_id/option_raft_log_id_ext.rs | 51 ++++++ openraft/src/log_id/option_ref_log_id_ext.rs | 20 +++ openraft/src/log_id/raft_log_id.rs | 43 +++-- openraft/src/log_id/raft_log_id_ext.rs | 32 ++++ openraft/src/log_id/ref_log_id.rs | 60 +++++++ openraft/src/log_id_range.rs | 6 +- .../src/membership/effective_membership.rs | 5 +- openraft/src/progress/entry/mod.rs | 12 +- openraft/src/progress/entry/tests.rs | 5 + openraft/src/progress/entry/update.rs | 2 +- openraft/src/progress/inflight/mod.rs | 3 +- openraft/src/proposer/leader.rs | 49 +++--- openraft/src/raft_state/io_state/log_io_id.rs | 2 + openraft/src/raft_state/log_state_reader.rs | 20 ++- openraft/src/raft_state/mod.rs | 30 +++- .../tests/forward_to_leader_test.rs | 15 +- .../raft_state/tests/log_state_reader_test.rs | 25 ++- .../src/raft_state/tests/validate_test.rs | 3 +- openraft/src/replication/mod.rs | 7 +- openraft/src/replication/response.rs | 4 +- openraft/src/storage/helper.rs | 8 +- openraft/src/storage/log_reader_ext.rs | 4 +- .../src/storage/v2/raft_log_storage_ext.rs | 4 +- openraft/src/testing/common.rs | 18 +- openraft/src/testing/log/suite.rs | 157 +++++++++--------- openraft/src/type_config.rs | 3 +- stores/memstore/src/lib.rs | 9 +- tests/tests/fixtures/mod.rs | 4 +- 65 files changed, 958 insertions(+), 443 deletions(-) create mode 100644 examples/raft-kv-memstore-grpc/src/pb_impl/impl_entry.rs create mode 100644 examples/raft-kv-memstore-grpc/src/pb_impl/impl_membership.rs create mode 100644 openraft/src/base/finalized.rs create mode 100644 openraft/src/entry/raft_entry_ext.rs create mode 100644 openraft/src/log_id/option_raft_log_id_ext.rs create mode 100644 openraft/src/log_id/option_ref_log_id_ext.rs create mode 100644 openraft/src/log_id/raft_log_id_ext.rs create mode 100644 openraft/src/log_id/ref_log_id.rs diff --git a/cluster_benchmark/tests/benchmark/store.rs b/cluster_benchmark/tests/benchmark/store.rs index 9ea960488..d5962de69 100644 --- a/cluster_benchmark/tests/benchmark/store.rs +++ b/cluster_benchmark/tests/benchmark/store.rs @@ -10,6 +10,7 @@ use std::sync::Arc; use openraft::alias::LogIdOf; use openraft::alias::SnapshotDataOf; +use openraft::entry::RaftEntry; use openraft::storage::IOFlushed; use openraft::storage::LogState; use openraft::storage::RaftLogReader; @@ -20,7 +21,6 @@ use openraft::storage::Snapshot; use openraft::Entry; use openraft::EntryPayload; use openraft::OptionalSend; -use openraft::RaftLogId; use openraft::SnapshotMeta; use openraft::StorageError; use openraft::StoredMembership; @@ -188,7 +188,7 @@ impl RaftLogStorage for Arc { let last = match last_serialized { None => None, - Some(ent) => Some(*ent.get_log_id()), + Some(ent) => Some(ent.log_id()), }; let last_purged = self.last_purged_log_id.read().await.clone(); @@ -237,7 +237,7 @@ impl RaftLogStorage for Arc { where I: IntoIterator> + Send { { let mut log = self.log.write().await; - log.extend(entries.into_iter().map(|entry| (entry.get_log_id().index(), entry))); + log.extend(entries.into_iter().map(|entry| (entry.index(), entry))); } callback.io_completed(Ok(())); Ok(()) diff --git a/examples/memstore/src/log_store.rs b/examples/memstore/src/log_store.rs index 05e548ec8..553fd28d2 100644 --- a/examples/memstore/src/log_store.rs +++ b/examples/memstore/src/log_store.rs @@ -8,9 +8,9 @@ use std::sync::Arc; use openraft::alias::LogIdOf; use openraft::alias::VoteOf; +use openraft::entry::RaftEntry; use openraft::storage::IOFlushed; use openraft::LogState; -use openraft::RaftLogId; use openraft::RaftTypeConfig; use openraft::StorageError; use tokio::sync::Mutex; @@ -60,7 +60,7 @@ impl LogStoreInner { } async fn get_log_state(&mut self) -> Result, StorageError> { - let last = self.log.iter().next_back().map(|(_, ent)| ent.get_log_id().clone()); + let last = self.log.iter().next_back().map(|(_, ent)| ent.log_id()); let last_purged = self.last_purged_log_id.clone(); @@ -97,7 +97,7 @@ impl LogStoreInner { where I: IntoIterator { // Simple implementation that calls the flush-before-return `append_to_log`. for entry in entries { - self.log.insert(entry.get_log_id().index(), entry); + self.log.insert(entry.index(), entry); } callback.io_completed(Ok(())); diff --git a/examples/raft-kv-memstore-grpc/build.rs b/examples/raft-kv-memstore-grpc/build.rs index 36a5cdbc1..6db896526 100644 --- a/examples/raft-kv-memstore-grpc/build.rs +++ b/examples/raft-kv-memstore-grpc/build.rs @@ -11,6 +11,7 @@ fn main() -> Result<(), Box> { // TODO: remove serde tonic_build::configure() + .btree_map(["."]) .type_attribute("openraftpb.Node", "#[derive(Eq, serde::Serialize, serde::Deserialize)]") .type_attribute( "openraftpb.SetRequest", @@ -25,6 +26,22 @@ fn main() -> Result<(), Box> { "#[derive(Eq, serde::Serialize, serde::Deserialize)]", ) .type_attribute("openraftpb.Vote", "#[derive(Eq, serde::Serialize, serde::Deserialize)]") + .type_attribute( + "google.protobuf.Empty", + "#[derive(Eq, serde::Serialize, serde::Deserialize)]", + ) + .type_attribute( + "openraftpb.NodeIdSet", + "#[derive(Eq, serde::Serialize, serde::Deserialize)]", + ) + .type_attribute( + "openraftpb.Membership", + "#[derive(Eq, serde::Serialize, serde::Deserialize)]", + ) + .type_attribute( + "openraftpb.Entry", + "#[derive(Eq, serde::Serialize, serde::Deserialize)]", + ) .compile_protos_with_config(config, &proto_files, &["proto"])?; Ok(()) } diff --git a/examples/raft-kv-memstore-grpc/proto/api_service.proto b/examples/raft-kv-memstore-grpc/proto/api_service.proto index e549473fe..88f97f3ff 100644 --- a/examples/raft-kv-memstore-grpc/proto/api_service.proto +++ b/examples/raft-kv-memstore-grpc/proto/api_service.proto @@ -1,4 +1,5 @@ syntax = "proto3"; +import "internal_service.proto"; package openraftpb; // ApiService provides the key-value store API operations @@ -20,8 +21,3 @@ message Response { optional string value = 1; // Retrieved value } -// SetRequest represents a key-value pair to be stored -message SetRequest { - string key = 1; // Key to store - string value = 2; // Value to associate with the key -} diff --git a/examples/raft-kv-memstore-grpc/proto/internal_service.proto b/examples/raft-kv-memstore-grpc/proto/internal_service.proto index 9414a7c16..e605a589f 100644 --- a/examples/raft-kv-memstore-grpc/proto/internal_service.proto +++ b/examples/raft-kv-memstore-grpc/proto/internal_service.proto @@ -1,6 +1,19 @@ syntax = "proto3"; +import "google/protobuf/empty.proto"; package openraftpb; +// SetRequest represents a key-value pair to be stored +message SetRequest { + string key = 1; // Key to store + string value = 2; // Value to associate with the key +} + +// Node represents a single node in the Raft cluster +message Node { + string rpc_addr = 1; // RPC address for node communication + uint64 node_id = 2; // Unique identifier for the node +} + // LeaderId represents the leader identifier in Raft message LeaderId { uint64 term = 1; @@ -13,6 +26,33 @@ message Vote { bool committed = 2; } +message Entry { + uint64 term = 1; + uint64 index = 2; + + // Optional Application data + SetRequest app_data = 12; + + // Optional Membership config + Membership membership = 13; +} + + +// NodeIds is a set of NodeIds +message NodeIdSet { + map node_ids = 1; +} + +// Membership config +message Membership { + // Joint(includes more than one NodeIdSet) or uniform(one NodeIdSet) config. + repeated NodeIdSet configs = 1; + + // All of the nodes in the cluster, including voters and learners. + // A node id that is included in `configs` is a voter, otherwise it is a learner. + map nodes = 2; +} + // LogId represents the log identifier in Raft message LogId { uint64 term = 1; @@ -32,23 +72,48 @@ message VoteResponse { LogId last_log_id = 3; } +message AppendEntriesRequest { + // The leader's vote, used to identify the leader, and must be committed + Vote vote = 1; + + // The previous log id the leader has sent to the follower + LogId prev_log_id = 2; + + // The entries to be appended to the follower's log + repeated Entry entries = 3; + + // The leader's last committed log id + LogId leader_commit = 4; +} + +message AppendEntriesResponse { + // If not None, the follower rejected the AppendEntries request due to having a higher vote. + // All other fields are valid only when this field is None + Vote rejected_by = 1; + + // The follower accepts this AppendEntries request's vote, but the prev_log_id conflicts with + // the follower's log. The leader should retry with a smaller prev_log_id that matches the + // follower's log. All subsequent fields are valid only when this field is false + bool conflict = 2; + + // The last log id the follower accepted from this request. + // If None, all input entries were accepted and persisted. + // Otherwise, only entries up to and including this id were accepted + LogId last_log_id = 3; +} + // InternalService handles internal Raft cluster communication service InternalService { // Vote handles vote requests between Raft nodes during leader election rpc Vote(VoteRequest) returns (VoteResponse) {} // AppendEntries handles call related to append entries RPC - rpc AppendEntries(RaftRequestBytes) returns (RaftReplyBytes) {} + rpc AppendEntries(AppendEntriesRequest) returns (AppendEntriesResponse) {} // Snapshot handles install snapshot RPC rpc Snapshot(stream SnapshotRequest) returns (RaftReplyBytes) {} } -// RaftRequestBytes encapsulates binary Raft request data -message RaftRequestBytes { - bytes value = 1; // Serialized Raft request data -} - // RaftReplyBytes encapsulates binary Raft response data message RaftReplyBytes { bytes value = 1; // Serialized Raft response data diff --git a/examples/raft-kv-memstore-grpc/proto/management_service.proto b/examples/raft-kv-memstore-grpc/proto/management_service.proto index a612cdcd9..891108d91 100644 --- a/examples/raft-kv-memstore-grpc/proto/management_service.proto +++ b/examples/raft-kv-memstore-grpc/proto/management_service.proto @@ -1,4 +1,5 @@ syntax = "proto3"; +import "internal_service.proto"; package openraftpb; // ManagementService handles Raft cluster management operations @@ -21,11 +22,6 @@ message InitRequest { repeated Node nodes = 1; // List of initial cluster nodes } -// Node represents a single node in the Raft cluster -message Node { - string rpc_addr = 1; // RPC address for node communication - uint64 node_id = 2; // Unique identifier for the node -} // AddLearnerRequest specifies parameters for adding a learner node message AddLearnerRequest { diff --git a/examples/raft-kv-memstore-grpc/src/grpc/internal_service.rs b/examples/raft-kv-memstore-grpc/src/grpc/internal_service.rs index 84c2af345..d18d74d5e 100644 --- a/examples/raft-kv-memstore-grpc/src/grpc/internal_service.rs +++ b/examples/raft-kv-memstore-grpc/src/grpc/internal_service.rs @@ -8,9 +8,9 @@ use tonic::Status; use tonic::Streaming; use tracing::debug; +use crate::protobuf as pb; use crate::protobuf::internal_service_server::InternalService; use crate::protobuf::RaftReplyBytes; -use crate::protobuf::RaftRequestBytes; use crate::protobuf::SnapshotRequest; use crate::protobuf::VoteRequest; use crate::protobuf::VoteResponse; @@ -75,15 +75,10 @@ impl InternalService for InternalServiceImpl { /// Nodes vote for candidates based on log completeness and term numbers. async fn vote(&self, request: Request) -> Result, Status> { debug!("Processing vote request"); - let req = request.into_inner(); - // Deserialize the vote request - let vote_req = req.into(); - - // Process the vote request let vote_resp = self .raft_node - .vote(vote_req) + .vote(request.into_inner().into()) .await .map_err(|e| Status::internal(format!("Vote operation failed: {}", e)))?; @@ -103,22 +98,20 @@ impl InternalService for InternalServiceImpl { /// # Protocol Details /// This implements the AppendEntries RPC from the Raft protocol. /// Used for both log replication and as heartbeat mechanism. - async fn append_entries(&self, request: Request) -> Result, Status> { + async fn append_entries( + &self, + request: Request, + ) -> Result, Status> { debug!("Processing append entries request"); - let req = request.into_inner(); - - // Deserialize the append request - let append_req = Self::deserialize_request(&req.value)?; - // Process the append request let append_resp = self .raft_node - .append_entries(append_req) + .append_entries(request.into_inner().into()) .await .map_err(|e| Status::internal(format!("Append entries operation failed: {}", e)))?; debug!("Append entries request processed successfully"); - Self::create_response(append_resp) + Ok(Response::new(append_resp.into())) } /// Handles snapshot installation requests for state transfer using streaming. diff --git a/examples/raft-kv-memstore-grpc/src/lib.rs b/examples/raft-kv-memstore-grpc/src/lib.rs index 3e0a5e58d..d4013db16 100644 --- a/examples/raft-kv-memstore-grpc/src/lib.rs +++ b/examples/raft-kv-memstore-grpc/src/lib.rs @@ -21,6 +21,7 @@ openraft::declare_raft_types!( R = pb::Response, LeaderId = pb::LeaderId, Vote = pb::Vote, + Entry = pb::Entry, Node = pb::Node, SnapshotData = StateMachineData, ); @@ -57,6 +58,73 @@ impl From for VoteResponse { } } +impl From for AppendEntriesRequest { + fn from(proto_req: pb::AppendEntriesRequest) -> Self { + AppendEntriesRequest { + vote: proto_req.vote.unwrap(), + prev_log_id: proto_req.prev_log_id.map(|log_id| log_id.into()), + entries: proto_req.entries, + leader_commit: proto_req.leader_commit.map(|log_id| log_id.into()), + } + } +} + +impl From for pb::AppendEntriesRequest { + fn from(value: AppendEntriesRequest) -> Self { + pb::AppendEntriesRequest { + vote: Some(value.vote), + prev_log_id: value.prev_log_id.map(|log_id| log_id.into()), + entries: value.entries, + leader_commit: value.leader_commit.map(|log_id| log_id.into()), + } + } +} + +impl From for AppendEntriesResponse { + fn from(r: pb::AppendEntriesResponse) -> Self { + if let Some(higher) = r.rejected_by { + return AppendEntriesResponse::HigherVote(higher); + } + + if r.conflict { + return AppendEntriesResponse::Conflict; + } + + if let Some(log_id) = r.last_log_id { + AppendEntriesResponse::PartialSuccess(Some(log_id.into())) + } else { + AppendEntriesResponse::Success + } + } +} + +impl From for pb::AppendEntriesResponse { + fn from(r: AppendEntriesResponse) -> Self { + match r { + AppendEntriesResponse::Success => pb::AppendEntriesResponse { + rejected_by: None, + conflict: false, + last_log_id: None, + }, + AppendEntriesResponse::PartialSuccess(p) => pb::AppendEntriesResponse { + rejected_by: None, + conflict: false, + last_log_id: p.map(|log_id| log_id.into()), + }, + AppendEntriesResponse::Conflict => pb::AppendEntriesResponse { + rejected_by: None, + conflict: true, + last_log_id: None, + }, + AppendEntriesResponse::HigherVote(v) => pb::AppendEntriesResponse { + rejected_by: Some(v), + conflict: false, + last_log_id: None, + }, + } + } +} + impl From for pb::LogId { fn from(log_id: LogId) -> Self { pb::LogId { diff --git a/examples/raft-kv-memstore-grpc/src/network/mod.rs b/examples/raft-kv-memstore-grpc/src/network/mod.rs index b8798951c..88a1d4f84 100644 --- a/examples/raft-kv-memstore-grpc/src/network/mod.rs +++ b/examples/raft-kv-memstore-grpc/src/network/mod.rs @@ -9,7 +9,6 @@ use tonic::transport::Channel; use crate::protobuf as pb; use crate::protobuf::internal_service_client::InternalServiceClient; -use crate::protobuf::RaftRequestBytes; use crate::protobuf::SnapshotRequest; use crate::protobuf::VoteRequest as PbVoteRequest; use crate::protobuf::VoteResponse as PbVoteResponse; @@ -65,12 +64,12 @@ impl RaftNetworkV2 for NetworkConnection { }; let mut client = InternalServiceClient::new(channel); - let value = serialize(&req).map_err(|e| RPCError::Network(NetworkError::new(&e)))?; - let request = RaftRequestBytes { value }; - let response = client.append_entries(request).await.map_err(|e| RPCError::Network(NetworkError::new(&e)))?; - let message = response.into_inner(); - let result = deserialize(&message.value).map_err(|e| RPCError::Network(NetworkError::new(&e)))?; - Ok(result) + let response = client + .append_entries(pb::AppendEntriesRequest::from(req)) + .await + .map_err(|e| RPCError::Network(NetworkError::new(&e)))?; + let response = response.into_inner(); + Ok(AppendEntriesResponse::from(response)) } async fn full_snapshot( diff --git a/examples/raft-kv-memstore-grpc/src/pb_impl/impl_entry.rs b/examples/raft-kv-memstore-grpc/src/pb_impl/impl_entry.rs new file mode 100644 index 000000000..303f43cc7 --- /dev/null +++ b/examples/raft-kv-memstore-grpc/src/pb_impl/impl_entry.rs @@ -0,0 +1,50 @@ +use std::fmt; + +use openraft::alias::LogIdOf; +use openraft::entry::RaftEntry; +use openraft::entry::RaftPayload; +use openraft::EntryPayload; +use openraft::Membership; + +use crate::protobuf as pb; +use crate::TypeConfig; + +impl fmt::Display for pb::Entry { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "Entry{{term={},index={}}}", self.term, self.index) + } +} + +impl RaftPayload for pb::Entry { + fn get_membership(&self) -> Option> { + self.membership.clone().map(Into::into) + } +} + +impl RaftEntry for pb::Entry { + fn new(log_id: LogIdOf, payload: EntryPayload) -> Self { + let mut app_data = None; + let mut membership = None; + match payload { + EntryPayload::Blank => {} + EntryPayload::Normal(data) => app_data = Some(data), + EntryPayload::Membership(m) => membership = Some(m.into()), + } + + Self { + term: log_id.leader_id, + index: log_id.index, + app_data, + membership, + } + } + + fn log_id_parts(&self) -> (&u64, u64) { + (&self.term, self.index) + } + + fn set_log_id(&mut self, new: LogIdOf) { + self.term = new.leader_id; + self.index = new.index; + } +} diff --git a/examples/raft-kv-memstore-grpc/src/pb_impl/impl_membership.rs b/examples/raft-kv-memstore-grpc/src/pb_impl/impl_membership.rs new file mode 100644 index 000000000..a4567cbb8 --- /dev/null +++ b/examples/raft-kv-memstore-grpc/src/pb_impl/impl_membership.rs @@ -0,0 +1,35 @@ +use std::collections::BTreeMap; +use std::collections::BTreeSet; + +use openraft::Membership; + +use crate::pb; +use crate::TypeConfig; + +impl From for Membership { + fn from(value: pb::Membership) -> Self { + let mut configs = vec![]; + for c in value.configs { + let config: BTreeSet = c.node_ids.keys().copied().collect(); + configs.push(config); + } + let nodes = value.nodes; + // TODO: do not unwrap() + Membership::new(configs, nodes).unwrap() + } +} + +impl From> for pb::Membership { + fn from(value: Membership) -> Self { + let mut configs = vec![]; + for c in value.get_joint_config() { + let mut node_ids = BTreeMap::new(); + for nid in c.iter() { + node_ids.insert(*nid, ()); + } + configs.push(pb::NodeIdSet { node_ids }); + } + let nodes = value.nodes().map(|(nid, n)| (*nid, n.clone())).collect(); + pb::Membership { configs, nodes } + } +} diff --git a/examples/raft-kv-memstore-grpc/src/pb_impl/mod.rs b/examples/raft-kv-memstore-grpc/src/pb_impl/mod.rs index 215d5a347..c9fba7a4d 100644 --- a/examples/raft-kv-memstore-grpc/src/pb_impl/mod.rs +++ b/examples/raft-kv-memstore-grpc/src/pb_impl/mod.rs @@ -1,4 +1,6 @@ //! Implements traits for protobuf types +mod impl_entry; mod impl_leader_id; +mod impl_membership; mod impl_vote; diff --git a/examples/raft-kv-memstore-grpc/src/store/mod.rs b/examples/raft-kv-memstore-grpc/src/store/mod.rs index 5e00e6153..0597c2112 100644 --- a/examples/raft-kv-memstore-grpc/src/store/mod.rs +++ b/examples/raft-kv-memstore-grpc/src/store/mod.rs @@ -4,8 +4,8 @@ use std::sync::Arc; use std::sync::Mutex; use bincode; +use openraft::entry::RaftEntry; use openraft::storage::RaftStateMachine; -use openraft::EntryPayload; use openraft::RaftSnapshotBuilder; use serde::Deserialize; use serde::Serialize; @@ -129,21 +129,23 @@ impl RaftStateMachine for Arc { let mut sm = self.state_machine.lock().unwrap(); for entry in entries { - tracing::debug!(%entry.log_id, "replicate to sm"); - - sm.last_applied = Some(entry.log_id); - - match entry.payload { - EntryPayload::Blank => res.push(Response { value: None }), - EntryPayload::Normal(req) => { - sm.data.insert(req.key, req.value.clone()); - res.push(Response { value: Some(req.value) }); - } - EntryPayload::Membership(ref mem) => { - sm.last_membership = StoredMembership::new(Some(entry.log_id), mem.clone()); - res.push(Response { value: None }) - } + let log_id = entry.log_id(); + + tracing::debug!("replicate to sm: {}", log_id); + + sm.last_applied = Some(log_id); + + let value = if let Some(req) = entry.app_data { + sm.data.insert(req.key, req.value.clone()); + Some(req.value) + } else if let Some(mem) = entry.membership { + sm.last_membership = StoredMembership::new(Some(log_id), mem.into()); + None + } else { + None }; + + res.push(Response { value }); } Ok(res) } diff --git a/examples/rocksstore/src/lib.rs b/examples/rocksstore/src/lib.rs index 5287950ed..20697491a 100644 --- a/examples/rocksstore/src/lib.rs +++ b/examples/rocksstore/src/lib.rs @@ -18,13 +18,13 @@ use std::sync::Arc; use log_store::RocksLogStore; use openraft::alias::SnapshotDataOf; +use openraft::entry::RaftEntry; use openraft::storage::RaftStateMachine; use openraft::storage::Snapshot; use openraft::AnyError; use openraft::Entry; use openraft::EntryPayload; use openraft::LogId; -use openraft::RaftLogId; use openraft::RaftSnapshotBuilder; use openraft::RaftTypeConfig; use openraft::SnapshotMeta; @@ -179,7 +179,7 @@ impl RaftStateMachine for RocksStateMachine { for entry in entries_iter { tracing::debug!(%entry.log_id, "replicate to sm"); - sm.last_applied_log = Some(*entry.get_log_id()); + sm.last_applied_log = Some(entry.log_id()); match entry.payload { EntryPayload::Blank => res.push(RocksResponse { value: None }), diff --git a/examples/rocksstore/src/log_store.rs b/examples/rocksstore/src/log_store.rs index 4d7b5ea95..a1ff9b319 100644 --- a/examples/rocksstore/src/log_store.rs +++ b/examples/rocksstore/src/log_store.rs @@ -11,11 +11,11 @@ use meta::StoreMeta; use openraft::alias::EntryOf; use openraft::alias::LogIdOf; use openraft::alias::VoteOf; +use openraft::entry::RaftEntry; use openraft::storage::IOFlushed; use openraft::storage::RaftLogStorage; use openraft::LogState; use openraft::OptionalSend; -use openraft::RaftLogId; use openraft::RaftLogReader; use openraft::RaftTypeConfig; use openraft::StorageError; @@ -103,7 +103,7 @@ where C: RaftTypeConfig let entry: EntryOf = serde_json::from_slice(&val).map_err(read_logs_err)?; - assert_eq!(id, entry.get_log_id().index()); + assert_eq!(id, entry.index()); res.push(entry); } @@ -128,7 +128,7 @@ where C: RaftTypeConfig Some(res) => { let (_log_index, entry_bytes) = res.map_err(read_logs_err)?; let ent = serde_json::from_slice::>(&entry_bytes).map_err(read_logs_err)?; - Some(ent.get_log_id().clone()) + Some(ent.log_id()) } }; @@ -158,8 +158,8 @@ where C: RaftTypeConfig async fn append(&mut self, entries: I, callback: IOFlushed) -> Result<(), StorageError> where I: IntoIterator> + Send { for entry in entries { - let id = id_to_bin(entry.get_log_id().index()); - assert_eq!(bin_to_id(&id), entry.get_log_id().index()); + let id = id_to_bin(entry.index()); + assert_eq!(bin_to_id(&id), entry.index()); self.db .put_cf( self.cf_logs(), diff --git a/examples/utils/declare_types.rs b/examples/utils/declare_types.rs index 74d8b800b..c09670e8e 100644 --- a/examples/utils/declare_types.rs +++ b/examples/utils/declare_types.rs @@ -6,7 +6,7 @@ pub type Raft = openraft::Raft; pub type Vote = ::Vote; pub type LeaderId = ::LeaderId; pub type LogId = openraft::LogId; -pub type Entry = openraft::Entry; +pub type Entry = ::Entry; pub type EntryPayload = openraft::EntryPayload; pub type StoredMembership = openraft::StoredMembership; diff --git a/openraft/src/base/finalized.rs b/openraft/src/base/finalized.rs new file mode 100644 index 000000000..23f0a31b6 --- /dev/null +++ b/openraft/src/base/finalized.rs @@ -0,0 +1,38 @@ +//! Provides a marker trait to prevent external implementation of trait methods. + +/// A marker trait used to prevent specific already auto-implemented trait methods from being +/// re-implemented outside their defining crate. +/// +/// This is achieved by adding this non-referencable marker trait to a trait method signature. +/// +/// # Example +/// +/// The following code demonstrates how `Final` prevents external implementation: +/// +/// ```ignore +/// pub trait Trait { +/// // This method cannot be implemented by users because it requires +/// // the private `Final` trait in its bounds +/// fn unimplementable(&self) where Self: Final { +/// self.user_impl_this(); +/// } +/// +/// // This method can be implemented by users +/// fn user_impl_this(&self); +/// } +/// +/// pub struct MyType; +/// +/// impl Trait for MyType { +/// // Attempting to implement this method will fail to compile +/// // because `Final` is not accessible from outside the crate +/// fn unimplementable(&self) where Self: Final {} +/// +/// fn user_impl_this(&self) { +/// println!("This implementation is allowed"); +/// } +/// } +/// ``` +pub trait Final {} + +impl Final for T {} diff --git a/openraft/src/base/mod.rs b/openraft/src/base/mod.rs index 43d621ab8..4dc2f73d8 100644 --- a/openraft/src/base/mod.rs +++ b/openraft/src/base/mod.rs @@ -1,5 +1,7 @@ //! Basic types used in the Raft implementation. +pub(crate) mod finalized; + pub use serde_able::OptionalSerde; pub use threaded::BoxAny; pub use threaded::BoxAsyncOnceMut; diff --git a/openraft/src/core/raft_core.rs b/openraft/src/core/raft_core.rs index c0528f0b0..3202d897e 100644 --- a/openraft/src/core/raft_core.rs +++ b/openraft/src/core/raft_core.rs @@ -1,4 +1,3 @@ -use std::borrow::Borrow; use std::collections::BTreeMap; use std::fmt; use std::fmt::Debug; @@ -43,7 +42,6 @@ use crate::engine::Condition; use crate::engine::Engine; use crate::engine::ReplicationProgress; use crate::engine::Respond; -use crate::entry::FromAppData; use crate::entry::RaftEntry; use crate::error::AllowNextRevertError; use crate::error::ClientWriteError; @@ -54,8 +52,7 @@ use crate::error::InitializeError; use crate::error::QuorumNotEnough; use crate::error::RPCError; use crate::error::Timeout; -use crate::log_id::LogIdOptionExt; -use crate::log_id::RaftLogId; +use crate::log_id::option_raft_log_id_ext::OptionRaftLogIdExt; use crate::metrics::HeartbeatMetrics; use crate::metrics::RaftDataMetrics; use crate::metrics::RaftMetrics; @@ -542,17 +539,8 @@ where pub fn flush_metrics(&mut self) { let (replication, heartbeat) = if let Some(leader) = self.engine.leader.as_ref() { let replication_prog = &leader.progress; - let replication = Some( - replication_prog - .iter() - .map(|(id, p)| { - ( - id.clone(), - as Borrow>>>::borrow(p).clone(), - ) - }) - .collect(), - ); + let replication = + Some(replication_prog.iter().map(|(id, p)| (id.clone(), p.matching().cloned())).collect()); let clock_prog = &leader.clock_progress; let heartbeat = @@ -842,7 +830,7 @@ where session_id, self.config.clone(), self.engine.state.committed().cloned(), - progress_entry.matching().cloned(), + progress_entry.matching.clone(), network, snapshot_network, self.log_store.get_log_reader().await, @@ -1246,7 +1234,7 @@ where self.handle_check_is_leader_request(tx).await; } RaftMsg::ClientWriteRequest { app_data, tx } => { - self.write_entry(C::Entry::from_app_data(app_data), Some(tx)); + self.write_entry(C::Entry::new_normal(LogIdOf::::default(), app_data), Some(tx)); } RaftMsg::Initialize { members, tx } => { tracing::info!( @@ -1746,10 +1734,10 @@ where committed_vote: vote, entries, } => { - let last_log_id = entries.last().unwrap().get_log_id(); + let last_log_id = entries.last().unwrap().log_id(); tracing::debug!("AppendInputEntries: {}", DisplaySlice::<_>(&entries),); - let io_id = IOId::new_log_io(vote, Some(last_log_id.clone())); + let io_id = IOId::new_log_io(vote, Some(last_log_id)); let notify = Notification::LocalIO { io_id: io_id.clone() }; let callback = IOFlushed::new(notify, self.tx_notification.downgrade()); diff --git a/openraft/src/core/sm/worker.rs b/openraft/src/core/sm/worker.rs index c332908c2..eafb8992c 100644 --- a/openraft/src/core/sm/worker.rs +++ b/openraft/src/core/sm/worker.rs @@ -15,6 +15,7 @@ use crate::core::ApplyResult; use crate::core::ApplyingEntry; use crate::display_ext::DisplayOptionExt; use crate::display_ext::DisplaySliceExt; +use crate::entry::RaftEntry; use crate::entry::RaftPayload; use crate::storage::RaftStateMachine; use crate::storage::Snapshot; @@ -23,7 +24,6 @@ use crate::type_config::alias::LogIdOf; use crate::type_config::alias::MpscUnboundedReceiverOf; use crate::type_config::alias::MpscUnboundedSenderOf; use crate::type_config::TypeConfigExt; -use crate::RaftLogId; use crate::RaftLogReader; use crate::RaftSnapshotBuilder; use crate::RaftTypeConfig; @@ -184,10 +184,8 @@ where // Fake complain: avoid using `collect()` when not needed #[allow(clippy::needless_collect)] - let applying_entries = entries - .iter() - .map(|e| ApplyingEntry::new(e.get_log_id().clone(), e.get_membership())) - .collect::>(); + let applying_entries = + entries.iter().map(|e| ApplyingEntry::new(e.log_id(), e.get_membership())).collect::>(); let n_entries = end - since; diff --git a/openraft/src/engine/engine_impl.rs b/openraft/src/engine/engine_impl.rs index 8f7a79db1..465bb188d 100644 --- a/openraft/src/engine/engine_impl.rs +++ b/openraft/src/engine/engine_impl.rs @@ -56,7 +56,6 @@ use crate::vote::RaftTerm; use crate::vote::RaftVote; use crate::LogIdOptionExt; use crate::Membership; -use crate::RaftLogId; use crate::RaftTypeConfig; /// Raft protocol algorithm. @@ -191,7 +190,7 @@ where C: RaftTypeConfig self.check_initialize()?; // The very first log id - entry.set_log_id(&LogIdOf::::default()); + entry.set_log_id(LogIdOf::::default()); let m = entry.get_membership().expect("the only log entry for initializing has to be membership log"); self.check_members_contain_me(&m)?; diff --git a/openraft/src/engine/handler/following_handler/mod.rs b/openraft/src/engine/handler/following_handler/mod.rs index 201a7372a..7a8ac6fc1 100644 --- a/openraft/src/engine/handler/following_handler/mod.rs +++ b/openraft/src/engine/handler/following_handler/mod.rs @@ -11,16 +11,17 @@ use crate::engine::Command; use crate::engine::Condition; use crate::engine::EngineConfig; use crate::engine::EngineOutput; +use crate::entry::raft_entry_ext::RaftEntryExt; +use crate::entry::RaftEntry; use crate::entry::RaftPayload; use crate::error::RejectAppendEntries; +use crate::log_id::option_raft_log_id_ext::OptionRaftLogIdExt; use crate::raft_state::IOId; use crate::raft_state::LogStateReader; use crate::storage::Snapshot; use crate::type_config::alias::LogIdOf; use crate::vote::committed::CommittedVote; use crate::EffectiveMembership; -use crate::LogIdOptionExt; -use crate::RaftLogId; use crate::RaftState; use crate::RaftTypeConfig; use crate::StoredMembership; @@ -69,10 +70,12 @@ where C: RaftTypeConfig ); if let Some(x) = entries.first() { - debug_assert!(x.get_log_id().index() == prev_log_id.next_index()); + debug_assert!(x.index() == prev_log_id.next_index()); } - let last_log_id = entries.last().map(|x| x.get_log_id().clone()); + let last_log_id = entries.last().map(|x| x.log_id()); + // let last_log_id = std::cmp::max_by_key(prev_log_id, last_log_id, |x: &Option>| + // x.ord_by()); let last_log_id = std::cmp::max(prev_log_id, last_log_id); let prev_accepted = self.state.accept_io(IOId::new_log_io(self.leader_vote.clone(), last_log_id.clone())); @@ -84,7 +87,7 @@ where C: RaftTypeConfig // the entries after it has to be deleted first. // Raft requires log ids are in total order by (term,index). // Otherwise the log id with max index makes committed entry invisible in election. - self.truncate_logs(entries[since].get_log_id().index()); + self.truncate_logs(entries[since].index()); let entries = entries.split_off(since); self.do_append_entries(entries); @@ -143,13 +146,10 @@ where C: RaftTypeConfig #[tracing::instrument(level = "debug", skip(self, entries))] pub(crate) fn do_append_entries(&mut self, entries: Vec) { debug_assert!(!entries.is_empty()); - debug_assert_eq!( - entries[0].get_log_id().index(), - self.state.log_ids.last().cloned().next_index(), - ); - debug_assert!(Some(entries[0].get_log_id()) > self.state.log_ids.last()); + debug_assert_eq!(entries[0].index(), self.state.log_ids.last().cloned().next_index(),); + debug_assert!(Some(entries[0].ref_log_id()) > self.state.log_ids.last().to_ref()); - self.state.extend_log_ids(&entries); + self.state.extend_log_ids(entries.iter().map(|x| x.ref_log_id())); self.append_membership(entries.iter()); self.output.push_command(Command::AppendInputEntries { @@ -343,7 +343,7 @@ where C: RaftTypeConfig // Find the last 2 membership config entries: the committed and the effective. for ent in entries.rev() { if let Some(m) = ent.get_membership() { - memberships.insert(0, StoredMembership::new(Some(ent.get_log_id().clone()), m)); + memberships.insert(0, StoredMembership::new(Some(ent.log_id()), m)); if memberships.len() == 2 { break; } diff --git a/openraft/src/engine/handler/leader_handler/mod.rs b/openraft/src/engine/handler/leader_handler/mod.rs index f58524e86..e069bde65 100644 --- a/openraft/src/engine/handler/leader_handler/mod.rs +++ b/openraft/src/engine/handler/leader_handler/mod.rs @@ -2,6 +2,8 @@ use crate::engine::handler::replication_handler::ReplicationHandler; use crate::engine::Command; use crate::engine::EngineConfig; use crate::engine::EngineOutput; +use crate::entry::raft_entry_ext::RaftEntryExt; +use crate::entry::RaftEntry; use crate::entry::RaftPayload; use crate::proposer::Leader; use crate::proposer::LeaderQuorumSet; @@ -10,7 +12,6 @@ use crate::raft_state::IOId; use crate::raft_state::LogStateReader; use crate::replication::ReplicationSessionId; use crate::type_config::alias::LogIdOf; -use crate::RaftLogId; use crate::RaftState; use crate::RaftTypeConfig; @@ -58,7 +59,7 @@ where C: RaftTypeConfig self.leader.assign_log_ids(&mut entries); - self.state.extend_log_ids_from_same_leader(&entries); + self.state.extend_log_ids_from_same_leader(entries.iter().map(|x| x.ref_log_id())); let mut membership_entry = None; for entry in entries.iter() { @@ -67,7 +68,7 @@ where C: RaftTypeConfig membership_entry.is_none(), "only one membership entry is allowed in a batch" ); - membership_entry = Some((entry.get_log_id().clone(), m)); + membership_entry = Some((entry.log_id(), m)); } } diff --git a/openraft/src/engine/handler/log_handler/mod.rs b/openraft/src/engine/handler/log_handler/mod.rs index 79db5c05b..69fbb1eb6 100644 --- a/openraft/src/engine/handler/log_handler/mod.rs +++ b/openraft/src/engine/handler/log_handler/mod.rs @@ -2,6 +2,7 @@ use crate::display_ext::DisplayOptionExt; use crate::engine::Command; use crate::engine::EngineConfig; use crate::engine::EngineOutput; +use crate::log_id::option_ref_log_id_ext::OptionRefLogIdExt; use crate::raft_state::LogStateReader; use crate::type_config::alias::LogIdOf; use crate::LogIdOptionExt; @@ -100,7 +101,7 @@ where C: RaftTypeConfig return None; } - let log_id = self.state.log_ids.get(purge_end - 1); + let log_id = self.state.log_ids.ref_at(purge_end - 1); debug_assert!( log_id.is_some(), "log id not found at {}, engine.state:{:?}", @@ -108,6 +109,6 @@ where C: RaftTypeConfig st ); - log_id + log_id.to_log_id() } } diff --git a/openraft/src/engine/handler/snapshot_handler/mod.rs b/openraft/src/engine/handler/snapshot_handler/mod.rs index 9aecfbf59..d03452276 100644 --- a/openraft/src/engine/handler/snapshot_handler/mod.rs +++ b/openraft/src/engine/handler/snapshot_handler/mod.rs @@ -53,7 +53,7 @@ where C: RaftTypeConfig pub(crate) fn update_snapshot(&mut self, meta: SnapshotMeta) -> bool { tracing::info!("update_snapshot: {:?}", meta); - if meta.last_log_id <= self.state.snapshot_last_log_id().cloned() { + if meta.last_log_id.as_ref() <= self.state.snapshot_last_log_id() { tracing::info!( "No need to install a smaller snapshot: current snapshot last_log_id({}), new snapshot last_log_id({})", self.state.snapshot_last_log_id().display(), diff --git a/openraft/src/engine/handler/vote_handler/mod.rs b/openraft/src/engine/handler/vote_handler/mod.rs index 8fcc09fea..376296df6 100644 --- a/openraft/src/engine/handler/vote_handler/mod.rs +++ b/openraft/src/engine/handler/vote_handler/mod.rs @@ -2,6 +2,7 @@ use std::fmt::Debug; use std::time::Duration; use crate::core::raft_msg::ResultSender; +use crate::display_ext::DisplayOptionExt; use crate::engine::handler::leader_handler::LeaderHandler; use crate::engine::handler::replication_handler::ReplicationHandler; use crate::engine::handler::server_state_handler::ServerStateHandler; @@ -164,7 +165,7 @@ where C: RaftTypeConfig "become leader: node-{}, my vote: {}, last-log-id: {}", self.config.id, self.state.vote_ref(), - self.state.last_log_id().cloned().unwrap_or_default() + self.state.last_log_id().display() ); if let Some(l) = self.leader.as_mut() { diff --git a/openraft/src/engine/log_id_list.rs b/openraft/src/engine/log_id_list.rs index 7f525eb90..8aaabc211 100644 --- a/openraft/src/engine/log_id_list.rs +++ b/openraft/src/engine/log_id_list.rs @@ -1,10 +1,13 @@ use std::ops::RangeInclusive; use crate::engine::leader_log_ids::LeaderLogIds; -use crate::log_id::RaftLogId; +use crate::log_id::option_raft_log_id_ext::OptionRaftLogIdExt; +use crate::log_id::raft_log_id::RaftLogId; +use crate::log_id::raft_log_id_ext::RaftLogIdExt; +use crate::log_id::ref_log_id::RefLogId; use crate::storage::RaftLogReaderExt; +use crate::type_config::alias::CommittedLeaderIdOf; use crate::type_config::alias::LogIdOf; -use crate::LogIdOptionExt; use crate::RaftLogReader; use crate::RaftTypeConfig; use crate::StorageError; @@ -80,7 +83,7 @@ where C: RaftTypeConfig // Two adjacent logs with different leader_id, no need to binary search if first.index() + 1 == last.index() { - if res.last().map(|x| x.committed_leader_id()) < Some(first.committed_leader_id()) { + if res.last().committed_leader_id() < Some(first.committed_leader_id()) { res.push(first); } res.push(last); @@ -91,7 +94,7 @@ where C: RaftTypeConfig if first.committed_leader_id() == mid.committed_leader_id() { // Case AAC - if res.last().map(|x| x.committed_leader_id()) < Some(first.committed_leader_id()) { + if res.last().committed_leader_id() < Some(first.committed_leader_id()) { res.push(first); } stack.push((mid, last)); @@ -130,43 +133,46 @@ where C: RaftTypeConfig /// Extends a list of `log_id` that are proposed by a same leader. /// /// The log ids in the input has to be continuous. - pub(crate) fn extend_from_same_leader<'a, LID: RaftLogId + 'a>(&mut self, new_ids: &[LID]) { - if let Some(first) = new_ids.first() { - let first_id = first.get_log_id(); - self.append(first_id.clone()); + pub(crate) fn extend_from_same_leader(&mut self, new_ids: I) + where + LID: RaftLogId, + I: IntoIterator, + ::IntoIter: DoubleEndedIterator, + { + let mut it = new_ids.into_iter(); + if let Some(first) = it.next() { + self.append(first.to_log_id()); - if let Some(last) = new_ids.last() { - let last_id = last.get_log_id(); - assert_eq!(last_id.committed_leader_id(), first_id.committed_leader_id()); + if let Some(last) = it.next_back() { + debug_assert_eq!(last.committed_leader_id(), first.committed_leader_id()); - if last_id != first_id { - self.append(last_id.clone()); + if last != first { + self.append(last.to_log_id()); } } } } /// Extends a list of `log_id`. - // leader_id: Copy is feature gated - #[allow(clippy::clone_on_copy)] - pub(crate) fn extend<'a, LID: RaftLogId + 'a>(&mut self, new_ids: &[LID]) { - let mut prev = self.last().map(|x| x.committed_leader_id().clone()); - - for x in new_ids.iter() { - let log_id = x.get_log_id(); - - if prev.as_ref() != Some(log_id.committed_leader_id()) { - self.append(log_id.clone()); + pub(crate) fn extend(&mut self, new_ids: I) + where + LID: RaftLogId, + I: IntoIterator, + ::IntoIter: ExactSizeIterator, + { + let it = new_ids.into_iter(); + let len = it.len(); - prev = Some(log_id.committed_leader_id().clone()); + for (i, log_id) in it.enumerate() { + if self.last_leader_id() != Some(log_id.committed_leader_id()) { + self.append(log_id.to_log_id()); } - } - if let Some(last) = new_ids.last() { - let log_id = last.get_log_id(); - - if self.last() != Some(log_id) { - self.append(log_id.clone()); + #[allow(clippy::collapsible_if)] + if i == len - 1 { + if self.ref_last() != Some(log_id.to_ref()) { + self.append(log_id.to_log_id()); + } } } } @@ -270,35 +276,45 @@ where C: RaftTypeConfig } Err(i) => { self.key_log_ids = self.key_log_ids.split_off(i - 1); - self.key_log_ids[0].index = upto.index(); + self.key_log_ids[0] = + LogIdOf::::new(self.key_log_ids[0].committed_leader_id().clone(), upto.index()); } } } - /// Get the log id at the specified index. + // This method is only used in tests + #[allow(dead_code)] + pub(crate) fn get(&self, index: u64) -> Option> { + self.ref_at(index).map(|x| x.into_owned()) + } + + /// Get the log id at the specified index in a [`RefLogId`]. /// /// It will return `last_purged_log_id` if index is at the last purged index. - // leader_id: Copy is feature gated #[allow(clippy::clone_on_copy)] - pub(crate) fn get(&self, index: u64) -> Option> { + pub(crate) fn ref_at(&self, index: u64) -> Option> { let res = self.key_log_ids.binary_search_by(|log_id| log_id.index().cmp(&index)); - match res { - Ok(i) => Some(LogIdOf::::new( - self.key_log_ids[i].committed_leader_id().clone(), - index, - )), + // Index of the leadership change point that covers the target index. + // It points to either: + // - The exact matching log entry if found, or + // - The most recent change point before the target index + let change_point = match res { + Ok(i) => i, Err(i) => { + // i - 1 is the last one that is smaller than the input. if i == 0 || i == self.key_log_ids.len() { - None + return None; } else { - Some(LogIdOf::::new( - self.key_log_ids[i - 1].committed_leader_id().clone(), - index, - )) + i - 1 } } - } + }; + + Some(RefLogId::new( + self.key_log_ids[change_point].committed_leader_id(), + index, + )) } pub(crate) fn first(&self) -> Option<&LogIdOf> { @@ -309,6 +325,14 @@ where C: RaftTypeConfig self.key_log_ids.last() } + pub(crate) fn ref_last(&self) -> Option> { + self.last().map(|x| x.to_ref()) + } + + pub(crate) fn last_leader_id(&self) -> Option<&CommittedLeaderIdOf> { + self.last().map(|x| x.committed_leader_id()) + } + // This method will only be used under feature tokio-rt #[cfg_attr(not(feature = "tokio-rt"), allow(dead_code))] pub(crate) fn key_log_ids(&self) -> &[LogIdOf] { diff --git a/openraft/src/engine/testing.rs b/openraft/src/engine/testing.rs index 79e89bb88..0277087b0 100644 --- a/openraft/src/engine/testing.rs +++ b/openraft/src/engine/testing.rs @@ -42,7 +42,7 @@ where N: Node + Ord type Term = u64; type LeaderId = crate::impls::leader_id_adv::LeaderId; type Vote = crate::impls::Vote; - type Entry = crate::Entry; + type Entry = crate::impls::Entry; type SnapshotData = Cursor>; type AsyncRuntime = TokioRuntime; type Responder = crate::impls::OneshotResponder; diff --git a/openraft/src/engine/tests/log_id_list_test.rs b/openraft/src/engine/tests/log_id_list_test.rs index 13244cac3..ab1aa084c 100644 --- a/openraft/src/engine/tests/log_id_list_test.rs +++ b/openraft/src/engine/tests/log_id_list_test.rs @@ -9,13 +9,13 @@ fn test_log_id_list_extend_from_same_leader() -> anyhow::Result<()> { // Extend one log id to an empty LogIdList: Just store it directly - ids.extend_from_same_leader(&[log_id(1, 1, 2)]); + ids.extend_from_same_leader([log_id(1, 1, 2)]); assert_eq!(vec![log_id(1, 1, 2)], ids.key_log_ids()); // Extend two log ids that are adjacent to the last stored one. // It should append only one log id as the new ending log id. - ids.extend_from_same_leader(&[ + ids.extend_from_same_leader([ log_id(1, 1, 3), // log_id(1, 1, 4), ]); @@ -31,7 +31,7 @@ fn test_log_id_list_extend_from_same_leader() -> anyhow::Result<()> { // Extend 3 log id with new leader id. // It should just store every log id for each leader, plus one last-log-id. - ids.extend_from_same_leader(&[ + ids.extend_from_same_leader([ log_id(2, 1, 5), // log_id(2, 1, 6), log_id(2, 1, 7), @@ -55,13 +55,13 @@ fn test_log_id_list_extend() -> anyhow::Result<()> { // Extend one log id to an empty LogIdList: Just store it directly - ids.extend(&[log_id(1, 1, 2)]); + ids.extend([log_id(1, 1, 2)]); assert_eq!(vec![log_id(1, 1, 2)], ids.key_log_ids()); // Extend two log ids that are adjacent to the last stored one. // It should append only one log id as the new ending log id. - ids.extend(&[ + ids.extend([ log_id(1, 1, 3), // log_id(1, 1, 4), ]); @@ -77,7 +77,7 @@ fn test_log_id_list_extend() -> anyhow::Result<()> { // Extend 3 log id with different leader id. // Last two has the same leader id. - ids.extend(&[ + ids.extend([ log_id(1, 1, 5), // log_id(2, 1, 6), log_id(2, 1, 7), @@ -95,7 +95,7 @@ fn test_log_id_list_extend() -> anyhow::Result<()> { // Extend 3 log id with different leader id. // Last two have different leader id. - ids.extend(&[ + ids.extend([ log_id(2, 1, 8), // log_id(2, 1, 9), log_id(3, 1, 10), diff --git a/openraft/src/entry/mod.rs b/openraft/src/entry/mod.rs index 25284bddd..96310b0f6 100644 --- a/openraft/src/entry/mod.rs +++ b/openraft/src/entry/mod.rs @@ -3,18 +3,18 @@ use std::fmt; use std::fmt::Debug; -use crate::log_id::RaftLogId; use crate::Membership; use crate::RaftTypeConfig; pub mod payload; +pub(crate) mod raft_entry_ext; mod traits; pub use payload::EntryPayload; -pub use traits::FromAppData; pub use traits::RaftEntry; pub use traits::RaftPayload; +use crate::type_config::alias::CommittedLeaderIdOf; use crate::type_config::alias::LogIdOf; /// A Raft log entry. @@ -89,52 +89,23 @@ where C: RaftTypeConfig impl RaftPayload for Entry where C: RaftTypeConfig { - fn is_blank(&self) -> bool { - self.payload.is_blank() - } - fn get_membership(&self) -> Option> { self.payload.get_membership() } } -impl RaftLogId for Entry -where C: RaftTypeConfig -{ - fn get_log_id(&self) -> &LogIdOf { - &self.log_id - } - - fn set_log_id(&mut self, log_id: &LogIdOf) { - self.log_id = log_id.clone(); - } -} - impl RaftEntry for Entry where C: RaftTypeConfig { - fn new_blank(log_id: LogIdOf) -> Self { - Self { - log_id, - payload: EntryPayload::Blank, - } + fn new(log_id: LogIdOf, payload: EntryPayload) -> Self { + Self { log_id, payload } } - fn new_membership(log_id: LogIdOf, m: Membership) -> Self { - Self { - log_id, - payload: EntryPayload::Membership(m), - } + fn log_id_parts(&self) -> (&CommittedLeaderIdOf, u64) { + (&self.log_id.leader_id, self.log_id.index) } -} -impl FromAppData for Entry -where C: RaftTypeConfig -{ - fn from_app_data(d: C::D) -> Self { - Entry { - log_id: LogIdOf::::default(), - payload: EntryPayload::Normal(d), - } + fn set_log_id(&mut self, new: LogIdOf) { + self.log_id = new; } } diff --git a/openraft/src/entry/payload.rs b/openraft/src/entry/payload.rs index 133a5cb21..76a6afefc 100644 --- a/openraft/src/entry/payload.rs +++ b/openraft/src/entry/payload.rs @@ -63,10 +63,6 @@ where C: RaftTypeConfig } impl RaftPayload for EntryPayload { - fn is_blank(&self) -> bool { - matches!(self, EntryPayload::Blank) - } - fn get_membership(&self) -> Option> { if let EntryPayload::Membership(m) = self { Some(m.clone()) diff --git a/openraft/src/entry/raft_entry_ext.rs b/openraft/src/entry/raft_entry_ext.rs new file mode 100644 index 000000000..5d14acd83 --- /dev/null +++ b/openraft/src/entry/raft_entry_ext.rs @@ -0,0 +1,20 @@ +use crate::entry::RaftEntry; +use crate::log_id::ref_log_id::RefLogId; +use crate::RaftTypeConfig; + +pub(crate) trait RaftEntryExt: RaftEntry +where C: RaftTypeConfig +{ + /// Returns a lightweight [`RefLogId`] that contains the log id information. + fn ref_log_id(&self) -> RefLogId<'_, C> { + let (leader_id, index) = self.log_id_parts(); + RefLogId::new(leader_id, index) + } +} + +impl RaftEntryExt for T +where + C: RaftTypeConfig, + T: RaftEntry, +{ +} diff --git a/openraft/src/entry/traits.rs b/openraft/src/entry/traits.rs index f7620a1a0..3077d127d 100644 --- a/openraft/src/entry/traits.rs +++ b/openraft/src/entry/traits.rs @@ -1,9 +1,13 @@ use std::fmt::Debug; use std::fmt::Display; +use openraft_macros::since; + +use crate::base::finalized::Final; use crate::base::OptionalFeatures; -use crate::log_id::RaftLogId; +use crate::type_config::alias::CommittedLeaderIdOf; use crate::type_config::alias::LogIdOf; +use crate::EntryPayload; use crate::Membership; use crate::RaftTypeConfig; @@ -11,10 +15,7 @@ use crate::RaftTypeConfig; pub trait RaftPayload where C: RaftTypeConfig { - /// Return `true` if the entry payload is blank. - fn is_blank(&self) -> bool; - - /// Return `Some(Membership)` if the entry payload is a membership payload. + /// Return `Some(Membership)` if the entry payload contains a membership payload. fn get_membership(&self) -> Option>; } @@ -23,23 +24,64 @@ pub trait RaftEntry where C: RaftTypeConfig, Self: OptionalFeatures + Debug + Display, - Self: RaftPayload + RaftLogId, + Self: RaftPayload, { - /// Create a new blank log entry. + /// Create a new log entry with log id and payload of application data or membership config. + #[since(version = "0.10.0")] + fn new(log_id: LogIdOf, payload: EntryPayload) -> Self; + + /// Returns references to the components of this entry's log ID: the committed leader ID and + /// index. + /// + /// The returned tuple contains: + /// - A reference to the committed leader ID that proposed this log entry. + /// - The index position of this entry in the log. /// - /// The returned instance must return `true` for `Self::is_blank()`. - fn new_blank(log_id: LogIdOf) -> Self; + /// Note: Although these components constitute a `LogId`, this method returns them separately + /// rather than as a reference to `LogId`. This allows implementations to store these + /// components directly without requiring a `LogId` field in their data structure. + #[since(version = "0.10.0")] + fn log_id_parts(&self) -> (&CommittedLeaderIdOf, u64); + + /// Set the log ID of this entry. + #[since(version = "0.10.0")] + fn set_log_id(&mut self, new: LogIdOf); + + /// Create a new blank log entry. + #[since(version = "0.10.0", change = "become a default method")] + fn new_blank(log_id: LogIdOf) -> Self + where Self: Final + Sized { + Self::new(log_id, EntryPayload::Blank) + } + + /// Create a new normal log entry that contains application data. + #[since(version = "0.10.0", change = "become a default method")] + fn new_normal(log_id: LogIdOf, data: C::D) -> Self + where Self: Final + Sized { + Self::new(log_id, EntryPayload::Normal(data)) + } /// Create a new membership log entry. /// /// The returned instance must return `Some()` for `Self::get_membership()`. - fn new_membership(log_id: LogIdOf, m: Membership) -> Self; -} + #[since(version = "0.10.0", change = "become a default method")] + fn new_membership(log_id: LogIdOf, m: Membership) -> Self + where Self: Final + Sized { + Self::new(log_id, EntryPayload::Membership(m)) + } + + /// Returns the `LogId` of this entry. + #[since(version = "0.10.0")] + fn log_id(&self) -> LogIdOf + where Self: Final { + let (leader_id, index) = self.log_id_parts(); + LogIdOf::::new(leader_id.clone(), index) + } -/// Build a raft log entry from app data. -/// -/// A concrete Entry should implement this trait to let openraft create an entry when needed. -pub trait FromAppData { - /// Build a raft log entry from app data. - fn from_app_data(t: T) -> Self; + /// Returns the index of this log entry. + #[since(version = "0.10.0")] + fn index(&self) -> u64 + where Self: Final { + self.log_id_parts().1 + } } diff --git a/openraft/src/impls/mod.rs b/openraft/src/impls/mod.rs index 514640898..72e01d50f 100644 --- a/openraft/src/impls/mod.rs +++ b/openraft/src/impls/mod.rs @@ -17,6 +17,8 @@ pub mod leader_id_std { pub use crate::vote::leader_id::leader_id_std::LeaderId; } +/// Default implementation of a raft log identity. +pub use crate::log_id::LogId; /// Default [`RaftVote`] implementation for both standard Raft mode and multi-leader-per-term mode. /// /// The difference between the two modes is the implementation of [`RaftLeaderId`]. diff --git a/openraft/src/lib.rs b/openraft/src/lib.rs index d07ebd644..459da2a6c 100644 --- a/openraft/src/lib.rs +++ b/openraft/src/lib.rs @@ -105,7 +105,6 @@ pub use crate::instant::TokioInstant; pub use crate::log_id::LogId; pub use crate::log_id::LogIdOptionExt; pub use crate::log_id::LogIndexOptionExt; -pub use crate::log_id::RaftLogId; pub use crate::membership::EffectiveMembership; pub use crate::membership::Membership; pub use crate::membership::StoredMembership; diff --git a/openraft/src/log_id/log_id_option_ext.rs b/openraft/src/log_id/log_id_option_ext.rs index efe5a3b55..678e55599 100644 --- a/openraft/src/log_id/log_id_option_ext.rs +++ b/openraft/src/log_id/log_id_option_ext.rs @@ -1,8 +1,10 @@ -use crate::type_config::alias::LogIdOf; +use crate::log_id::raft_log_id::RaftLogId; use crate::RaftTypeConfig; /// This helper trait extracts information from an `Option`. -pub trait LogIdOptionExt { +pub trait LogIdOptionExt +where C: RaftTypeConfig +{ /// Returns the log index if it is not a `None`. fn index(&self) -> Option; @@ -12,8 +14,10 @@ pub trait LogIdOptionExt { fn next_index(&self) -> u64; } -impl LogIdOptionExt for Option> -where C: RaftTypeConfig +impl LogIdOptionExt for Option +where + C: RaftTypeConfig, + T: RaftLogId, { fn index(&self) -> Option { self.as_ref().map(|x| x.index()) @@ -26,18 +30,3 @@ where C: RaftTypeConfig } } } - -impl LogIdOptionExt for Option<&LogIdOf> -where C: RaftTypeConfig -{ - fn index(&self) -> Option { - self.map(|x| x.index()) - } - - fn next_index(&self) -> u64 { - match self { - None => 0, - Some(log_id) => log_id.index() + 1, - } - } -} diff --git a/openraft/src/log_id/mod.rs b/openraft/src/log_id/mod.rs index d5b30c9dc..4b32108e1 100644 --- a/openraft/src/log_id/mod.rs +++ b/openraft/src/log_id/mod.rs @@ -3,15 +3,19 @@ mod log_id_option_ext; mod log_index_option_ext; -mod raft_log_id; +pub(crate) mod option_raft_log_id_ext; +pub(crate) mod option_ref_log_id_ext; +pub(crate) mod raft_log_id; +pub(crate) mod raft_log_id_ext; +pub(crate) mod ref_log_id; use std::fmt::Display; use std::fmt::Formatter; pub use log_id_option_ext::LogIdOptionExt; pub use log_index_option_ext::LogIndexOptionExt; -pub use raft_log_id::RaftLogId; +use crate::log_id::raft_log_id::RaftLogId; use crate::type_config::alias::CommittedLeaderIdOf; use crate::RaftTypeConfig; @@ -42,12 +46,16 @@ where impl RaftLogId for LogId where C: RaftTypeConfig { - fn get_log_id(&self) -> &LogId { - self + fn new(leader_id: CommittedLeaderIdOf, index: u64) -> Self { + LogId { leader_id, index } } - fn set_log_id(&mut self, log_id: &LogId) { - *self = log_id.clone() + fn committed_leader_id(&self) -> &CommittedLeaderIdOf { + &self.leader_id + } + + fn index(&self) -> u64 { + self.index } } diff --git a/openraft/src/log_id/option_raft_log_id_ext.rs b/openraft/src/log_id/option_raft_log_id_ext.rs new file mode 100644 index 000000000..93711b6f7 --- /dev/null +++ b/openraft/src/log_id/option_raft_log_id_ext.rs @@ -0,0 +1,51 @@ +use crate::log_id::raft_log_id::RaftLogId; +use crate::log_id::raft_log_id_ext::RaftLogIdExt; +use crate::log_id::ref_log_id::RefLogId; +use crate::type_config::alias::CommittedLeaderIdOf; +use crate::RaftTypeConfig; + +/// This helper trait extracts information from an `Option` where T impls [`RaftLogId`]. +pub(crate) trait OptionRaftLogIdExt +where C: RaftTypeConfig +{ + /// Returns the log index if it is not a `None`. + fn index(&self) -> Option; + + /// Returns the next log index. + /// + /// If self is `None`, it returns 0. + fn next_index(&self) -> u64; + + /// Returns the leader id that proposed this log id. + /// + /// In standard raft, committed leader id is just `term`. + fn committed_leader_id(&self) -> Option<&CommittedLeaderIdOf>; + + /// Converts `&Option` to `Option`. + fn to_ref(&self) -> Option>; +} + +impl OptionRaftLogIdExt for Option +where + C: RaftTypeConfig, + T: RaftLogId, +{ + fn index(&self) -> Option { + self.as_ref().map(|x| x.index()) + } + + fn next_index(&self) -> u64 { + match self { + None => 0, + Some(log_id) => log_id.index() + 1, + } + } + + fn committed_leader_id(&self) -> Option<&CommittedLeaderIdOf> { + self.as_ref().map(|x| x.committed_leader_id()) + } + + fn to_ref(&self) -> Option> { + self.as_ref().map(|x| x.to_ref()) + } +} diff --git a/openraft/src/log_id/option_ref_log_id_ext.rs b/openraft/src/log_id/option_ref_log_id_ext.rs new file mode 100644 index 000000000..d0d113600 --- /dev/null +++ b/openraft/src/log_id/option_ref_log_id_ext.rs @@ -0,0 +1,20 @@ +use crate::log_id::ref_log_id::RefLogId; +use crate::type_config::alias::LogIdOf; +use crate::RaftTypeConfig; + +pub(crate) trait OptionRefLogIdExt +where C: RaftTypeConfig +{ + /// Creates a new owned [`LogId`] from the reference log ID. + /// + /// [`LogId`]: crate::log_id::LogId + fn to_log_id(&self) -> Option>; +} + +impl OptionRefLogIdExt for Option> +where C: RaftTypeConfig +{ + fn to_log_id(&self) -> Option> { + self.as_ref().map(|r| r.into_owned()) + } +} diff --git a/openraft/src/log_id/raft_log_id.rs b/openraft/src/log_id/raft_log_id.rs index 06feb8f8c..f39d7d6a4 100644 --- a/openraft/src/log_id/raft_log_id.rs +++ b/openraft/src/log_id/raft_log_id.rs @@ -1,24 +1,47 @@ +use std::fmt; + use crate::type_config::alias::CommittedLeaderIdOf; -use crate::type_config::alias::LogIdOf; use crate::RaftTypeConfig; -/// Defines API to operate an object that contains a log-id, such as a log entry or a log id. -pub trait RaftLogId -where C: RaftTypeConfig +/// Log id is the globally unique identifier of a log entry. +/// +/// Equal log id means the same log entry. +pub(crate) trait RaftLogId +where + C: RaftTypeConfig, + Self: Eq + Clone + fmt::Debug + fmt::Display, { + /// Creates a log id proposed by a committed leader `leader_id` at the given index. + // This is only used internally + #[allow(dead_code)] + fn new(leader_id: CommittedLeaderIdOf, index: u64) -> Self; + /// Returns a reference to the leader id that proposed this log id. /// /// When a `LeaderId` is committed, some of its data can be discarded. /// For example, a leader id in standard raft is `(term, node_id)`, but a log id does not have /// to store the `node_id`, because in standard raft there is at most one leader that can be /// established. - fn leader_id(&self) -> &CommittedLeaderIdOf { - self.get_log_id().committed_leader_id() + fn committed_leader_id(&self) -> &CommittedLeaderIdOf; + + /// Returns the index of the log id. + fn index(&self) -> u64; +} + +impl RaftLogId for &T +where + C: RaftTypeConfig, + T: RaftLogId, +{ + fn new(_leader_id: CommittedLeaderIdOf, _index: u64) -> Self { + unreachable!("This method should not be called on a reference.") } - /// Return a reference to the log-id it stores. - fn get_log_id(&self) -> &LogIdOf; + fn committed_leader_id(&self) -> &CommittedLeaderIdOf { + T::committed_leader_id(self) + } - /// Update the log id it contains. - fn set_log_id(&mut self, log_id: &LogIdOf); + fn index(&self) -> u64 { + T::index(self) + } } diff --git a/openraft/src/log_id/raft_log_id_ext.rs b/openraft/src/log_id/raft_log_id_ext.rs new file mode 100644 index 000000000..789318fde --- /dev/null +++ b/openraft/src/log_id/raft_log_id_ext.rs @@ -0,0 +1,32 @@ +use crate::log_id::raft_log_id::RaftLogId; +use crate::log_id::ref_log_id::RefLogId; +use crate::type_config::alias::LogIdOf; +use crate::RaftTypeConfig; + +pub(crate) trait RaftLogIdExt +where + C: RaftTypeConfig, + Self: RaftLogId, +{ + /// Creates a new owned [`LogId`] from this log ID implementation. + /// + /// [`LogId`]: crate::log_id::LogId + fn to_log_id(&self) -> LogIdOf { + self.to_ref().into_owned() + } + + /// Creates a reference view of this log ID implementation via a [`RefLogId`]. + fn to_ref(&self) -> RefLogId<'_, C> { + RefLogId { + leader_id: self.committed_leader_id(), + index: self.index(), + } + } +} + +impl RaftLogIdExt for T +where + C: RaftTypeConfig, + T: RaftLogId, +{ +} diff --git a/openraft/src/log_id/ref_log_id.rs b/openraft/src/log_id/ref_log_id.rs new file mode 100644 index 000000000..b5a969139 --- /dev/null +++ b/openraft/src/log_id/ref_log_id.rs @@ -0,0 +1,60 @@ +use std::fmt::Display; +use std::fmt::Formatter; + +use crate::log_id::raft_log_id::RaftLogId; +use crate::type_config::alias::CommittedLeaderIdOf; +use crate::LogId; +use crate::RaftTypeConfig; + +/// A reference to a log id, combining a reference to a committed leader ID and an index. +#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)] +pub(crate) struct RefLogId<'k, C> +where C: RaftTypeConfig +{ + pub(crate) leader_id: &'k CommittedLeaderIdOf, + pub(crate) index: u64, +} + +impl<'l, C> RefLogId<'l, C> +where C: RaftTypeConfig +{ + pub(crate) fn new(leader_id: &'l CommittedLeaderIdOf, index: u64) -> Self { + RefLogId { leader_id, index } + } + + pub(crate) fn committed_leader_id(&self) -> &'l CommittedLeaderIdOf { + self.leader_id + } + + pub(crate) fn index(&self) -> u64 { + self.index + } + + pub(crate) fn into_owned(self) -> LogId { + LogId::::new(self.leader_id.clone(), self.index) + } +} + +impl Display for RefLogId<'_, C> +where C: RaftTypeConfig +{ + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + write!(f, "{}.{}", self.committed_leader_id(), self.index()) + } +} + +impl RaftLogId for RefLogId<'_, C> +where C: RaftTypeConfig +{ + fn new(_leader_id: CommittedLeaderIdOf, _index: u64) -> Self { + unreachable!("RefLogId does not own the leader id, so it cannot be created from it.") + } + + fn committed_leader_id(&self) -> &CommittedLeaderIdOf { + self.leader_id + } + + fn index(&self) -> u64 { + self.index + } +} diff --git a/openraft/src/log_id_range.rs b/openraft/src/log_id_range.rs index acce6d89b..51240501f 100644 --- a/openraft/src/log_id_range.rs +++ b/openraft/src/log_id_range.rs @@ -5,7 +5,6 @@ use std::fmt::Formatter; use validit::Validate; use crate::display_ext::DisplayOptionExt; -use crate::type_config::alias::CommittedLeaderIdOf; use crate::type_config::alias::LogIdOf; use crate::LogIdOptionExt; use crate::RaftTypeConfig; @@ -30,7 +29,7 @@ where C: RaftTypeConfig impl Copy for LogIdRange where C: RaftTypeConfig, - CommittedLeaderIdOf: Copy, + LogIdOf: Copy, { } @@ -70,11 +69,10 @@ mod tests { use crate::engine::testing::UTConfig; use crate::log_id_range::LogIdRange; - use crate::testing; use crate::type_config::alias::LogIdOf; fn log_id(index: u64) -> LogIdOf { - testing::log_id(1, 1, index) + crate::engine::testing::log_id(1, 1, index) } #[test] diff --git a/openraft/src/membership/effective_membership.rs b/openraft/src/membership/effective_membership.rs index 66e00d66a..8fb5994d0 100644 --- a/openraft/src/membership/effective_membership.rs +++ b/openraft/src/membership/effective_membership.rs @@ -4,7 +4,8 @@ use std::fmt::Debug; use std::sync::Arc; use crate::display_ext::DisplayOptionExt; -use crate::log_id::RaftLogId; +use crate::log_id::raft_log_id::RaftLogId; +use crate::log_id::raft_log_id_ext::RaftLogIdExt; use crate::quorum::Joint; use crate::quorum::QuorumSet; use crate::type_config::alias::LogIdOf; @@ -58,7 +59,7 @@ where LID: RaftLogId, { fn from(v: (&LID, Membership)) -> Self { - EffectiveMembership::new(Some(v.0.get_log_id().clone()), v.1) + EffectiveMembership::new(Some(v.0.to_log_id()), v.1) } } diff --git a/openraft/src/progress/entry/mod.rs b/openraft/src/progress/entry/mod.rs index 6cc81903c..8cc1eff2e 100644 --- a/openraft/src/progress/entry/mod.rs +++ b/openraft/src/progress/entry/mod.rs @@ -187,6 +187,14 @@ where C: RaftTypeConfig } } +// impl Borrow> for ProgressEntry +// where C: RaftTypeConfig +// { +// fn borrow(&self) -> &OrdLogIdOf { +// &self.matching +// } +// } + impl Display for ProgressEntry where C: RaftTypeConfig { @@ -214,12 +222,12 @@ where C: RaftTypeConfig Inflight::Logs { log_id_range, .. } => { // matching <= prev_log_id <= last_log_id // prev_log_id.next_index() <= searching_end - validit::less_equal!(&self.matching, &log_id_range.prev); + validit::less_equal!(self.matching(), log_id_range.prev.as_ref()); validit::less_equal!(log_id_range.prev.next_index(), self.searching_end); } Inflight::Snapshot { last_log_id, .. } => { // There is no need to send a snapshot smaller than last matching. - validit::less!(&self.matching, last_log_id); + validit::less!(self.matching(), last_log_id.as_ref()); } } Ok(()) diff --git a/openraft/src/progress/entry/tests.rs b/openraft/src/progress/entry/tests.rs index d85107a5a..845428dc1 100644 --- a/openraft/src/progress/entry/tests.rs +++ b/openraft/src/progress/entry/tests.rs @@ -2,6 +2,7 @@ use std::borrow::Borrow; use crate::engine::testing::UTConfig; use crate::engine::EngineConfig; +use crate::log_id::ref_log_id::RefLogId; use crate::progress::entry::ProgressEntry; use crate::progress::inflight::Inflight; use crate::raft_state::LogStateReader; @@ -118,6 +119,10 @@ impl LogStateReader for LogState { } } + fn ref_log_id(&self, _index: u64) -> Option> { + todo!() + } + fn last_log_id(&self) -> Option<&LogIdOf> { self.last.as_ref() } diff --git a/openraft/src/progress/entry/update.rs b/openraft/src/progress/entry/update.rs index c9b71c8f3..80f3c76c6 100644 --- a/openraft/src/progress/entry/update.rs +++ b/openraft/src/progress/entry/update.rs @@ -85,7 +85,7 @@ where C: RaftTypeConfig self.entry.inflight.ack(matching.clone()); - debug_assert!(matching >= self.entry.matching); + debug_assert!(matching.as_ref() >= self.entry.matching()); self.entry.matching = matching; let matching_next = self.entry.matching().next_index(); diff --git a/openraft/src/progress/inflight/mod.rs b/openraft/src/progress/inflight/mod.rs index 363137581..dbfc114d3 100644 --- a/openraft/src/progress/inflight/mod.rs +++ b/openraft/src/progress/inflight/mod.rs @@ -9,7 +9,6 @@ use validit::Validate; use crate::display_ext::DisplayOptionExt; use crate::log_id_range::LogIdRange; -use crate::type_config::alias::CommittedLeaderIdOf; use crate::type_config::alias::LogIdOf; use crate::LogIdOptionExt; use crate::RaftTypeConfig; @@ -42,7 +41,7 @@ where C: RaftTypeConfig impl Copy for Inflight where C: RaftTypeConfig, - CommittedLeaderIdOf: Copy, + LogIdOf: Copy, { } diff --git a/openraft/src/proposer/leader.rs b/openraft/src/proposer/leader.rs index 1c43aa304..cd88e4790 100644 --- a/openraft/src/proposer/leader.rs +++ b/openraft/src/proposer/leader.rs @@ -2,6 +2,8 @@ use std::fmt; use crate::display_ext::DisplayInstantExt; use crate::engine::leader_log_ids::LeaderLogIds; +use crate::entry::raft_entry_ext::RaftEntryExt; +use crate::entry::RaftEntry; use crate::progress::entry::ProgressEntry; use crate::progress::Progress; use crate::progress::VecProgress; @@ -12,7 +14,6 @@ use crate::type_config::TypeConfigExt; use crate::vote::committed::CommittedVote; use crate::vote::raft_vote::RaftVoteExt; use crate::LogIdOptionExt; -use crate::RaftLogId; use crate::RaftTypeConfig; /// Leading state data. @@ -162,27 +163,32 @@ where /// Assign log ids to the entries. /// /// This method update the `self.last_log_id`. - pub(crate) fn assign_log_ids<'a, LID: RaftLogId + 'a>( - &mut self, - entries: impl IntoIterator, - ) { + pub(crate) fn assign_log_ids<'a, Ent, I>(&mut self, entries: I) + where + Ent: RaftEntry + 'a, + I: IntoIterator, + ::IntoIter: ExactSizeIterator, + { debug_assert!(self.transfer_to.is_none(), "leader is disabled to propose new log"); + let it = entries.into_iter(); + let len = it.len(); + if len == 0 { + return; + } + let committed_leader_id = self.committed_vote.committed_leader_id(); - let first = LogIdOf::::new(committed_leader_id, self.last_log_id().next_index()); - let mut last = first.clone(); + let mut index = self.last_log_id().next_index(); - for entry in entries { - entry.set_log_id(&last); - tracing::debug!("assign log id: {}", last); - last.index += 1; + for entry in it { + entry.set_log_id(LogIdOf::::new(committed_leader_id.clone(), index)); + tracing::debug!("assign log id: {}", entry.ref_log_id()); + index += 1; } - if last.index() > first.index() { - last.index -= 1; - self.last_log_id = Some(last); - } + index -= 1; + self.last_log_id = Some(LogIdOf::::new(committed_leader_id.clone(), index)); } /// Get the last timestamp acknowledged by a quorum. @@ -234,7 +240,6 @@ mod tests { use crate::type_config::TypeConfigExt; use crate::vote::raft_vote::RaftVoteExt; use crate::Entry; - use crate::RaftLogId; use crate::Vote; #[test] @@ -297,8 +302,8 @@ mod tests { leader.assign_log_ids(&mut entries); assert_eq!( - entries[0].get_log_id(), - &log_id(2, 2, 4), + entries[0].log_id(), + log_id(2, 2, 4), "entry log id assigned following last-log-id" ); assert_eq!(Some(log_id(2, 2, 4)), leader.last_log_id); @@ -312,7 +317,7 @@ mod tests { let mut entries: Vec> = vec![blank_ent(1, 1, 1)]; leading.assign_log_ids(&mut entries); - assert_eq!(entries[0].get_log_id(), &log_id(0, 0, 0),); + assert_eq!(entries[0].log_id(), log_id(0, 0, 0),); assert_eq!(Some(log_id(0, 0, 0)), leading.last_log_id); } @@ -336,9 +341,9 @@ mod tests { let mut entries: Vec> = vec![blank_ent(1, 1, 1), blank_ent(1, 1, 1), blank_ent(1, 1, 1)]; leading.assign_log_ids(&mut entries); - assert_eq!(entries[0].get_log_id(), &log_id(2, 2, 9)); - assert_eq!(entries[1].get_log_id(), &log_id(2, 2, 10)); - assert_eq!(entries[2].get_log_id(), &log_id(2, 2, 11)); + assert_eq!(entries[0].log_id(), log_id(2, 2, 9)); + assert_eq!(entries[1].log_id(), log_id(2, 2, 10)); + assert_eq!(entries[2].log_id(), log_id(2, 2, 11)); assert_eq!(Some(log_id(2, 2, 11)), leading.last_log_id); } diff --git a/openraft/src/raft_state/io_state/log_io_id.rs b/openraft/src/raft_state/io_state/log_io_id.rs index 2a801857f..a347c60f1 100644 --- a/openraft/src/raft_state/io_state/log_io_id.rs +++ b/openraft/src/raft_state/io_state/log_io_id.rs @@ -20,6 +20,8 @@ use crate::RaftTypeConfig; /// /// See: [LogId Appended Multiple /// Times](crate::docs::protocol::replication::log_replication#logid-appended-multiple-times). +/// +/// [`LogId`]: crate::log_id::LogId #[derive(Debug, Clone)] #[derive(PartialEq, Eq)] #[derive(PartialOrd, Ord)] diff --git a/openraft/src/raft_state/log_state_reader.rs b/openraft/src/raft_state/log_state_reader.rs index cb6969f25..1fc04b52b 100644 --- a/openraft/src/raft_state/log_state_reader.rs +++ b/openraft/src/raft_state/log_state_reader.rs @@ -1,5 +1,9 @@ +use crate::log_id::option_raft_log_id_ext::OptionRaftLogIdExt; +use crate::log_id::option_ref_log_id_ext::OptionRefLogIdExt; +use crate::log_id::raft_log_id::RaftLogId; +use crate::log_id::raft_log_id_ext::RaftLogIdExt; +use crate::log_id::ref_log_id::RefLogId; use crate::type_config::alias::LogIdOf; -use crate::LogIdOptionExt; use crate::RaftTypeConfig; /// APIs to get significant log ids reflecting the raft state. @@ -20,26 +24,30 @@ where C: RaftTypeConfig /// Return if a log id exists. /// /// It assumes a committed log will always get positive return value, according to raft spec. - fn has_log_id(&self, log_id: &LogIdOf) -> bool { + fn has_log_id(&self, log_id: impl RaftLogId) -> bool { if log_id.index() < self.committed().next_index() { - debug_assert!(Some(log_id) <= self.committed()); + debug_assert!(Some(log_id.to_ref()) <= self.committed().to_ref()); return true; } // The local log id exists at the index and is same as the input. - if let Some(local) = self.get_log_id(log_id.index()) { - *log_id == local + if let Some(local) = self.ref_log_id(log_id.index()) { + log_id.to_ref() == local } else { false } } + fn get_log_id(&self, index: u64) -> Option> { + self.ref_log_id(index).to_log_id() + } + /// Get the log id at the specified index. /// /// It will return `last_purged_log_id` if index is at the last purged index. /// If the log at the specified index is smaller than `last_purged_log_id`, or greater than /// `last_log_id`, it returns None. - fn get_log_id(&self, index: u64) -> Option>; + fn ref_log_id(&self, index: u64) -> Option>; /// The last known log id in the store. /// diff --git a/openraft/src/raft_state/mod.rs b/openraft/src/raft_state/mod.rs index 6236ed8f3..3984674d5 100644 --- a/openraft/src/raft_state/mod.rs +++ b/openraft/src/raft_state/mod.rs @@ -6,7 +6,7 @@ use validit::Validate; use crate::engine::LogIdList; use crate::error::ForwardToLeader; -use crate::log_id::RaftLogId; +use crate::log_id::raft_log_id::RaftLogId; use crate::storage::SnapshotMeta; use crate::utime::Leased; use crate::LogIdOptionExt; @@ -35,6 +35,9 @@ pub use membership_state::MembershipState; pub(crate) use vote_state_reader::VoteStateReader; use crate::display_ext::DisplayOptionExt; +use crate::entry::raft_entry_ext::RaftEntryExt; +use crate::entry::RaftEntry; +use crate::log_id::ref_log_id::RefLogId; use crate::proposer::Leader; use crate::proposer::LeaderQuorumSet; use crate::type_config::alias::InstantOf; @@ -108,8 +111,8 @@ where C: RaftTypeConfig impl LogStateReader for RaftState where C: RaftTypeConfig { - fn get_log_id(&self, index: u64) -> Option> { - self.log_ids.get(index) + fn ref_log_id(&self, index: u64) -> Option> { + self.log_ids.ref_at(index) } fn last_log_id(&self) -> Option<&LogIdOf> { @@ -254,11 +257,20 @@ where C: RaftTypeConfig /// Append a list of `log_id`. /// /// The log ids in the input has to be continuous. - pub(crate) fn extend_log_ids_from_same_leader<'a, LID: RaftLogId + 'a>(&mut self, new_log_ids: &[LID]) { + pub(crate) fn extend_log_ids_from_same_leader<'a, I>(&mut self, new_log_ids: I) + where + I: IntoIterator>, + ::IntoIter: DoubleEndedIterator, + { self.log_ids.extend_from_same_leader(new_log_ids) } - pub(crate) fn extend_log_ids<'a, LID: RaftLogId + 'a>(&mut self, new_log_id: &[LID]) { + pub(crate) fn extend_log_ids(&mut self, new_log_id: I) + where + LID: RaftLogId, + I: IntoIterator, + ::IntoIter: ExactSizeIterator, + { self.log_ids.extend(new_log_id) } @@ -299,16 +311,16 @@ where C: RaftTypeConfig /// Find the first entry in the input that does not exist on local raft-log, /// by comparing the log id. pub(crate) fn first_conflicting_index(&self, entries: &[Ent]) -> usize - where Ent: RaftLogId { + where Ent: RaftEntry { let l = entries.len(); for (i, ent) in entries.iter().enumerate() { - let log_id = ent.get_log_id(); + let ref_log_id = ent.ref_log_id(); - if !self.has_log_id(log_id) { + if !self.has_log_id(ref_log_id) { tracing::debug!( at = display(i), - entry_log_id = display(log_id), + entry_log_id = display(ref_log_id), "found nonexistent log id" ); return i; diff --git a/openraft/src/raft_state/tests/forward_to_leader_test.rs b/openraft/src/raft_state/tests/forward_to_leader_test.rs index cf5d76b47..0a3b5acf9 100644 --- a/openraft/src/raft_state/tests/forward_to_leader_test.rs +++ b/openraft/src/raft_state/tests/forward_to_leader_test.rs @@ -6,22 +6,29 @@ use maplit::btreeset; use crate::engine::testing::UTConfig; use crate::error::ForwardToLeader; -use crate::testing::log_id; +use crate::type_config::alias::LeaderIdOf; +use crate::type_config::alias::LogIdOf; +use crate::type_config::alias::NodeIdOf; use crate::type_config::TypeConfigExt; use crate::utime::Leased; +use crate::vote::RaftLeaderIdExt; use crate::EffectiveMembership; use crate::Membership; use crate::MembershipState; use crate::RaftState; use crate::Vote; -fn m12() -> Membership { +fn log_id(term: u64, node_id: NodeIdOf>, index: u64) -> LogIdOf> { + LogIdOf::>::new(LeaderIdOf::>::new_committed(term, node_id), index) +} + +fn m12() -> Membership> { Membership::new_with_defaults(vec![btreeset! {1,2}], []) } #[test] fn test_forward_to_leader_vote_not_committed() { - let rs = RaftState:: { + let rs = RaftState::> { vote: Leased::new(UTConfig::<()>::now(), Duration::from_millis(500), Vote::new(1, 2)), membership_state: MembershipState::new( Arc::new(EffectiveMembership::new(Some(log_id(1, 0, 1)), m12())), @@ -35,7 +42,7 @@ fn test_forward_to_leader_vote_not_committed() { #[test] fn test_forward_to_leader_not_a_member() { - let rs = RaftState:: { + let rs = RaftState::> { vote: Leased::new( UTConfig::<()>::now(), Duration::from_millis(500), diff --git a/openraft/src/raft_state/tests/log_state_reader_test.rs b/openraft/src/raft_state/tests/log_state_reader_test.rs index 7a239684f..e32b12c5d 100644 --- a/openraft/src/raft_state/tests/log_state_reader_test.rs +++ b/openraft/src/raft_state/tests/log_state_reader_test.rs @@ -1,12 +1,11 @@ use crate::engine::testing::UTConfig; use crate::engine::LogIdList; use crate::raft_state::LogStateReader; -use crate::testing; use crate::type_config::alias::LogIdOf; use crate::RaftState; fn log_id(term: u64, index: u64) -> LogIdOf { - testing::log_id(term, 0, index) + crate::engine::testing::log_id(term, 0, index) } #[test] @@ -44,7 +43,7 @@ fn test_raft_state_prev_log_id() -> anyhow::Result<()> { fn test_raft_state_has_log_id_empty() -> anyhow::Result<()> { let rs = RaftState::::default(); - assert!(!rs.has_log_id(&log_id(0, 0))); + assert!(!rs.has_log_id(log_id(0, 0))); Ok(()) } @@ -56,9 +55,9 @@ fn test_raft_state_has_log_id_committed_gets_true() -> anyhow::Result<()> { ..Default::default() }; - assert!(rs.has_log_id(&log_id(0, 0))); - assert!(rs.has_log_id(&log_id(2, 1))); - assert!(!rs.has_log_id(&log_id(2, 2))); + assert!(rs.has_log_id(log_id(0, 0))); + assert!(rs.has_log_id(log_id(2, 1))); + assert!(!rs.has_log_id(log_id(2, 2))); Ok(()) } @@ -71,14 +70,14 @@ fn test_raft_state_has_log_id_in_log_id_list() -> anyhow::Result<()> { ..Default::default() }; - assert!(rs.has_log_id(&log_id(0, 0))); - assert!(rs.has_log_id(&log_id(2, 1))); - assert!(rs.has_log_id(&log_id(1, 3))); - assert!(rs.has_log_id(&log_id(3, 4))); + assert!(rs.has_log_id(log_id(0, 0))); + assert!(rs.has_log_id(log_id(2, 1))); + assert!(rs.has_log_id(log_id(1, 3))); + assert!(rs.has_log_id(log_id(3, 4))); - assert!(!rs.has_log_id(&log_id(2, 3))); - assert!(!rs.has_log_id(&log_id(2, 4))); - assert!(!rs.has_log_id(&log_id(3, 5))); + assert!(!rs.has_log_id(log_id(2, 3))); + assert!(!rs.has_log_id(log_id(2, 4))); + assert!(!rs.has_log_id(log_id(3, 5))); Ok(()) } diff --git a/openraft/src/raft_state/tests/validate_test.rs b/openraft/src/raft_state/tests/validate_test.rs index c6993090f..d887f79b3 100644 --- a/openraft/src/raft_state/tests/validate_test.rs +++ b/openraft/src/raft_state/tests/validate_test.rs @@ -3,12 +3,11 @@ use validit::Validate; use crate::engine::testing::UTConfig; use crate::engine::LogIdList; use crate::storage::SnapshotMeta; -use crate::testing; use crate::type_config::alias::LogIdOf; use crate::RaftState; fn log_id(term: u64, index: u64) -> LogIdOf { - testing::log_id(term, 0, index) + crate::engine::testing::log_id(term, 0, index) } #[test] diff --git a/openraft/src/replication/mod.rs b/openraft/src/replication/mod.rs index c4e61ff92..531bc6d8b 100644 --- a/openraft/src/replication/mod.rs +++ b/openraft/src/replication/mod.rs @@ -26,6 +26,8 @@ use crate::core::notification::Notification; use crate::core::sm::handle::SnapshotReader; use crate::display_ext::DisplayInstantExt; use crate::display_ext::DisplayOptionExt; +use crate::entry::raft_entry_ext::RaftEntryExt; +use crate::entry::RaftEntry; use crate::error::HigherVote; use crate::error::PayloadTooLarge; use crate::error::RPCError; @@ -58,7 +60,6 @@ use crate::type_config::alias::VoteOf; use crate::type_config::async_runtime::mutex::Mutex; use crate::type_config::TypeConfigExt; use crate::vote::raft_vote::RaftVoteExt; -use crate::RaftLogId; use crate::RaftNetworkFactory; use crate::RaftTypeConfig; use crate::StorageError; @@ -397,8 +398,8 @@ where // limited_get_log_entries will return logs smaller than the range [start, end). let logs = self.log_reader.limited_get_log_entries(start, end).await?; - let first = logs.first().map(|x| x.get_log_id()).unwrap(); - let last = logs.last().map(|x| x.get_log_id().clone()).unwrap(); + let first = logs.first().map(|x| x.ref_log_id()).unwrap(); + let last = logs.last().map(|x| x.log_id()).unwrap(); debug_assert!( !logs.is_empty() && logs.len() <= (end - start) as usize, diff --git a/openraft/src/replication/response.rs b/openraft/src/replication/response.rs index 132639e49..26be0ec33 100644 --- a/openraft/src/replication/response.rs +++ b/openraft/src/replication/response.rs @@ -76,11 +76,11 @@ mod tests { #[test] fn test_replication_result_display() { - let result = ReplicationResult::(Ok(Some(log_id(1, 2, 3)))); + let result = ReplicationResult::(Ok(Some(log_id::(1, 2, 3)))); let want = format!("(Match:{})", log_id::(1, 2, 3)); assert!(result.to_string().ends_with(&want), "{}", result.to_string()); - let result = ReplicationResult::(Err(log_id(1, 2, 3))); + let result = ReplicationResult::(Err(log_id::(1, 2, 3))); let want = format!("(Conflict:{})", log_id::(1, 2, 3)); assert!(result.to_string().ends_with(&want), "{}", result.to_string()); } diff --git a/openraft/src/storage/helper.rs b/openraft/src/storage/helper.rs index 27b8f8fea..f269a6e68 100644 --- a/openraft/src/storage/helper.rs +++ b/openraft/src/storage/helper.rs @@ -7,8 +7,8 @@ use validit::Valid; use crate::display_ext::DisplayOptionExt; use crate::engine::LogIdList; +use crate::entry::RaftEntry; use crate::entry::RaftPayload; -use crate::log_id::RaftLogId; use crate::raft_state::IOState; use crate::storage::log_reader_ext::RaftLogReaderExt; use crate::storage::RaftLogStorage; @@ -193,8 +193,8 @@ where let chunk_end = std::cmp::min(end, start + chunk_size); let entries = log_reader.try_get_log_entries(start..chunk_end).await?; - let first = entries.first().map(|x| x.get_log_id().index()); - let last = entries.last().map(|x| x.get_log_id().index()); + let first = entries.first().map(|x| x.index()); + let last = entries.last().map(|x| x.index()); let make_err = || { let err = AnyError::error(format!( @@ -299,7 +299,7 @@ where for ent in entries.iter().rev() { if let Some(mem) = ent.get_membership() { - let em = StoredMembership::new(Some(ent.get_log_id().clone()), mem); + let em = StoredMembership::new(Some(ent.log_id()), mem); res.insert(0, em); if res.len() == 2 { return Ok(res); diff --git a/openraft/src/storage/log_reader_ext.rs b/openraft/src/storage/log_reader_ext.rs index 9bc9645e1..3cd4298c2 100644 --- a/openraft/src/storage/log_reader_ext.rs +++ b/openraft/src/storage/log_reader_ext.rs @@ -1,8 +1,8 @@ use anyerror::AnyError; use openraft_macros::add_async_trait; +use crate::entry::RaftEntry; use crate::type_config::alias::LogIdOf; -use crate::RaftLogId; use crate::RaftLogReader; use crate::RaftTypeConfig; use crate::StorageError; @@ -30,7 +30,7 @@ where C: RaftTypeConfig )); } - Ok(entries[0].get_log_id().clone()) + Ok(entries[0].log_id()) } } diff --git a/openraft/src/storage/v2/raft_log_storage_ext.rs b/openraft/src/storage/v2/raft_log_storage_ext.rs index c13d4cd07..791a2f1ec 100644 --- a/openraft/src/storage/v2/raft_log_storage_ext.rs +++ b/openraft/src/storage/v2/raft_log_storage_ext.rs @@ -3,7 +3,7 @@ use openraft_macros::add_async_trait; use crate::async_runtime::MpscUnboundedReceiver; use crate::async_runtime::MpscUnboundedSender; use crate::core::notification::Notification; -use crate::log_id::RaftLogId; +use crate::entry::RaftEntry; use crate::raft_state::io_state::io_id::IOId; use crate::storage::IOFlushed; use crate::storage::RaftLogStorage; @@ -31,7 +31,7 @@ where C: RaftTypeConfig { let entries = entries.into_iter().collect::>(); - let last_log_id = entries.last().unwrap().get_log_id().clone(); + let last_log_id = entries.last().unwrap().log_id(); let (tx, mut rx) = C::mpsc_unbounded(); diff --git a/openraft/src/testing/common.rs b/openraft/src/testing/common.rs index d4bc35f71..e555710ee 100644 --- a/openraft/src/testing/common.rs +++ b/openraft/src/testing/common.rs @@ -5,7 +5,6 @@ use std::collections::BTreeSet; use crate::entry::RaftEntry; use crate::type_config::alias::LogIdOf; use crate::vote::RaftLeaderIdExt; -use crate::LogId; use crate::RaftTypeConfig; /// Builds a log id, for testing purposes. @@ -14,10 +13,7 @@ where C: RaftTypeConfig, C::Term: From, { - LogId:: { - leader_id: C::LeaderId::new_committed(term.into(), node_id), - index, - } + LogIdOf::::new(C::LeaderId::new_committed(term.into(), node_id), index) } /// Create a blank log entry for test. @@ -26,22 +22,18 @@ where C: RaftTypeConfig, C::Term: From, { - crate::Entry::::new_blank(log_id(term, node_id, index)) + crate::Entry::::new_blank(log_id::(term, node_id, index)) } /// Create a membership log entry without learner config for test. -pub fn membership_ent( - term: u64, - node_id: C::NodeId, - index: u64, - config: Vec>, -) -> crate::Entry +pub fn membership_ent(term: u64, node_id: C::NodeId, index: u64, config: Vec>) -> crate::Entry where + C: RaftTypeConfig, C::Term: From, C::Node: Default, { crate::Entry::new_membership( - log_id(term, node_id, index), + log_id::(term, node_id, index), crate::Membership::new_with_defaults(config, []), ) } diff --git a/openraft/src/testing/log/suite.rs b/openraft/src/testing/log/suite.rs index e4170f071..65b089be1 100644 --- a/openraft/src/testing/log/suite.rs +++ b/openraft/src/testing/log/suite.rs @@ -11,7 +11,6 @@ use crate::async_runtime::MpscUnboundedReceiver; use crate::async_runtime::MpscUnboundedSender; use crate::core::notification::Notification; use crate::entry::RaftEntry; -use crate::log_id::RaftLogId; use crate::membership::EffectiveMembership; use crate::raft_state::io_state::io_id::IOId; use crate::raft_state::LogStateReader; @@ -28,7 +27,6 @@ use crate::type_config::alias::VoteOf; use crate::type_config::TypeConfigExt; use crate::vote::raft_vote::RaftVoteExt; use crate::vote::RaftLeaderIdExt; -use crate::LogId; use crate::Membership; use crate::OptionalSend; use crate::RaftLogReader; @@ -492,13 +490,13 @@ where let initial = StorageHelper::new(&mut store, &mut sm).get_initial_state().await?; assert_eq!( - Some(&log_id_0(3, 2)), + Some(&log_id_0::(3, 2)), initial.last_log_id(), "state machine has higher log" ); assert_eq!( initial.committed(), - Some(&log_id_0(3, 1)), + Some(&log_id_0::(3, 1)), "unexpected value for last applied log" ); assert_eq!( @@ -581,7 +579,7 @@ where ]) .await?; - store.purge(log_id_0(1, 2)).await?; + store.purge(log_id_0::(1, 2)).await?; append(&mut store, [membership_ent_0::(1, 3, btreeset! {1,2,3})]).await?; let initial = StorageHelper::new(&mut store, &mut sm).get_initial_state().await?; @@ -605,7 +603,7 @@ where let initial = StorageHelper::new(&mut store, &mut sm).get_initial_state().await?; assert_eq!( - Some(&log_id_0(2, 1)), + Some(&log_id_0::(2, 1)), initial.last_log_id(), "state machine has higher log" ); @@ -622,23 +620,20 @@ where let initial = StorageHelper::new(&mut store, &mut sm).get_initial_state().await?; assert_eq!( - Some(&log_id_0(3, 1)), + Some(&log_id_0::(3, 1)), initial.last_log_id(), "state machine has higher log" ); assert_eq!( initial.last_purged_log_id().cloned(), - Some(log_id_0(3, 1)), + Some(log_id_0::(3, 1)), "state machine has higher log" ); Ok(()) } pub async fn get_initial_state_log_ids(mut store: LS, mut sm: SM) -> Result<(), StorageError> { - let log_id = |t: u64, n: u64, i| LogId:: { - leader_id: C::LeaderId::new_committed(t.into(), n.into()), - index: i, - }; + let log_id = |t: u64, n: u64, i| LogIdOf::::new(C::LeaderId::new_committed(t.into(), n.into()), i); tracing::info!("--- empty store, expect []"); { @@ -799,11 +794,11 @@ where blank_ent_0::(1, 5), ]) .await?; - store.purge(log_id_0(1, 1)).await?; + store.purge(log_id_0::(1, 1)).await?; apply(&mut sm, [blank_ent_0::(1, 2)]).await?; - store.save_committed(Some(log_id_0(1, 4))).await?; + store.save_committed(Some(log_id_0::(1, 4))).await?; let got = store.read_committed().await?; if got.is_none() { tracing::info!("This implementation does not store committed log id, skip test re-applying committed logs"); @@ -812,9 +807,13 @@ where let initial = StorageHelper::new(&mut store, &mut sm).get_initial_state().await?; - assert_eq!(Some(&log_id_0(1, 4)), initial.io_applied(), "last_applied is updated"); assert_eq!( - Some(log_id_0(1, 4)), + Some(&log_id_0::(1, 4)), + initial.io_applied(), + "last_applied is updated" + ); + assert_eq!( + Some(log_id_0::(1, 4)), sm.applied_state().await?.0, "last_applied is updated" ); @@ -845,8 +844,8 @@ where let logs = store.try_get_log_entries(5..7).await?; assert_eq!(logs.len(), 2); - assert_eq!(*logs[0].get_log_id(), log_id_0(1, 5)); - assert_eq!(*logs[1].get_log_id(), log_id_0(1, 6)); + assert_eq!(logs[0].log_id(), log_id_0::(1, 5)); + assert_eq!(logs[1].log_id(), log_id_0::(1, 6)); } Ok(()) @@ -867,7 +866,7 @@ where assert!(!logs.is_empty()); assert!(logs.len() <= 2); - assert_eq!(*logs[0].get_log_id(), log_id_0(1, 5)); + assert_eq!(logs[0].log_id(), log_id_0::(1, 5)); } Ok(()) @@ -876,20 +875,20 @@ where pub async fn try_get_log_entry(mut store: LS, mut sm: SM) -> Result<(), StorageError> { Self::feed_10_logs_vote_self(&mut store).await?; - store.purge(log_id(0, 0, 0)).await?; + store.purge(log_id::(0, 0, 0)).await?; // `purge()` does not have to do the purge at once. // The implementation may choose to do it in the background. C::sleep(Duration::from_millis(1_000)).await; let ent = store.try_get_log_entry(3).await?; - assert_eq!(Some(log_id_0(1, 3)), ent.map(|x| x.get_log_id().clone())); + assert_eq!(Some(log_id_0::(1, 3)), ent.map(|x| x.log_id())); let ent = store.try_get_log_entry(0).await?; - assert_eq!(None, ent.map(|x| x.get_log_id().clone())); + assert_eq!(None, ent.map(|x| x.log_id())); let ent = store.try_get_log_entry(11).await?; - assert_eq!(None, ent.map(|x| x.get_log_id().clone())); + assert_eq!(None, ent.map(|x| x.log_id())); Ok(()) } @@ -918,38 +917,38 @@ where let st = store.get_log_state().await?; assert_eq!(None, st.last_purged_log_id); - assert_eq!(Some(log_id_0(1, 2)), st.last_log_id); + assert_eq!(Some(log_id_0::(1, 2)), st.last_log_id); } tracing::info!("--- delete log 0-0"); { - store.purge(log_id_0(0, 0)).await?; + store.purge(log_id_0::(0, 0)).await?; let st = store.get_log_state().await?; - assert_eq!(Some(log_id(0, 0, 0)), st.last_purged_log_id); - assert_eq!(Some(log_id_0(1, 2)), st.last_log_id); + assert_eq!(Some(log_id::(0, 0, 0)), st.last_purged_log_id); + assert_eq!(Some(log_id_0::(1, 2)), st.last_log_id); } tracing::info!("--- delete all log"); { - store.purge(log_id_0(1, 2)).await?; + store.purge(log_id_0::(1, 2)).await?; let st = store.get_log_state().await?; - assert_eq!(Some(log_id_0(1, 2)), st.last_purged_log_id); - assert_eq!(Some(log_id_0(1, 2)), st.last_log_id); + assert_eq!(Some(log_id_0::(1, 2)), st.last_purged_log_id); + assert_eq!(Some(log_id_0::(1, 2)), st.last_log_id); } tracing::info!("--- delete advance last present logs"); { - store.purge(log_id_0(2, 3)).await?; + store.purge(log_id_0::(2, 3)).await?; // `purge()` does not have to do the purge at once. // The implementation may choose to do it in the background. C::sleep(Duration::from_millis(1_000)).await; let st = store.get_log_state().await?; - assert_eq!(Some(log_id_0(2, 3)), st.last_purged_log_id); - assert_eq!(Some(log_id_0(2, 3)), st.last_log_id); + assert_eq!(Some(log_id_0::(2, 3)), st.last_purged_log_id); + assert_eq!(Some(log_id_0::(2, 3)), st.last_log_id); } Ok(()) @@ -958,7 +957,7 @@ where pub async fn get_log_id(mut store: LS, mut sm: SM) -> Result<(), StorageError> { Self::feed_10_logs_vote_self(&mut store).await?; - store.purge(log_id_0(1, 3)).await?; + store.purge(log_id_0::(1, 3)).await?; // `purge()` does not have to do the purge at once. // The implementation may choose to do it in the background. @@ -971,10 +970,10 @@ where assert!(res.is_err()); let res = store.get_log_id(4).await?; - assert_eq!(log_id_0(1, 4), res); + assert_eq!(log_id_0::(1, 4), res); let res = store.get_log_id(10).await?; - assert_eq!(log_id_0(1, 10), res); + assert_eq!(log_id_0::(1, 10), res); let res = store.get_log_id(11).await; assert!(res.is_err()); @@ -996,26 +995,26 @@ where .await?; let last_log_id = store.get_log_state().await?.last_log_id; - assert_eq!(Some(log_id_0(1, 2)), last_log_id); + assert_eq!(Some(log_id_0::(1, 2)), last_log_id); } tracing::info!("--- last id in logs < last applied id in sm, only return the id in logs"); { apply(&mut sm, [blank_ent_0::(1, 3)]).await?; let last_log_id = store.get_log_state().await?.last_log_id; - assert_eq!(Some(log_id_0(1, 2)), last_log_id); + assert_eq!(Some(log_id_0::(1, 2)), last_log_id); } tracing::info!("--- no logs, return default"); { - store.purge(log_id_0(1, 2)).await?; + store.purge(log_id_0::(1, 2)).await?; // `purge()` does not have to do the purge at once. // The implementation may choose to do it in the background. C::sleep(Duration::from_millis(1_000)).await; let last_log_id = store.get_log_state().await?.last_log_id; - assert_eq!(Some(log_id_0(1, 2)), last_log_id); + assert_eq!(Some(log_id_0::(1, 2)), last_log_id); } Ok(()) @@ -1031,10 +1030,10 @@ where apply(&mut sm, [membership_ent_0::(1, 3, btreeset! {1,2})]).await?; let (applied, mem) = sm.applied_state().await?; - assert_eq!(Some(log_id_0(1, 3)), applied); + assert_eq!(Some(log_id_0::(1, 3)), applied); assert_eq!( StoredMembership::new( - Some(log_id_0(1, 3)), + Some(log_id_0::(1, 3)), Membership::new_with_defaults(vec![btreeset! {1,2}], []) ), mem @@ -1046,10 +1045,10 @@ where apply(&mut sm, [blank_ent_0::(1, 5)]).await?; let (applied, mem) = sm.applied_state().await?; - assert_eq!(Some(log_id_0(1, 5)), applied); + assert_eq!(Some(log_id_0::(1, 5)), applied); assert_eq!( StoredMembership::new( - Some(log_id_0(1, 3)), + Some(log_id_0::(1, 3)), Membership::new_with_defaults(vec![btreeset! {1,2}], []) ), mem @@ -1064,7 +1063,7 @@ where Self::feed_10_logs_vote_self(&mut store).await?; - store.purge(log_id_0(0, 0)).await?; + store.purge(log_id_0::(0, 0)).await?; // `purge()` does not have to do the purge at once. // The implementation may choose to do it in the background. @@ -1072,12 +1071,12 @@ where let logs = store.try_get_log_entries(0..100).await?; assert_eq!(logs.len(), 10); - assert_eq!(logs[0].get_log_id().index(), 1); + assert_eq!(logs[0].index(), 1); assert_eq!( LogState { - last_purged_log_id: Some(log_id_0(0, 0)), - last_log_id: Some(log_id_0(1, 10)), + last_purged_log_id: Some(log_id_0::(0, 0)), + last_log_id: Some(log_id_0::(1, 10)), }, store.get_log_state().await? ); @@ -1089,7 +1088,7 @@ where Self::feed_10_logs_vote_self(&mut store).await?; - store.purge(log_id_0(1, 5)).await?; + store.purge(log_id_0::(1, 5)).await?; // `purge()` does not have to do the purge at once. // The implementation may choose to do it in the background. @@ -1097,12 +1096,12 @@ where let logs = store.try_get_log_entries(0..100).await?; assert_eq!(logs.len(), 5); - assert_eq!(logs[0].get_log_id().index(), 6); + assert_eq!(logs[0].index(), 6); assert_eq!( LogState { - last_purged_log_id: Some(log_id_0(1, 5)), - last_log_id: Some(log_id_0(1, 10)), + last_purged_log_id: Some(log_id_0::(1, 5)), + last_log_id: Some(log_id_0::(1, 10)), }, store.get_log_state().await? ); @@ -1114,7 +1113,7 @@ where Self::feed_10_logs_vote_self(&mut store).await?; - store.purge(log_id_0(1, 20)).await?; + store.purge(log_id_0::(1, 20)).await?; // `purge()` does not have to do the purge at once. // The implementation may choose to do it in the background. @@ -1125,8 +1124,8 @@ where assert_eq!( LogState { - last_purged_log_id: Some(log_id_0(1, 20)), - last_log_id: Some(log_id_0(1, 20)), + last_purged_log_id: Some(log_id_0::(1, 20)), + last_log_id: Some(log_id_0::(1, 20)), }, store.get_log_state().await? ); @@ -1138,7 +1137,7 @@ where Self::feed_10_logs_vote_self(&mut store).await?; - store.truncate(log_id_0(1, 11)).await?; + store.truncate(log_id_0::(1, 11)).await?; let logs = store.try_get_log_entries(0..100).await?; assert_eq!(logs.len(), 11); @@ -1146,7 +1145,7 @@ where assert_eq!( LogState { last_purged_log_id: None, - last_log_id: Some(log_id_0(1, 10)), + last_log_id: Some(log_id_0::(1, 10)), }, store.get_log_state().await? ); @@ -1158,7 +1157,7 @@ where Self::feed_10_logs_vote_self(&mut store).await?; - store.truncate(log_id_0(0, 0)).await?; + store.truncate(log_id_0::(0, 0)).await?; let logs = store.try_get_log_entries(0..100).await?; assert_eq!(logs.len(), 0); @@ -1177,7 +1176,7 @@ where pub async fn append_to_log(mut store: LS, mut sm: SM) -> Result<(), StorageError> { Self::feed_10_logs_vote_self(&mut store).await?; - store.purge(log_id_0(0, 0)).await?; + store.purge(log_id_0::(0, 0)).await?; // `purge()` does not have to do the purge at once. // The implementation may choose to do it in the background. @@ -1189,7 +1188,7 @@ where let last = store.try_get_log_entries(0..).await?.into_iter().last().unwrap(); assert_eq!(l, 11, "expected 11 entries to exist in the log"); - assert_eq!(*last.get_log_id(), log_id_0(2, 11), "unexpected log id"); + assert_eq!(last.log_id(), log_id_0::(2, 11), "unexpected log id"); Ok(()) } @@ -1201,8 +1200,8 @@ where let mut b = sm.get_snapshot_builder().await; let snap = b.build_snapshot().await?; let meta = snap.meta; - assert_eq!(Some(log_id_0(0, 0)), meta.last_log_id); - assert_eq!(&Some(log_id_0(0, 0)), meta.last_membership.log_id()); + assert_eq!(Some(log_id_0::(0, 0)), meta.last_log_id); + assert_eq!(&Some(log_id_0::(0, 0)), meta.last_membership.log_id()); assert_eq!( &Membership::new_with_defaults(vec![btreeset! {1,2}], []), meta.last_membership.membership() @@ -1220,8 +1219,8 @@ where let mut b = sm.get_snapshot_builder().await; let snap = b.build_snapshot().await?; let meta = snap.meta; - assert_eq!(Some(log_id_0(2, 2)), meta.last_log_id); - assert_eq!(&Some(log_id_0(2, 2)), meta.last_membership.log_id()); + assert_eq!(Some(log_id_0::(2, 2)), meta.last_log_id); + assert_eq!(&Some(log_id_0::(2, 2)), meta.last_membership.log_id()); assert_eq!( &Membership::new_with_defaults(vec![btreeset! {3,4}], []), meta.last_membership.membership() @@ -1243,7 +1242,7 @@ where assert_eq!(replies.len(), 1, "expected 1 response"); let (last_applied, _) = sm.applied_state().await?; - assert_eq!(last_applied, Some(log_id_0(0, 0)),); + assert_eq!(last_applied, Some(log_id_0::(0, 0)),); } tracing::info!("--- apply membership entry"); @@ -1254,7 +1253,7 @@ where assert_eq!(replies.len(), 1, "expected 1 response"); let (last_applied, mem) = sm.applied_state().await?; - assert_eq!(last_applied, Some(log_id_0(1, 1)),); + assert_eq!(last_applied, Some(log_id_0::(1, 1)),); assert_eq!( mem.membership(), &Membership::new_with_defaults(vec![btreeset! {1,2}], []) @@ -1266,7 +1265,7 @@ where // { // let entry = { // let mut e = C::Entry::from_app_data(C::D::from(1)); - // e.set_log_id(&log_id_0(2, 2)); + // e.set_log_id(&log_id_0::(2, 2)); // e // }; // @@ -1274,7 +1273,7 @@ where // assert_eq!(replies.len(), 1, "expected 1 response"); // let (last_applied, _) = sm.applied_state().await?; // - // assert_eq!(last_applied, Some(log_id_0(2, 2)),); + // assert_eq!(last_applied, Some(log_id_0::(2, 2)),); // } Ok(()) @@ -1290,7 +1289,7 @@ where assert_eq!(replies.len(), 2); let (last_applied, mem) = sm.applied_state().await?; - assert_eq!(last_applied, Some(log_id_0(1, 1)),); + assert_eq!(last_applied, Some(log_id_0::(1, 1)),); assert_eq!( mem.membership(), &Membership::new_with_defaults(vec![btreeset! {1,2}], []) @@ -1314,9 +1313,9 @@ where // Add a few entries so we have state to snapshot let snapshot_entries = vec![membership_ent_0::(1, 2, btreeset! {1, 2, 3}), blank_ent_0::(3, 3)]; sm_l.apply(snapshot_entries).await?; - let snapshot_last_log_id = Some(log_id_0(3, 3)); + let snapshot_last_log_id = Some(log_id_0::(3, 3)); let snapshot_last_membership = StoredMembership::new( - Some(log_id_0(1, 2)), + Some(log_id_0::(1, 2)), Membership::new_with_defaults(vec![btreeset![1, 2, 3]], []), ); let snapshot_applied_state = (snapshot_last_log_id.clone(), snapshot_last_membership.clone()); @@ -1380,10 +1379,7 @@ where C: RaftTypeConfig, C::NodeId: From, { - LogId { - leader_id: C::LeaderId::new_committed(term.into(), NODE_ID.into()), - index, - } + LogIdOf::::new(C::LeaderId::new_committed(term.into(), NODE_ID.into()), index) } /// Create a blank log entry with node_id 0 for test. @@ -1392,7 +1388,7 @@ where C::Term: From, C::NodeId: From, { - C::Entry::new_blank(log_id(term, 0, index)) + C::Entry::new_blank(log_id::(term, 0, index)) } /// Create a membership entry with node_id 0 for test. @@ -1402,7 +1398,7 @@ where C::NodeId: From, C::Node: Default, { - C::Entry::new_membership(log_id_0(term, index), Membership::new_with_defaults(vec![bs], [])) + C::Entry::new_membership(log_id_0::(term, index), Membership::new_with_defaults(vec![bs], [])) } /// Build a `RaftLogStorage` and `RaftStateMachine` implementation and run a test on it. @@ -1441,7 +1437,7 @@ where { let entries = entries.into_iter().collect::>(); - let last_log_id = entries.last().unwrap().get_log_id().clone(); + let last_log_id = entries.last().unwrap().log_id(); let (tx, mut rx) = C::mpsc_unbounded(); @@ -1464,8 +1460,5 @@ where C::Term: From, C::NodeId: From, { - LogId { - leader_id: C::LeaderId::new_committed(term.into(), node_id.into()), - index, - } + LogIdOf::::new(C::LeaderId::new_committed(term.into(), node_id.into()), index) } diff --git a/openraft/src/type_config.rs b/openraft/src/type_config.rs index 31846fe8f..1d37d1bbd 100644 --- a/openraft/src/type_config.rs +++ b/openraft/src/type_config.rs @@ -13,7 +13,6 @@ pub use async_runtime::MpscUnbounded; pub use async_runtime::OneshotSender; pub use util::TypeConfigExt; -use crate::entry::FromAppData; use crate::entry::RaftEntry; use crate::raft::responder::Responder; use crate::vote::raft_vote::RaftVote; @@ -89,7 +88,7 @@ pub trait RaftTypeConfig: type Vote: RaftVote; /// Raft log entry, which can be built from an AppData. - type Entry: RaftEntry + FromAppData; + type Entry: RaftEntry; /// Snapshot data for exposing a snapshot for reading & writing. /// diff --git a/stores/memstore/src/lib.rs b/stores/memstore/src/lib.rs index 01f8bc910..f02ef7319 100644 --- a/stores/memstore/src/lib.rs +++ b/stores/memstore/src/lib.rs @@ -15,6 +15,7 @@ use std::sync::Arc; use std::sync::Mutex; use openraft::alias::SnapshotDataOf; +use openraft::entry::RaftEntry; use openraft::storage::IOFlushed; use openraft::storage::LogState; use openraft::storage::RaftLogReader; @@ -26,7 +27,6 @@ use openraft::Entry; use openraft::EntryPayload; use openraft::LogId; use openraft::OptionalSend; -use openraft::RaftLogId; use openraft::SnapshotMeta; use openraft::StorageError; use openraft::StoredMembership; @@ -336,7 +336,7 @@ impl RaftLogStorage for Arc { Some(serialized) => { let ent: Entry = serde_json::from_str(serialized).map_err(|e| StorageError::read_logs(&e))?; - Some(*ent.get_log_id()) + Some(ent.log_id()) } }; @@ -392,9 +392,8 @@ impl RaftLogStorage for Arc { where I: IntoIterator> + OptionalSend { let mut log = self.log.write().await; for entry in entries { - let s = - serde_json::to_string(&entry).map_err(|e| StorageError::write_log_entry(*entry.get_log_id(), &e))?; - log.insert(entry.log_id.index(), s); + let s = serde_json::to_string(&entry).map_err(|e| StorageError::write_log_entry(entry.log_id(), &e))?; + log.insert(entry.log_id.index, s); } callback.io_completed(Ok(())); diff --git a/tests/tests/fixtures/mod.rs b/tests/tests/fixtures/mod.rs index 54d12137e..7459a0d28 100644 --- a/tests/tests/fixtures/mod.rs +++ b/tests/tests/fixtures/mod.rs @@ -52,7 +52,6 @@ use openraft::LogIdOptionExt; use openraft::OptionalSend; use openraft::RPCTypes; use openraft::Raft; -use openraft::RaftLogId; use openraft::RaftLogReader; use openraft::RaftMetrics; use openraft::RaftState; @@ -193,6 +192,7 @@ impl fmt::Display for Direction { use openraft::alias::LogIdOf; use openraft::alias::VoteOf; +use openraft::entry::RaftEntry; use openraft::network::v2::RaftNetworkV2; use openraft::vote::RaftLeaderId; use openraft::vote::RaftLeaderIdExt; @@ -1052,7 +1052,7 @@ impl RaftNetworkV2 for RaftRouterNetwork { rpc.entries.truncate(quota as usize); *x = Some(0); if let Some(last) = rpc.entries.last() { - Some(Some(*last.get_log_id())) + Some(Some(last.log_id())) } else { Some(rpc.prev_log_id) }