Skip to content

Commit

Permalink
Refactor: Introduce RefLogId as a reference to a log ID
Browse files Browse the repository at this point in the history
Existing `LogIdOf<C>` provides a minimal storage implementation for a
log ID with essential properties. In contrast, `RefLogId` offers the
same information as `LogIdOf<C>` while adding additional system-defined
properties.

For example, in the future, `LogIdOf<C>` defined by the application will
not need to implement `Ord`. However, `RefLogId`, used internally, will
provide a system-defined `Ord` implementation.

This change updates internal components to return `RefLogId` or accept
it as an argument where possible, enabling more flexibility and
consistency in handling log IDs.

Change: refine the `RaftEntry` trait

- The `RaftEntry` trait now requires `AsRef<LogIdOf<C>>` and
  `AsMut<LogIdOf<C>>`, providing a more standard API for accessing the
  log ID of a log entry. As a result, the `RaftEntry: RaftLogId`
  requirement is no longer needed.

- A new method, `new_normal()`, has been added to the `RaftEntry` trait
  to replace the `FromAppData` trait.

- Additional utility methods for working with entries are now provided
  in the `RaftEntryExt` trait.

- Part of databendlabs#1278

Upgrade tips:

1. **For applications using a custom `RaftEntry` implementation** (e.g.,
   declared with `declare_raft_types!(MyTypes: Entry = MyEntry)`):
   - Update the `RaftEntry` implementation for your custom entry type
     (`MyEntry`) by adding the `new_normal()` method.
   - Implement `AsRef<LogId>` and `AsMut<LogId>` for your custom entry
     type.

2. **For applications using the default `Entry` provided by OpenRaft**:
   - No changes are required.

empty: try to remove log id from RaftTypeConfig
  • Loading branch information
drmingdrmer committed Jan 28, 2025
1 parent 3b76a7e commit 1965b64
Show file tree
Hide file tree
Showing 51 changed files with 838 additions and 387 deletions.
1 change: 0 additions & 1 deletion cluster_benchmark/tests/benchmark/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ use openraft::storage::Snapshot;
use openraft::Entry;
use openraft::EntryPayload;
use openraft::OptionalSend;
use openraft::RaftLogId;
use openraft::SnapshotMeta;
use openraft::StorageError;
use openraft::StoredMembership;
Expand Down
17 changes: 17 additions & 0 deletions examples/raft-kv-memstore-grpc/build.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
// TODO: remove serde

tonic_build::configure()
.btree_map(["."])
.type_attribute("openraftpb.Node", "#[derive(Eq, serde::Serialize, serde::Deserialize)]")
.type_attribute(
"openraftpb.SetRequest",
Expand All @@ -25,6 +26,22 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
"#[derive(Eq, serde::Serialize, serde::Deserialize)]",
)
.type_attribute("openraftpb.Vote", "#[derive(Eq, serde::Serialize, serde::Deserialize)]")
.type_attribute(
"google.protobuf.Empty",
"#[derive(Eq, serde::Serialize, serde::Deserialize)]",
)
.type_attribute(
"openraftpb.NodeIdSet",
"#[derive(Eq, serde::Serialize, serde::Deserialize)]",
)
.type_attribute(
"openraftpb.Membership",
"#[derive(Eq, serde::Serialize, serde::Deserialize)]",
)
.type_attribute(
"openraftpb.Entry",
"#[derive(Eq, serde::Serialize, serde::Deserialize)]",
)
.compile_protos_with_config(config, &proto_files, &["proto"])?;
Ok(())
}
6 changes: 1 addition & 5 deletions examples/raft-kv-memstore-grpc/proto/api_service.proto
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
syntax = "proto3";
import "internal_service.proto";
package openraftpb;

// ApiService provides the key-value store API operations
Expand All @@ -20,8 +21,3 @@ message Response {
optional string value = 1; // Retrieved value
}

// SetRequest represents a key-value pair to be stored
message SetRequest {
string key = 1; // Key to store
string value = 2; // Value to associate with the key
}
77 changes: 71 additions & 6 deletions examples/raft-kv-memstore-grpc/proto/internal_service.proto
Original file line number Diff line number Diff line change
@@ -1,6 +1,19 @@
syntax = "proto3";
import "google/protobuf/empty.proto";
package openraftpb;

// SetRequest represents a key-value pair to be stored
message SetRequest {
string key = 1; // Key to store
string value = 2; // Value to associate with the key
}

// Node represents a single node in the Raft cluster
message Node {
string rpc_addr = 1; // RPC address for node communication
uint64 node_id = 2; // Unique identifier for the node
}

// LeaderId represents the leader identifier in Raft
message LeaderId {
uint64 term = 1;
Expand All @@ -13,6 +26,33 @@ message Vote {
bool committed = 2;
}

message Entry {
uint64 term = 1;
uint64 index = 2;

// Optional Application data
SetRequest app_data = 12;

// Optional Membership config
Membership membership = 13;
}


// NodeIds is a set of NodeIds
message NodeIdSet {
map<uint64, google.protobuf.Empty> node_ids = 1;
}

// Membership config
message Membership {
// Joint(includes more than one NodeIdSet) or uniform(one NodeIdSet) config.
repeated NodeIdSet configs = 1;

// All of the nodes in the cluster, including voters and learners.
// A node id that is included in `configs` is a voter, otherwise it is a learner.
map<uint64, Node> nodes = 2;
}

// LogId represents the log identifier in Raft
message LogId {
uint64 term = 1;
Expand All @@ -32,23 +72,48 @@ message VoteResponse {
LogId last_log_id = 3;
}

message AppendEntriesRequest {
// The leader's vote, used to identify the leader, and must be committed
Vote vote = 1;

// The previous log id the leader has sent to the follower
LogId prev_log_id = 2;

// The entries to be appended to the follower's log
repeated Entry entries = 3;

// The leader's last committed log id
LogId leader_commit = 4;
}

message AppendEntriesResponse {
// If not None, the follower rejected the AppendEntries request due to having a higher vote.
// All other fields are valid only when this field is None
Vote rejected_by = 1;

// The follower accepts this AppendEntries request's vote, but the prev_log_id conflicts with
// the follower's log. The leader should retry with a smaller prev_log_id that matches the
// follower's log. All subsequent fields are valid only when this field is false
bool conflict = 2;

// The last log id the follower accepted from this request.
// If None, all input entries were accepted and persisted.
// Otherwise, only entries up to and including this id were accepted
LogId last_log_id = 3;
}

// InternalService handles internal Raft cluster communication
service InternalService {
// Vote handles vote requests between Raft nodes during leader election
rpc Vote(VoteRequest) returns (VoteResponse) {}

// AppendEntries handles call related to append entries RPC
rpc AppendEntries(RaftRequestBytes) returns (RaftReplyBytes) {}
rpc AppendEntries(AppendEntriesRequest) returns (AppendEntriesResponse) {}

// Snapshot handles install snapshot RPC
rpc Snapshot(stream SnapshotRequest) returns (RaftReplyBytes) {}
}

// RaftRequestBytes encapsulates binary Raft request data
message RaftRequestBytes {
bytes value = 1; // Serialized Raft request data
}

// RaftReplyBytes encapsulates binary Raft response data
message RaftReplyBytes {
bytes value = 1; // Serialized Raft response data
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
syntax = "proto3";
import "internal_service.proto";
package openraftpb;

// ManagementService handles Raft cluster management operations
Expand All @@ -21,11 +22,6 @@ message InitRequest {
repeated Node nodes = 1; // List of initial cluster nodes
}

// Node represents a single node in the Raft cluster
message Node {
string rpc_addr = 1; // RPC address for node communication
uint64 node_id = 2; // Unique identifier for the node
}

// AddLearnerRequest specifies parameters for adding a learner node
message AddLearnerRequest {
Expand Down
23 changes: 8 additions & 15 deletions examples/raft-kv-memstore-grpc/src/grpc/internal_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,9 @@ use tonic::Status;
use tonic::Streaming;
use tracing::debug;

use crate::protobuf as pb;
use crate::protobuf::internal_service_server::InternalService;
use crate::protobuf::RaftReplyBytes;
use crate::protobuf::RaftRequestBytes;
use crate::protobuf::SnapshotRequest;
use crate::protobuf::VoteRequest;
use crate::protobuf::VoteResponse;
Expand Down Expand Up @@ -75,15 +75,10 @@ impl InternalService for InternalServiceImpl {
/// Nodes vote for candidates based on log completeness and term numbers.
async fn vote(&self, request: Request<VoteRequest>) -> Result<Response<VoteResponse>, Status> {
debug!("Processing vote request");
let req = request.into_inner();

// Deserialize the vote request
let vote_req = req.into();

// Process the vote request
let vote_resp = self
.raft_node
.vote(vote_req)
.vote(request.into_inner().into())
.await
.map_err(|e| Status::internal(format!("Vote operation failed: {}", e)))?;

Expand All @@ -103,22 +98,20 @@ impl InternalService for InternalServiceImpl {
/// # Protocol Details
/// This implements the AppendEntries RPC from the Raft protocol.
/// Used for both log replication and as heartbeat mechanism.
async fn append_entries(&self, request: Request<RaftRequestBytes>) -> Result<Response<RaftReplyBytes>, Status> {
async fn append_entries(
&self,
request: Request<pb::AppendEntriesRequest>,
) -> Result<Response<pb::AppendEntriesResponse>, Status> {
debug!("Processing append entries request");
let req = request.into_inner();

// Deserialize the append request
let append_req = Self::deserialize_request(&req.value)?;

// Process the append request
let append_resp = self
.raft_node
.append_entries(append_req)
.append_entries(request.into_inner().into())
.await
.map_err(|e| Status::internal(format!("Append entries operation failed: {}", e)))?;

debug!("Append entries request processed successfully");
Self::create_response(append_resp)
Ok(Response::new(append_resp.into()))
}

/// Handles snapshot installation requests for state transfer using streaming.
Expand Down
68 changes: 68 additions & 0 deletions examples/raft-kv-memstore-grpc/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ openraft::declare_raft_types!(
R = pb::Response,
LeaderId = pb::LeaderId,
Vote = pb::Vote,
Entry = pb::Entry,
Node = pb::Node,
SnapshotData = StateMachineData,
);
Expand Down Expand Up @@ -57,6 +58,73 @@ impl From<pb::VoteResponse> for VoteResponse {
}
}

impl From<pb::AppendEntriesRequest> for AppendEntriesRequest {
fn from(proto_req: pb::AppendEntriesRequest) -> Self {
AppendEntriesRequest {
vote: proto_req.vote.unwrap(),
prev_log_id: proto_req.prev_log_id.map(|log_id| log_id.into()),
entries: proto_req.entries,
leader_commit: proto_req.leader_commit.map(|log_id| log_id.into()),
}
}
}

impl From<AppendEntriesRequest> for pb::AppendEntriesRequest {
fn from(value: AppendEntriesRequest) -> Self {
pb::AppendEntriesRequest {
vote: Some(value.vote),
prev_log_id: value.prev_log_id.map(|log_id| log_id.into()),
entries: value.entries,
leader_commit: value.leader_commit.map(|log_id| log_id.into()),
}
}
}

impl From<pb::AppendEntriesResponse> for AppendEntriesResponse {
fn from(r: pb::AppendEntriesResponse) -> Self {
if let Some(higher) = r.rejected_by {
return AppendEntriesResponse::HigherVote(higher);
}

if r.conflict {
return AppendEntriesResponse::Conflict;
}

if let Some(log_id) = r.last_log_id {
AppendEntriesResponse::PartialSuccess(Some(log_id.into()))
} else {
AppendEntriesResponse::Success
}
}
}

impl From<AppendEntriesResponse> for pb::AppendEntriesResponse {
fn from(r: AppendEntriesResponse) -> Self {
match r {
AppendEntriesResponse::Success => pb::AppendEntriesResponse {
rejected_by: None,
conflict: false,
last_log_id: None,
},
AppendEntriesResponse::PartialSuccess(p) => pb::AppendEntriesResponse {
rejected_by: None,
conflict: false,
last_log_id: p.map(|log_id| log_id.into()),
},
AppendEntriesResponse::Conflict => pb::AppendEntriesResponse {
rejected_by: None,
conflict: true,
last_log_id: None,
},
AppendEntriesResponse::HigherVote(v) => pb::AppendEntriesResponse {
rejected_by: Some(v),
conflict: false,
last_log_id: None,
},
}
}
}

impl From<LogId> for pb::LogId {
fn from(log_id: LogId) -> Self {
pb::LogId {
Expand Down
13 changes: 6 additions & 7 deletions examples/raft-kv-memstore-grpc/src/network/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ use tonic::transport::Channel;

use crate::protobuf as pb;
use crate::protobuf::internal_service_client::InternalServiceClient;
use crate::protobuf::RaftRequestBytes;
use crate::protobuf::SnapshotRequest;
use crate::protobuf::VoteRequest as PbVoteRequest;
use crate::protobuf::VoteResponse as PbVoteResponse;
Expand Down Expand Up @@ -65,12 +64,12 @@ impl RaftNetworkV2<TypeConfig> for NetworkConnection {
};
let mut client = InternalServiceClient::new(channel);

let value = serialize(&req).map_err(|e| RPCError::Network(NetworkError::new(&e)))?;
let request = RaftRequestBytes { value };
let response = client.append_entries(request).await.map_err(|e| RPCError::Network(NetworkError::new(&e)))?;
let message = response.into_inner();
let result = deserialize(&message.value).map_err(|e| RPCError::Network(NetworkError::new(&e)))?;
Ok(result)
let response = client
.append_entries(pb::AppendEntriesRequest::from(req))
.await
.map_err(|e| RPCError::Network(NetworkError::new(&e)))?;
let response = response.into_inner();
Ok(AppendEntriesResponse::from(response))
}

async fn full_snapshot(
Expand Down
50 changes: 50 additions & 0 deletions examples/raft-kv-memstore-grpc/src/pb_impl/impl_entry.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
use std::fmt;

use openraft::alias::LogIdOf;
use openraft::entry::RaftEntry;
use openraft::entry::RaftPayload;
use openraft::EntryPayload;
use openraft::Membership;

use crate::protobuf as pb;
use crate::TypeConfig;

impl fmt::Display for pb::Entry {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "Entry{{term={},index={}}}", self.term, self.index)
}
}

impl RaftPayload<TypeConfig> for pb::Entry {
fn get_membership(&self) -> Option<Membership<TypeConfig>> {
self.membership.clone().map(Into::into)
}
}

impl RaftEntry<TypeConfig> for pb::Entry {
fn new(log_id: LogIdOf<TypeConfig>, payload: EntryPayload<TypeConfig>) -> Self {
let mut app_data = None;
let mut membership = None;
match payload {
EntryPayload::Blank => {}
EntryPayload::Normal(data) => app_data = Some(data),
EntryPayload::Membership(m) => membership = Some(m.into()),
}

Self {
term: log_id.leader_id,
index: log_id.index,
app_data,
membership,
}
}

fn log_id_parts(&self) -> (&u64, u64) {
(&self.term, self.index)
}

fn set_log_id(&mut self, new: LogIdOf<TypeConfig>) {
self.term = new.leader_id;
self.index = new.index;
}
}
Loading

0 comments on commit 1965b64

Please sign in to comment.