Skip to content

Commit

Permalink
Refactor: example raft-kv-memstore-grpc
Browse files Browse the repository at this point in the history
Move protobuf types to 3 files:
- `app.proto` defines application API and types.
- `raft.proto` defines Raft-protocol API and types.
- `management.proto` defines non-app management API and types, such as
  membership config API.

- Part of databendlabs#1287
  • Loading branch information
drmingdrmer committed Feb 3, 2025
1 parent 7bebecb commit 5acd698
Show file tree
Hide file tree
Showing 9 changed files with 86 additions and 87 deletions.
7 changes: 1 addition & 6 deletions examples/raft-kv-memstore-grpc/build.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,7 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
println!("cargo:rerun-if-changed=src/*");
let mut config = prost_build::Config::new();
config.protoc_arg("--experimental_allow_proto3_optional");
let proto_files = [
"proto/internal_service.proto",
"proto/management_service.proto",
"proto/api_service.proto",
];
let proto_files = ["proto/raft.proto", "proto/management.proto", "proto/app.proto"];

// TODO: remove serde

Expand All @@ -20,7 +16,6 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
.type_attribute("openraftpb.NodeIdSet", "#[derive(Eq)]")
.type_attribute("openraftpb.Membership", "#[derive(Eq)]")
.type_attribute("openraftpb.Entry", "#[derive(Eq)]")
.type_attribute("google.protobuf.Empty", "#[derive(Eq)]")
.compile_protos_with_config(config, &proto_files, &["proto"])?;
Ok(())
}
Original file line number Diff line number Diff line change
@@ -1,14 +1,11 @@
syntax = "proto3";
import "internal_service.proto";
package openraftpb;

// ApiService provides the key-value store API operations
service ApiService {
// Get retrieves the value associated with a given key
rpc Get(GetRequest) returns (Response) {}
package openraftpb;

// Set stores a key-value pair in the distributed store
rpc Set(SetRequest) returns (Response) {}
// SetRequest represents a key-value pair to be stored
message SetRequest {
string key = 1; // Key to store
string value = 2; // Value to associate with the key
}

// GetRequest represents a key lookup request
Expand All @@ -21,3 +18,11 @@ message Response {
optional string value = 1; // Retrieved value
}

// ApiService provides the key-value store API operations
service AppService {
// Get retrieves the value associated with a given key
rpc Get(GetRequest) returns (Response) {}

// Set stores a key-value pair in the distributed store
rpc Set(SetRequest) returns (Response) {}
}
Original file line number Diff line number Diff line change
@@ -1,39 +1,30 @@
syntax = "proto3";

import "google/protobuf/empty.proto";
import "google/protobuf/wrappers.proto";
import "internal_service.proto";
import "api_service.proto";
package openraftpb;
import "raft.proto";
import "app.proto";

// ManagementService handles Raft cluster management operations
service ManagementService {
// Init initializes a new Raft cluster with the given nodes
rpc Init(InitRequest) returns (google.protobuf.Empty) {}

// AddLearner adds a new learner node to the Raft cluster
rpc AddLearner(AddLearnerRequest) returns (ClientWriteResponse) {}

// ChangeMembership modifies the cluster membership configuration
rpc ChangeMembership(ChangeMembershipRequest) returns (ClientWriteResponse) {}

// Metrics retrieves cluster metrics and status information
rpc Metrics(google.protobuf.Empty) returns (google.protobuf.StringValue) {}
}
package openraftpb;

// InitRequest contains the initial set of nodes for cluster initialization
message InitRequest {
repeated Node nodes = 1; // List of initial cluster nodes
// List of initial cluster nodes
repeated Node nodes = 1;
}

// AddLearnerRequest specifies parameters for adding a learner node
message AddLearnerRequest {
Node node = 1; // Node to be added as a learner
// Node to be added as a learner
Node node = 1;
}

// ChangeMembershipRequest specifies parameters for modifying cluster membership
message ChangeMembershipRequest {
repeated uint64 members = 1; // New set of member node IDs
bool retain = 2; // Whether to retain existing configuration
// New set of voter node IDs
repeated uint64 members = 1;
// Whether to retain existing configuration
bool retain = 2;
}

message ClientWriteResponse {
Expand All @@ -45,4 +36,20 @@ message ClientWriteResponse {

// If the committed log entry is a change-membership entry.
Membership membership = 3;
}
}

// ManagementService handles Raft cluster management operations
service ManagementService {
// Init initializes a new Raft cluster with the given nodes
rpc Init(InitRequest) returns (google.protobuf.Empty) {}

// AddLearner adds a new learner node to the Raft cluster
rpc AddLearner(AddLearnerRequest) returns (ClientWriteResponse) {}

// ChangeMembership modifies the cluster membership configuration
rpc ChangeMembership(ChangeMembershipRequest) returns (ClientWriteResponse) {}

// Metrics retrieves cluster metrics and status information
rpc Metrics(google.protobuf.Empty) returns (google.protobuf.StringValue) {}
}

Original file line number Diff line number Diff line change
@@ -1,12 +1,9 @@
syntax = "proto3";

import "google/protobuf/empty.proto";
package openraftpb;
import "app.proto";

// SetRequest represents a key-value pair to be stored
message SetRequest {
string key = 1; // Key to store
string value = 2; // Value to associate with the key
}
package openraftpb;

// Node represents a single node in the Raft cluster
message Node {
Expand Down Expand Up @@ -37,7 +34,6 @@ message Entry {
Membership membership = 13;
}


// NodeIds is a set of NodeIds
message NodeIdSet {
map<uint64, google.protobuf.Empty> node_ids = 1;
Expand All @@ -59,21 +55,6 @@ message LogId {
uint64 index = 2;
}

// All the data in a state machine, including user defined data and membership data.
message StateMachineData {
// The last log id that has been applied to the state machine
LogId last_applied = 1;

// User data in a map
map<string, string> data = 2;

// The id of the last membership config log entry that is applied.
LogId last_membership_log_id = 3;

// The last membership config that is applied.
Membership last_membership = 4;
}

// VoteRequest represents a request for votes during leader election
message VoteRequest {
Vote vote = 1;
Expand Down Expand Up @@ -132,12 +113,9 @@ message SnapshotRequestMeta {

// The item of snapshot chunk stream.
//
// The first item contains `rpc_meta`,
// including the application defined format of this snapshot data,
// the leader vote and snapshot-meta.
// The first item contains `meta`, including the leader vote and snapshot-meta.
//
// Since the second item, the `rpc_meta` should be empty and will be ignored by
// the receiving end.
// Since the second item, the chunk contains the snapshot data.
message SnapshotRequest {
oneof payload {
SnapshotRequestMeta meta = 1;
Expand All @@ -149,8 +127,23 @@ message SnapshotResponse {
Vote vote = 1;
}

// All the data in a state machine, including user defined data and membership data.
message StateMachineData {
// The last log id that has been applied to the state machine
LogId last_applied = 1;

// User data in a map
map<string, string> data = 2;

// The id of the last membership config log entry that is applied.
LogId last_membership_log_id = 3;

// The last membership config that is applied.
Membership last_membership = 4;
}

// InternalService handles internal Raft cluster communication
service InternalService {
service RaftService {
// Vote handles vote requests between Raft nodes during leader election
rpc Vote(VoteRequest) returns (VoteResponse) {}

Expand All @@ -160,4 +153,3 @@ service InternalService {
// Snapshot handles install snapshot RPC
rpc Snapshot(stream SnapshotRequest) returns (SnapshotResponse) {}
}

12 changes: 6 additions & 6 deletions examples/raft-kv-memstore-grpc/src/bin/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,13 @@ use std::sync::Arc;

use clap::Parser;
use openraft::Config;
use raft_kv_memstore_grpc::grpc::api_service::ApiServiceImpl;
use raft_kv_memstore_grpc::grpc::internal_service::InternalServiceImpl;
use raft_kv_memstore_grpc::grpc::app_service::AppServiceImpl;
use raft_kv_memstore_grpc::grpc::management_service::ManagementServiceImpl;
use raft_kv_memstore_grpc::grpc::raft_service::RaftServiceImpl;
use raft_kv_memstore_grpc::network::Network;
use raft_kv_memstore_grpc::protobuf::api_service_server::ApiServiceServer;
use raft_kv_memstore_grpc::protobuf::internal_service_server::InternalServiceServer;
use raft_kv_memstore_grpc::protobuf::app_service_server::AppServiceServer;
use raft_kv_memstore_grpc::protobuf::management_service_server::ManagementServiceServer;
use raft_kv_memstore_grpc::protobuf::raft_service_server::RaftServiceServer;
use raft_kv_memstore_grpc::typ::Raft;
use raft_kv_memstore_grpc::LogStore;
use raft_kv_memstore_grpc::StateMachineStore;
Expand Down Expand Up @@ -61,8 +61,8 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {

// Create the management service with raft instance
let management_service = ManagementServiceImpl::new(raft.clone());
let internal_service = InternalServiceImpl::new(raft.clone());
let api_service = ApiServiceImpl::new(raft, state_machine_store);
let internal_service = RaftServiceImpl::new(raft.clone());
let api_service = AppServiceImpl::new(raft, state_machine_store);

// Start server
let server_future = Server::builder()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use tonic::Response;
use tonic::Status;
use tracing::debug;

use crate::protobuf::api_service_server::ApiService;
use crate::protobuf::app_service_server::AppService;
use crate::protobuf::GetRequest;
use crate::protobuf::Response as PbResponse;
use crate::protobuf::SetRequest;
Expand All @@ -23,29 +23,29 @@ use crate::typ::*;
/// # Protocol Safety
/// This service implements the client-facing API and should validate all inputs
/// before processing them through the Raft consensus protocol.
pub struct ApiServiceImpl {
pub struct AppServiceImpl {
/// The Raft node instance for consensus operations
raft_node: Raft,
/// The state machine store for direct reads
state_machine_store: Arc<StateMachineStore>,
}

impl ApiServiceImpl {
impl AppServiceImpl {
/// Creates a new instance of the API service
///
/// # Arguments
/// * `raft_node` - The Raft node instance this service will use
/// * `state_machine_store` - The state machine store for reading data
pub fn new(raft_node: Raft, state_machine_store: Arc<StateMachineStore>) -> Self {
ApiServiceImpl {
AppServiceImpl {
raft_node,
state_machine_store,
}
}
}

#[tonic::async_trait]
impl ApiService for ApiServiceImpl {
impl AppService for AppServiceImpl {
/// Sets a value for a given key in the distributed store
///
/// # Arguments
Expand Down
4 changes: 2 additions & 2 deletions examples/raft-kv-memstore-grpc/src/grpc/mod.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
pub mod api_service;
pub mod internal_service;
pub mod app_service;
pub mod management_service;
pub mod raft_service;
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use tonic::Streaming;
use tracing::debug;

use crate::protobuf as pb;
use crate::protobuf::internal_service_server::InternalService;
use crate::protobuf::raft_service_server::RaftService;
use crate::protobuf::VoteRequest;
use crate::protobuf::VoteResponse;
use crate::typ::*;
Expand All @@ -23,23 +23,23 @@ use crate::typ::*;
/// # Protocol Safety
/// This service implements critical consensus protocol operations and should only be
/// exposed to other trusted Raft cluster nodes, never to external clients.
pub struct InternalServiceImpl {
pub struct RaftServiceImpl {
/// The local Raft node instance that this service operates on
raft_node: Raft,
}

impl InternalServiceImpl {
impl RaftServiceImpl {
/// Creates a new instance of the internal service
///
/// # Arguments
/// * `raft_node` - The Raft node instance this service will operate on
pub fn new(raft_node: Raft) -> Self {
InternalServiceImpl { raft_node }
RaftServiceImpl { raft_node }
}
}

#[tonic::async_trait]
impl InternalService for InternalServiceImpl {
impl RaftService for RaftServiceImpl {
/// Handles vote requests during leader election.
///
/// # Arguments
Expand Down
8 changes: 4 additions & 4 deletions examples/raft-kv-memstore-grpc/src/network/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use tonic::codegen::tokio_stream::wrappers::ReceiverStream;
use tonic::transport::Channel;

use crate::protobuf as pb;
use crate::protobuf::internal_service_client::InternalServiceClient;
use crate::protobuf::raft_service_client::RaftServiceClient;
use crate::protobuf::VoteRequest as PbVoteRequest;
use crate::protobuf::VoteResponse as PbVoteResponse;
use crate::typ::*;
Expand Down Expand Up @@ -60,7 +60,7 @@ impl RaftNetworkV2<TypeConfig> for NetworkConnection {
return Err(RPCError::Unreachable(Unreachable::new(&e)));
}
};
let mut client = InternalServiceClient::new(channel);
let mut client = RaftServiceClient::new(channel);

let response = client
.append_entries(pb::AppendEntriesRequest::from(req))
Expand Down Expand Up @@ -88,7 +88,7 @@ impl RaftNetworkV2<TypeConfig> for NetworkConnection {
let (tx, rx) = tokio::sync::mpsc::channel(1024);
let strm = ReceiverStream::new(rx);

let mut client = InternalServiceClient::new(channel);
let mut client = RaftServiceClient::new(channel);
let response = client.snapshot(strm).await.map_err(|e| NetworkError::new(&e))?;

// 1. Send meta chunk
Expand Down Expand Up @@ -136,7 +136,7 @@ impl RaftNetworkV2<TypeConfig> for NetworkConnection {
return Err(RPCError::Unreachable(Unreachable::new(&e)));
}
};
let mut client = InternalServiceClient::new(channel);
let mut client = RaftServiceClient::new(channel);

// Convert the openraft VoteRequest to protobuf VoteRequest
let proto_vote_req: PbVoteRequest = req.into();
Expand Down

0 comments on commit 5acd698

Please sign in to comment.