Skip to content

Commit

Permalink
Refactor: Introduce RefLogId as a reference to a log ID
Browse files Browse the repository at this point in the history
Existing `LogIdOf<C>` provides a minimal storage implementation for a
log ID with essential properties. In contrast, `RefLogId` offers the
same information as `LogIdOf<C>` while adding additional system-defined
properties.

For example, in the future, `LogIdOf<C>` defined by the application will
not need to implement `Ord`. However, `RefLogId`, used internally, will
provide a system-defined `Ord` implementation.

This change updates internal components to return `RefLogId` or accept
it as an argument where possible, enabling more flexibility and
consistency in handling log IDs.

Change: refine the `RaftEntry` trait

- The `RaftEntry` trait now requires `AsRef<LogIdOf<C>>` and
  `AsMut<LogIdOf<C>>`, providing a more standard API for accessing the
  log ID of a log entry. As a result, the `RaftEntry: RaftLogId`
  requirement is no longer needed.

- A new method, `new_normal()`, has been added to the `RaftEntry` trait
  to replace the `FromAppData` trait.

- Additional utility methods for working with entries are now provided
  in the `RaftEntryExt` trait.

- Part of #1278

Upgrade tips:

1. **For applications using a custom `RaftEntry` implementation** (e.g.,
   declared with `declare_raft_types!(MyTypes: Entry = MyEntry)`):
   - Update the `RaftEntry` implementation for your custom entry type
     (`MyEntry`) by adding the `new_normal()` method.
   - Implement `AsRef<LogId>` and `AsMut<LogId>` for your custom entry
     type.

2. **For applications using the default `Entry` provided by OpenRaft**:
   - No changes are required.

empty: try to remove log id from RaftTypeConfig
  • Loading branch information
drmingdrmer committed Feb 1, 2025
1 parent 791dc3e commit f0fd058
Show file tree
Hide file tree
Showing 50 changed files with 957 additions and 455 deletions.
1 change: 0 additions & 1 deletion cluster_benchmark/tests/benchmark/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ use openraft::storage::Snapshot;
use openraft::Entry;
use openraft::EntryPayload;
use openraft::OptionalSend;
use openraft::RaftLogId;
use openraft::SnapshotMeta;
use openraft::StorageError;
use openraft::StoredMembership;
Expand Down
4 changes: 3 additions & 1 deletion examples/memstore/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,13 @@ license = "MIT OR Apache-2.0"
repository = "https://github.com/databendlabs/openraft"

[dependencies]
openraft = { path = "../../openraft", features = ["serde", "type-alias"] }
openraft = { path = "../../openraft", features = ["type-alias"] }

tokio = { version = "1.0", default-features = false, features = ["sync"] }

[features]

serde = ["openraft/serde"]

[package.metadata.docs.rs]
all-features = true
3 changes: 1 addition & 2 deletions examples/raft-kv-memstore-grpc/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ path = "src/bin/main.rs"

[dependencies]
memstore = { path = "../memstore", features = [] }
openraft = { path = "../../openraft", features = ["serde", "type-alias"] }
openraft = { path = "../../openraft", features = ["type-alias"] }

clap = { version = "4.1.11", features = ["derive", "env"] }
serde = { version = "1.0.114", features = ["derive"] }
Expand All @@ -30,7 +30,6 @@ tracing = "0.1.29"
tracing-subscriber = { version = "0.3.0", features = ["env-filter"] }
tonic = "0.12.3"
tonic-build = "0.12.3"
bincode = "1.3.3"
dashmap = "6.1.0"
prost = "0.13.4"
futures = "0.3.31"
Expand Down
24 changes: 10 additions & 14 deletions examples/raft-kv-memstore-grpc/build.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,20 +11,16 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
// TODO: remove serde

tonic_build::configure()
.type_attribute("openraftpb.Node", "#[derive(Eq, serde::Serialize, serde::Deserialize)]")
.type_attribute(
"openraftpb.SetRequest",
"#[derive(Eq, serde::Serialize, serde::Deserialize)]",
)
.type_attribute(
"openraftpb.Response",
"#[derive(Eq, serde::Serialize, serde::Deserialize)]",
)
.type_attribute(
"openraftpb.LeaderId",
"#[derive(Eq, serde::Serialize, serde::Deserialize)]",
)
.type_attribute("openraftpb.Vote", "#[derive(Eq, serde::Serialize, serde::Deserialize)]")
.btree_map(["."])
.type_attribute("openraftpb.Node", "#[derive(Eq)]")
.type_attribute("openraftpb.SetRequest", "#[derive(Eq)]")
.type_attribute("openraftpb.Response", "#[derive(Eq)]")
.type_attribute("openraftpb.LeaderId", "#[derive(Eq)]")
.type_attribute("openraftpb.Vote", "#[derive(Eq)]")
.type_attribute("openraftpb.NodeIdSet", "#[derive(Eq)]")
.type_attribute("openraftpb.Membership", "#[derive(Eq)]")
.type_attribute("openraftpb.Entry", "#[derive(Eq)]")
.type_attribute("google.protobuf.Empty", "#[derive(Eq)]")
.compile_protos_with_config(config, &proto_files, &["proto"])?;
Ok(())
}
6 changes: 1 addition & 5 deletions examples/raft-kv-memstore-grpc/proto/api_service.proto
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
syntax = "proto3";
import "internal_service.proto";
package openraftpb;

// ApiService provides the key-value store API operations
Expand All @@ -20,8 +21,3 @@ message Response {
optional string value = 1; // Retrieved value
}

// SetRequest represents a key-value pair to be stored
message SetRequest {
string key = 1; // Key to store
string value = 2; // Value to associate with the key
}
125 changes: 104 additions & 21 deletions examples/raft-kv-memstore-grpc/proto/internal_service.proto
Original file line number Diff line number Diff line change
@@ -1,6 +1,19 @@
syntax = "proto3";
import "google/protobuf/empty.proto";
package openraftpb;

// SetRequest represents a key-value pair to be stored
message SetRequest {
string key = 1; // Key to store
string value = 2; // Value to associate with the key
}

// Node represents a single node in the Raft cluster
message Node {
string rpc_addr = 1; // RPC address for node communication
uint64 node_id = 2; // Unique identifier for the node
}

// LeaderId represents the leader identifier in Raft
message LeaderId {
uint64 term = 1;
Expand All @@ -13,12 +26,50 @@ message Vote {
bool committed = 2;
}

message Entry {
uint64 term = 1;
uint64 index = 2;

// Optional Application data
SetRequest app_data = 12;

// Optional Membership config
Membership membership = 13;
}


// NodeIds is a set of NodeIds
message NodeIdSet {
map<uint64, google.protobuf.Empty> node_ids = 1;
}

// Membership config
message Membership {
// Joint(includes more than one NodeIdSet) or uniform(one NodeIdSet) config.
repeated NodeIdSet configs = 1;

// All of the nodes in the cluster, including voters and learners.
// A node id that is included in `configs` is a voter, otherwise it is a learner.
map<uint64, Node> nodes = 2;
}

// LogId represents the log identifier in Raft
message LogId {
uint64 term = 1;
uint64 index = 2;
}

message StateMachineData {
// The last log id that has been applied to the state machine
LogId last_applied = 1;

map<string, string> data = 2;

LogId last_membership_log_id = 3;

Membership last_membership = 4;
}

// VoteRequest represents a request for votes during leader election
message VoteRequest {
Vote vote = 1;
Expand All @@ -32,26 +83,46 @@ message VoteResponse {
LogId last_log_id = 3;
}

// InternalService handles internal Raft cluster communication
service InternalService {
// Vote handles vote requests between Raft nodes during leader election
rpc Vote(VoteRequest) returns (VoteResponse) {}
message AppendEntriesRequest {
// The leader's vote, used to identify the leader, and must be committed
Vote vote = 1;

// AppendEntries handles call related to append entries RPC
rpc AppendEntries(RaftRequestBytes) returns (RaftReplyBytes) {}
// The previous log id the leader has sent to the follower
LogId prev_log_id = 2;

// Snapshot handles install snapshot RPC
rpc Snapshot(stream SnapshotRequest) returns (RaftReplyBytes) {}
// The entries to be appended to the follower's log
repeated Entry entries = 3;

// The leader's last committed log id
LogId leader_commit = 4;
}

// RaftRequestBytes encapsulates binary Raft request data
message RaftRequestBytes {
bytes value = 1; // Serialized Raft request data
message AppendEntriesResponse {
// If not None, the follower rejected the AppendEntries request due to having a higher vote.
// All other fields are valid only when this field is None
Vote rejected_by = 1;

// The follower accepts this AppendEntries request's vote, but the prev_log_id conflicts with
// the follower's log. The leader should retry with a smaller prev_log_id that matches the
// follower's log. All subsequent fields are valid only when this field is false
bool conflict = 2;

// The last log id the follower accepted from this request.
// If None, all input entries were accepted and persisted.
// Otherwise, only entries up to and including this id were accepted
LogId last_log_id = 3;
}

// RaftReplyBytes encapsulates binary Raft response data
message RaftReplyBytes {
bytes value = 1; // Serialized Raft response data
message SnapshotRequestMeta {
Vote vote = 1;

LogId last_log_id = 2;

LogId last_membership_log_id = 3;

Membership last_membership = 4;

string snapshot_id = 5;
}

// The item of snapshot chunk stream.
Expand All @@ -63,13 +134,25 @@ message RaftReplyBytes {
// Since the second item, the `rpc_meta` should be empty and will be ignored by
// the receiving end.
message SnapshotRequest {
oneof payload {
SnapshotRequestMeta meta = 1;
bytes chunk = 2;
}
}

message SnapshotResponse {
Vote vote = 1;
}

// InternalService handles internal Raft cluster communication
service InternalService {
// Vote handles vote requests between Raft nodes during leader election
rpc Vote(VoteRequest) returns (VoteResponse) {}

// bytes serialized meta data, including vote and snapshot_meta.
// ```text
// (SnapshotFormat, Vote, SnapshotMeta)
// ```
bytes rpc_meta = 1;
// AppendEntries handles call related to append entries RPC
rpc AppendEntries(AppendEntriesRequest) returns (AppendEntriesResponse) {}

// Snapshot data chunk
bytes chunk = 2;
// Snapshot handles install snapshot RPC
rpc Snapshot(stream SnapshotRequest) returns (SnapshotResponse) {}
}

39 changes: 20 additions & 19 deletions examples/raft-kv-memstore-grpc/proto/management_service.proto
Original file line number Diff line number Diff line change
@@ -1,50 +1,51 @@
syntax = "proto3";
import "google/protobuf/empty.proto";
import "internal_service.proto";
import "api_service.proto";
package openraftpb;

// ManagementService handles Raft cluster management operations
service ManagementService {
// Init initializes a new Raft cluster with the given nodes
rpc Init(InitRequest) returns (RaftReplyString) {}
rpc Init(InitRequest) returns (google.protobuf.Empty) {}

// AddLearner adds a new learner node to the Raft cluster
rpc AddLearner(AddLearnerRequest) returns (RaftReplyString) {}
rpc AddLearner(AddLearnerRequest) returns (ClientWriteResponse) {}

// ChangeMembership modifies the cluster membership configuration
rpc ChangeMembership(ChangeMembershipRequest) returns (RaftReplyString) {}
rpc ChangeMembership(ChangeMembershipRequest) returns (ClientWriteResponse) {}

// Metrics retrieves cluster metrics and status information
rpc Metrics(RaftRequestString) returns (RaftReplyString) {}
rpc Metrics(google.protobuf.Empty) returns (MetricsString) {}
}

// InitRequest contains the initial set of nodes for cluster initialization
message InitRequest {
repeated Node nodes = 1; // List of initial cluster nodes
}

// Node represents a single node in the Raft cluster
message Node {
string rpc_addr = 1; // RPC address for node communication
uint64 node_id = 2; // Unique identifier for the node
}

// AddLearnerRequest specifies parameters for adding a learner node
message AddLearnerRequest {
Node node = 1; // Node to be added as a learner
}

// RaftRequestString represents a string-based Raft request
message RaftRequestString {
string data = 1; // Request data in string format
}

// RaftReplyString represents a string-based Raft response
message RaftReplyString {
string data = 1; // Response data
string error = 2; // Error message, if any
message MetricsString {
string data = 1; // Response data in string
}

// ChangeMembershipRequest specifies parameters for modifying cluster membership
message ChangeMembershipRequest {
repeated uint64 members = 1; // New set of member node IDs
bool retain = 2; // Whether to retain existing configuration
}

message ClientWriteResponse {
// The log id of the committed log entry.
LogId log_id = 1;

// If the committed log entry is a normal one.
Response data = 2;

// If the committed log entry is a change-membership entry.
Membership membership = 3;
}
Loading

0 comments on commit f0fd058

Please sign in to comment.