Skip to content

Commit

Permalink
Doc: Finish gRPC example raft-kv-memstore-grpc
Browse files Browse the repository at this point in the history
A rust test `tests/test_cluster.rs` brings up a 3 nodes cluster and
executes write and read on it.

- Fix: databendlabs#1287
  • Loading branch information
drmingdrmer committed Feb 4, 2025
1 parent ce83ecc commit c64caaf
Show file tree
Hide file tree
Showing 14 changed files with 494 additions and 220 deletions.
4 changes: 4 additions & 0 deletions examples/raft-kv-memstore-grpc/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,10 @@ dashmap = "6.1.0"
prost = "0.13.4"
futures = "0.3.31"

[dev-dependencies]
anyhow = "1.0.63"
maplit = "1.0.2"

[features]

[build-dependencies]
Expand Down
22 changes: 18 additions & 4 deletions examples/raft-kv-memstore-grpc/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,21 +2,36 @@

A distributed key-value store built using `openraft` and gRPC, demonstrating a robust, replicated storage system.

This example stores Raft logs in a in-memory storage, state-machine in a
protobuf defined in-memory struct `StateMachineData`, and use gRPC for
inter-node Raft-protocol communication and the communication between app client
and the cluster.

This example provide two testing scenarios:

- `./tests/test_cluster.rs` brings up a 3 node cluster in a single process to
prove functions, including, initialize, add-learner, change-membership and
write/read.

- `./test-cluster.sh` provides a more realistic scenario: it brings up a 3
process to form a cluster, and run the similar steps: initialize,
add-learner, change-membership and write/read.

## Modules

The application is structured into key modules:

- `src/bin`: Contains the `main()` function for server setup in [main.rs](./src/bin/main.rs)
- `src/network`: For routing calls to their respective grpc RPCs
- `src/grpc`:
- `api_service.rs`: gRPC service implementations for key value store(application APIs)
- `internal_service.rs`: Raft-specific gRPC internal network communication
- `management_service.rs`: Administrative gRPC endpoints for cluster management
- `api_service.rs`: gRPC service implementations for key value store(application APIs) and managements
- `raft_service.rs`: Raft-specific gRPC internal network communication
- `protos`: Protocol buffers specifications for above services
- `src/store`: Implements the key-value store logic in [store/mod.rs](./src/store/mod.rs)

## Running the Cluster


### Build the Application

```shell
Expand Down Expand Up @@ -45,7 +60,6 @@ Start additional nodes by changing the `id` and `grpc-addr`:
## Data Storage

Data is stored in state machines, with Raft ensuring data synchronization across all nodes.
See the [ExampleStateMachine](./src/store/mod.rs) for implementation details.

## Cluster Management

Expand Down
2 changes: 1 addition & 1 deletion examples/raft-kv-memstore-grpc/build.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
println!("cargo:rerun-if-changed=src/*");
let mut config = prost_build::Config::new();
config.protoc_arg("--experimental_allow_proto3_optional");
let proto_files = ["proto/raft.proto", "proto/management.proto", "proto/app.proto"];
let proto_files = ["proto/raft.proto", "proto/app_types.proto", "proto/app.proto"];

// TODO: remove serde

Expand Down
67 changes: 55 additions & 12 deletions examples/raft-kv-memstore-grpc/proto/app.proto
Original file line number Diff line number Diff line change
@@ -1,28 +1,71 @@
syntax = "proto3";

import "google/protobuf/empty.proto";
import "google/protobuf/wrappers.proto";
import "raft.proto";
import "app_types.proto";

package openraftpb;

// SetRequest represents a key-value pair to be stored
message SetRequest {
string key = 1; // Key to store
string value = 2; // Value to associate with the key
// InitRequest contains the initial set of nodes for cluster initialization
message InitRequest {
// List of initial cluster nodes
repeated Node nodes = 1;
}

// AddLearnerRequest specifies parameters for adding a learner node
message AddLearnerRequest {
// Node to be added as a learner
Node node = 1;
}

// GetRequest represents a key lookup request
message GetRequest {
string key = 1; // Key to look up
// ChangeMembershipRequest specifies parameters for modifying cluster membership
message ChangeMembershipRequest {
// New set of voter node IDs
repeated uint64 members = 1;
// Whether to retain existing configuration
bool retain = 2;
}

message ClientWriteResponse {
// The log id of the committed log entry.
LogId log_id = 1;

// If the committed log entry is a normal one.
Response data = 2;

// If the committed log entry is a change-membership entry.
Membership membership = 3;
}

// GetResponse contains the value associated with the requested key
message Response {
optional string value = 1; // Retrieved value
message MetricsResponse {
// Cluster membership config
Membership membership = 1;

// In this example, only membership is used.
// Other metrics are just encoded in string for simplicity.
// In real-world scenarios, metrics should be encoded in a more structured format.
string other_metrics = 2;
}

// ApiService provides the key-value store API operations
// ApiService provides the key-value store API operations and Raft cluster management operations
service AppService {
// Get retrieves the value associated with a given key
rpc Get(GetRequest) returns (Response) {}

// Set stores a key-value pair in the distributed store
rpc Set(SetRequest) returns (Response) {}
}

// Init initializes a new Raft cluster with the given nodes
rpc Init(InitRequest) returns (google.protobuf.Empty) {}

// AddLearner adds a new learner node to the Raft cluster
rpc AddLearner(AddLearnerRequest) returns (ClientWriteResponse) {}

// ChangeMembership modifies the cluster membership configuration
rpc ChangeMembership(ChangeMembershipRequest) returns (ClientWriteResponse) {}

// Metrics retrieves cluster metrics and status information
rpc Metrics(google.protobuf.Empty) returns (MetricsResponse) {}
}

19 changes: 19 additions & 0 deletions examples/raft-kv-memstore-grpc/proto/app_types.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
syntax = "proto3";

package openraftpb;

// SetRequest represents a key-value pair to be stored
message SetRequest {
string key = 1; // Key to store
string value = 2; // Value to associate with the key
}

// GetRequest represents a key lookup request
message GetRequest {
string key = 1; // Key to look up
}

// GetResponse contains the value associated with the requested key
message Response {
optional string value = 1; // Retrieved value
}
55 changes: 0 additions & 55 deletions examples/raft-kv-memstore-grpc/proto/management.proto

This file was deleted.

8 changes: 5 additions & 3 deletions examples/raft-kv-memstore-grpc/proto/raft.proto
Original file line number Diff line number Diff line change
@@ -1,14 +1,16 @@
syntax = "proto3";

import "google/protobuf/empty.proto";
import "app.proto";
import "app_types.proto";

package openraftpb;

// Node represents a single node in the Raft cluster
message Node {
string rpc_addr = 1; // RPC address for node communication
uint64 node_id = 2; // Unique identifier for the node
// Unique identifier for the node
uint64 node_id = 1;
// RPC address for node communication
string rpc_addr = 2;
}

// LeaderId represents the leader identifier in Raft
Expand Down
51 changes: 51 additions & 0 deletions examples/raft-kv-memstore-grpc/src/app.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
use std::sync::Arc;

use openraft::Config;
use tonic::transport::Server;
use tracing::info;

use crate::grpc::app_service::AppServiceImpl;
use crate::grpc::raft_service::RaftServiceImpl;
use crate::network::Network;
use crate::pb::app_service_server::AppServiceServer;
use crate::pb::raft_service_server::RaftServiceServer;
use crate::store::LogStore;
use crate::store::StateMachineStore;
use crate::typ::*;
use crate::NodeId;

pub async fn start_raft_app(node_id: NodeId, http_addr: String) -> Result<(), Box<dyn std::error::Error>> {
// Create a configuration for the raft instance.
let config = Arc::new(
Config {
heartbeat_interval: 500,
election_timeout_min: 1500,
election_timeout_max: 3000,
..Default::default()
}
.validate()?,
);

// Create stores and network
let log_store = LogStore::default();
let state_machine_store = Arc::new(StateMachineStore::default());
let network = Network {};

// Create Raft instance
let raft = Raft::new(node_id, config.clone(), network, log_store, state_machine_store.clone()).await?;

// Create the management service with raft instance
let internal_service = RaftServiceImpl::new(raft.clone());
let api_service = AppServiceImpl::new(raft, state_machine_store);

// Start server
let server_future = Server::builder()
.add_service(RaftServiceServer::new(internal_service))
.add_service(AppServiceServer::new(api_service))
.serve(http_addr.parse()?);

info!("Node {node_id} starting server at {http_addr}");
server_future.await?;

Ok(())
}
4 changes: 0 additions & 4 deletions examples/raft-kv-memstore-grpc/src/bin/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,9 @@ use std::sync::Arc;
use clap::Parser;
use openraft::Config;
use raft_kv_memstore_grpc::grpc::app_service::AppServiceImpl;
use raft_kv_memstore_grpc::grpc::management_service::ManagementServiceImpl;
use raft_kv_memstore_grpc::grpc::raft_service::RaftServiceImpl;
use raft_kv_memstore_grpc::network::Network;
use raft_kv_memstore_grpc::protobuf::app_service_server::AppServiceServer;
use raft_kv_memstore_grpc::protobuf::management_service_server::ManagementServiceServer;
use raft_kv_memstore_grpc::protobuf::raft_service_server::RaftServiceServer;
use raft_kv_memstore_grpc::typ::Raft;
use raft_kv_memstore_grpc::LogStore;
Expand Down Expand Up @@ -60,13 +58,11 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
let raft = Raft::new(node_id, config.clone(), network, log_store, state_machine_store.clone()).await?;

// Create the management service with raft instance
let management_service = ManagementServiceImpl::new(raft.clone());
let internal_service = RaftServiceImpl::new(raft.clone());
let api_service = AppServiceImpl::new(raft, state_machine_store);

// Start server
let server_future = Server::builder()
.add_service(ManagementServiceServer::new(management_service))
.add_service(RaftServiceServer::new(internal_service))
.add_service(AppServiceServer::new(api_service))
.serve(addr.parse()?);
Expand Down
Loading

0 comments on commit c64caaf

Please sign in to comment.