diff --git a/examples/raft-kv-memstore-grpc/Cargo.toml b/examples/raft-kv-memstore-grpc/Cargo.toml index b31a7b715..750fc58cc 100644 --- a/examples/raft-kv-memstore-grpc/Cargo.toml +++ b/examples/raft-kv-memstore-grpc/Cargo.toml @@ -34,6 +34,10 @@ dashmap = "6.1.0" prost = "0.13.4" futures = "0.3.31" +[dev-dependencies] +anyhow = "1.0.63" +maplit = "1.0.2" + [features] [build-dependencies] diff --git a/examples/raft-kv-memstore-grpc/README.md b/examples/raft-kv-memstore-grpc/README.md index d7ee2f082..93b239cc8 100644 --- a/examples/raft-kv-memstore-grpc/README.md +++ b/examples/raft-kv-memstore-grpc/README.md @@ -2,6 +2,21 @@ A distributed key-value store built using `openraft` and gRPC, demonstrating a robust, replicated storage system. +This example stores Raft logs in a in-memory storage, state-machine in a +protobuf defined in-memory struct `StateMachineData`, and use gRPC for +inter-node Raft-protocol communication and the communication between app client +and the cluster. + +This example provide two testing scenarios: + +- `./tests/test_cluster.rs` brings up a 3 node cluster in a single process to + prove functions, including, initialize, add-learner, change-membership and + write/read. + +- `./test-cluster.sh` provides a more realistic scenario: it brings up a 3 + process to form a cluster, and run the similar steps: initialize, + add-learner, change-membership and write/read. + ## Modules The application is structured into key modules: @@ -9,14 +24,14 @@ The application is structured into key modules: - `src/bin`: Contains the `main()` function for server setup in [main.rs](./src/bin/main.rs) - `src/network`: For routing calls to their respective grpc RPCs - `src/grpc`: - - `api_service.rs`: gRPC service implementations for key value store(application APIs) - - `internal_service.rs`: Raft-specific gRPC internal network communication - - `management_service.rs`: Administrative gRPC endpoints for cluster management + - `api_service.rs`: gRPC service implementations for key value store(application APIs) and managements + - `raft_service.rs`: Raft-specific gRPC internal network communication - `protos`: Protocol buffers specifications for above services - `src/store`: Implements the key-value store logic in [store/mod.rs](./src/store/mod.rs) ## Running the Cluster + ### Build the Application ```shell @@ -45,7 +60,6 @@ Start additional nodes by changing the `id` and `grpc-addr`: ## Data Storage Data is stored in state machines, with Raft ensuring data synchronization across all nodes. -See the [ExampleStateMachine](./src/store/mod.rs) for implementation details. ## Cluster Management diff --git a/examples/raft-kv-memstore-grpc/build.rs b/examples/raft-kv-memstore-grpc/build.rs index 578869d2e..8e4736135 100644 --- a/examples/raft-kv-memstore-grpc/build.rs +++ b/examples/raft-kv-memstore-grpc/build.rs @@ -2,7 +2,7 @@ fn main() -> Result<(), Box> { println!("cargo:rerun-if-changed=src/*"); let mut config = prost_build::Config::new(); config.protoc_arg("--experimental_allow_proto3_optional"); - let proto_files = ["proto/raft.proto", "proto/management.proto", "proto/app.proto"]; + let proto_files = ["proto/raft.proto", "proto/app_types.proto", "proto/app.proto"]; // TODO: remove serde diff --git a/examples/raft-kv-memstore-grpc/proto/app.proto b/examples/raft-kv-memstore-grpc/proto/app.proto index 1aeeb6c85..d1934cb92 100644 --- a/examples/raft-kv-memstore-grpc/proto/app.proto +++ b/examples/raft-kv-memstore-grpc/proto/app.proto @@ -1,28 +1,71 @@ syntax = "proto3"; +import "google/protobuf/empty.proto"; +import "google/protobuf/wrappers.proto"; +import "raft.proto"; +import "app_types.proto"; + package openraftpb; -// SetRequest represents a key-value pair to be stored -message SetRequest { - string key = 1; // Key to store - string value = 2; // Value to associate with the key +// InitRequest contains the initial set of nodes for cluster initialization +message InitRequest { + // List of initial cluster nodes + repeated Node nodes = 1; +} + +// AddLearnerRequest specifies parameters for adding a learner node +message AddLearnerRequest { + // Node to be added as a learner + Node node = 1; } -// GetRequest represents a key lookup request -message GetRequest { - string key = 1; // Key to look up +// ChangeMembershipRequest specifies parameters for modifying cluster membership +message ChangeMembershipRequest { + // New set of voter node IDs + repeated uint64 members = 1; + // Whether to retain existing configuration + bool retain = 2; +} + +message ClientWriteResponse { + // The log id of the committed log entry. + LogId log_id = 1; + + // If the committed log entry is a normal one. + Response data = 2; + + // If the committed log entry is a change-membership entry. + Membership membership = 3; } -// GetResponse contains the value associated with the requested key -message Response { - optional string value = 1; // Retrieved value +message MetricsResponse { + // Cluster membership config + Membership membership = 1; + + // In this example, only membership is used. + // Other metrics are just encoded in string for simplicity. + // In real-world scenarios, metrics should be encoded in a more structured format. + string other_metrics = 2; } -// ApiService provides the key-value store API operations +// ApiService provides the key-value store API operations and Raft cluster management operations service AppService { // Get retrieves the value associated with a given key rpc Get(GetRequest) returns (Response) {} // Set stores a key-value pair in the distributed store rpc Set(SetRequest) returns (Response) {} -} \ No newline at end of file + + // Init initializes a new Raft cluster with the given nodes + rpc Init(InitRequest) returns (google.protobuf.Empty) {} + + // AddLearner adds a new learner node to the Raft cluster + rpc AddLearner(AddLearnerRequest) returns (ClientWriteResponse) {} + + // ChangeMembership modifies the cluster membership configuration + rpc ChangeMembership(ChangeMembershipRequest) returns (ClientWriteResponse) {} + + // Metrics retrieves cluster metrics and status information + rpc Metrics(google.protobuf.Empty) returns (MetricsResponse) {} +} + diff --git a/examples/raft-kv-memstore-grpc/proto/app_types.proto b/examples/raft-kv-memstore-grpc/proto/app_types.proto new file mode 100644 index 000000000..a8369de21 --- /dev/null +++ b/examples/raft-kv-memstore-grpc/proto/app_types.proto @@ -0,0 +1,19 @@ +syntax = "proto3"; + +package openraftpb; + +// SetRequest represents a key-value pair to be stored +message SetRequest { + string key = 1; // Key to store + string value = 2; // Value to associate with the key +} + +// GetRequest represents a key lookup request +message GetRequest { + string key = 1; // Key to look up +} + +// GetResponse contains the value associated with the requested key +message Response { + optional string value = 1; // Retrieved value +} diff --git a/examples/raft-kv-memstore-grpc/proto/management.proto b/examples/raft-kv-memstore-grpc/proto/management.proto deleted file mode 100644 index 0fee6b99d..000000000 --- a/examples/raft-kv-memstore-grpc/proto/management.proto +++ /dev/null @@ -1,55 +0,0 @@ -syntax = "proto3"; - -import "google/protobuf/empty.proto"; -import "google/protobuf/wrappers.proto"; -import "raft.proto"; -import "app.proto"; - -package openraftpb; - -// InitRequest contains the initial set of nodes for cluster initialization -message InitRequest { - // List of initial cluster nodes - repeated Node nodes = 1; -} - -// AddLearnerRequest specifies parameters for adding a learner node -message AddLearnerRequest { - // Node to be added as a learner - Node node = 1; -} - -// ChangeMembershipRequest specifies parameters for modifying cluster membership -message ChangeMembershipRequest { - // New set of voter node IDs - repeated uint64 members = 1; - // Whether to retain existing configuration - bool retain = 2; -} - -message ClientWriteResponse { - // The log id of the committed log entry. - LogId log_id = 1; - - // If the committed log entry is a normal one. - Response data = 2; - - // If the committed log entry is a change-membership entry. - Membership membership = 3; -} - -// ManagementService handles Raft cluster management operations -service ManagementService { - // Init initializes a new Raft cluster with the given nodes - rpc Init(InitRequest) returns (google.protobuf.Empty) {} - - // AddLearner adds a new learner node to the Raft cluster - rpc AddLearner(AddLearnerRequest) returns (ClientWriteResponse) {} - - // ChangeMembership modifies the cluster membership configuration - rpc ChangeMembership(ChangeMembershipRequest) returns (ClientWriteResponse) {} - - // Metrics retrieves cluster metrics and status information - rpc Metrics(google.protobuf.Empty) returns (google.protobuf.StringValue) {} -} - diff --git a/examples/raft-kv-memstore-grpc/proto/raft.proto b/examples/raft-kv-memstore-grpc/proto/raft.proto index 3cefe1276..1cd36b27a 100644 --- a/examples/raft-kv-memstore-grpc/proto/raft.proto +++ b/examples/raft-kv-memstore-grpc/proto/raft.proto @@ -1,14 +1,16 @@ syntax = "proto3"; import "google/protobuf/empty.proto"; -import "app.proto"; +import "app_types.proto"; package openraftpb; // Node represents a single node in the Raft cluster message Node { - string rpc_addr = 1; // RPC address for node communication - uint64 node_id = 2; // Unique identifier for the node + // Unique identifier for the node + uint64 node_id = 1; + // RPC address for node communication + string rpc_addr = 2; } // LeaderId represents the leader identifier in Raft diff --git a/examples/raft-kv-memstore-grpc/src/app.rs b/examples/raft-kv-memstore-grpc/src/app.rs new file mode 100644 index 000000000..f4691a3b4 --- /dev/null +++ b/examples/raft-kv-memstore-grpc/src/app.rs @@ -0,0 +1,51 @@ +use std::sync::Arc; + +use openraft::Config; +use tonic::transport::Server; +use tracing::info; + +use crate::grpc::app_service::AppServiceImpl; +use crate::grpc::raft_service::RaftServiceImpl; +use crate::network::Network; +use crate::pb::app_service_server::AppServiceServer; +use crate::pb::raft_service_server::RaftServiceServer; +use crate::store::LogStore; +use crate::store::StateMachineStore; +use crate::typ::*; +use crate::NodeId; + +pub async fn start_raft_app(node_id: NodeId, http_addr: String) -> Result<(), Box> { + // Create a configuration for the raft instance. + let config = Arc::new( + Config { + heartbeat_interval: 500, + election_timeout_min: 1500, + election_timeout_max: 3000, + ..Default::default() + } + .validate()?, + ); + + // Create stores and network + let log_store = LogStore::default(); + let state_machine_store = Arc::new(StateMachineStore::default()); + let network = Network {}; + + // Create Raft instance + let raft = Raft::new(node_id, config.clone(), network, log_store, state_machine_store.clone()).await?; + + // Create the management service with raft instance + let internal_service = RaftServiceImpl::new(raft.clone()); + let api_service = AppServiceImpl::new(raft, state_machine_store); + + // Start server + let server_future = Server::builder() + .add_service(RaftServiceServer::new(internal_service)) + .add_service(AppServiceServer::new(api_service)) + .serve(http_addr.parse()?); + + info!("Node {node_id} starting server at {http_addr}"); + server_future.await?; + + Ok(()) +} diff --git a/examples/raft-kv-memstore-grpc/src/bin/main.rs b/examples/raft-kv-memstore-grpc/src/bin/main.rs index c514ba46a..4a01b6603 100644 --- a/examples/raft-kv-memstore-grpc/src/bin/main.rs +++ b/examples/raft-kv-memstore-grpc/src/bin/main.rs @@ -3,11 +3,9 @@ use std::sync::Arc; use clap::Parser; use openraft::Config; use raft_kv_memstore_grpc::grpc::app_service::AppServiceImpl; -use raft_kv_memstore_grpc::grpc::management_service::ManagementServiceImpl; use raft_kv_memstore_grpc::grpc::raft_service::RaftServiceImpl; use raft_kv_memstore_grpc::network::Network; use raft_kv_memstore_grpc::protobuf::app_service_server::AppServiceServer; -use raft_kv_memstore_grpc::protobuf::management_service_server::ManagementServiceServer; use raft_kv_memstore_grpc::protobuf::raft_service_server::RaftServiceServer; use raft_kv_memstore_grpc::typ::Raft; use raft_kv_memstore_grpc::LogStore; @@ -60,13 +58,11 @@ async fn main() -> Result<(), Box> { let raft = Raft::new(node_id, config.clone(), network, log_store, state_machine_store.clone()).await?; // Create the management service with raft instance - let management_service = ManagementServiceImpl::new(raft.clone()); let internal_service = RaftServiceImpl::new(raft.clone()); let api_service = AppServiceImpl::new(raft, state_machine_store); // Start server let server_future = Server::builder() - .add_service(ManagementServiceServer::new(management_service)) .add_service(RaftServiceServer::new(internal_service)) .add_service(AppServiceServer::new(api_service)) .serve(addr.parse()?); diff --git a/examples/raft-kv-memstore-grpc/src/grpc/app_service.rs b/examples/raft-kv-memstore-grpc/src/grpc/app_service.rs index 6d845d01d..846fcea84 100644 --- a/examples/raft-kv-memstore-grpc/src/grpc/app_service.rs +++ b/examples/raft-kv-memstore-grpc/src/grpc/app_service.rs @@ -1,3 +1,4 @@ +use std::collections::BTreeMap; use std::sync::Arc; use tonic::Request; @@ -5,6 +6,7 @@ use tonic::Response; use tonic::Status; use tracing::debug; +use crate::pb; use crate::protobuf::app_service_server::AppService; use crate::protobuf::GetRequest; use crate::protobuf::Response as PbResponse; @@ -94,4 +96,112 @@ impl AppService for AppServiceImpl { debug!("Successfully retrieved value for key: {}", req.key); Ok(Response::new(PbResponse { value: Some(value) })) } + + /// Initializes a new Raft cluster with the specified nodes + /// + /// # Arguments + /// * `request` - Contains the initial set of nodes for the cluster + /// + /// # Returns + /// * Success response with initialization details + /// * Error if initialization fails + async fn init(&self, request: Request) -> Result, Status> { + debug!("Initializing Raft cluster"); + let req = request.into_inner(); + + // Convert nodes into required format + let nodes_map: BTreeMap = req + .nodes + .into_iter() + .map(|node| { + (node.node_id, pb::Node { + rpc_addr: node.rpc_addr, + node_id: node.node_id, + }) + }) + .collect(); + + // Initialize the cluster + let result = self + .raft_node + .initialize(nodes_map) + .await + .map_err(|e| Status::internal(format!("Failed to initialize cluster: {}", e)))?; + + debug!("Cluster initialization successful"); + Ok(Response::new(result)) + } + + /// Adds a learner node to the Raft cluster + /// + /// # Arguments + /// * `request` - Contains the node information and blocking preference + /// + /// # Returns + /// * Success response with learner addition details + /// * Error if the operation fails + async fn add_learner( + &self, + request: Request, + ) -> Result, Status> { + let req = request.into_inner(); + + let node = req.node.ok_or_else(|| Status::internal("Node information is required"))?; + + debug!("Adding learner node {}", node.node_id); + + let raft_node = Node { + rpc_addr: node.rpc_addr.clone(), + node_id: node.node_id, + }; + + let result = self + .raft_node + .add_learner(node.node_id, raft_node, true) + .await + .map_err(|e| Status::internal(format!("Failed to add learner node: {}", e)))?; + + debug!("Successfully added learner node {}", node.node_id); + Ok(Response::new(result.into())) + } + + /// Changes the membership of the Raft cluster + /// + /// # Arguments + /// * `request` - Contains the new member set and retention policy + /// + /// # Returns + /// * Success response with membership change details + /// * Error if the operation fails + async fn change_membership( + &self, + request: Request, + ) -> Result, Status> { + let req = request.into_inner(); + + debug!( + "Changing membership. Members: {:?}, Retain: {}", + req.members, req.retain + ); + + let result = self + .raft_node + .change_membership(req.members, req.retain) + .await + .map_err(|e| Status::internal(format!("Failed to change membership: {}", e)))?; + + debug!("Successfully changed cluster membership"); + Ok(Response::new(result.into())) + } + + /// Retrieves metrics about the Raft node + async fn metrics(&self, _request: Request<()>) -> Result, Status> { + debug!("Collecting metrics"); + let metrics = self.raft_node.metrics().borrow().clone(); + let resp = pb::MetricsResponse { + membership: Some(metrics.membership_config.membership().clone().into()), + other_metrics: metrics.to_string(), + }; + Ok(Response::new(resp)) + } } diff --git a/examples/raft-kv-memstore-grpc/src/grpc/management_service.rs b/examples/raft-kv-memstore-grpc/src/grpc/management_service.rs deleted file mode 100644 index 25dcdd40e..000000000 --- a/examples/raft-kv-memstore-grpc/src/grpc/management_service.rs +++ /dev/null @@ -1,140 +0,0 @@ -use std::collections::BTreeMap; - -use tonic::Request; -use tonic::Response; -use tonic::Status; -use tracing::debug; - -use crate::pb; -use crate::protobuf::management_service_server::ManagementService; -use crate::typ::*; - -/// Management service implementation for Raft cluster administration. -/// Handles cluster initialization, membership changes, and metrics collection. -/// -/// # Responsibilities -/// - Cluster initialization -/// - Adding learner nodes -/// - Changing cluster membership -/// - Collecting metrics -pub struct ManagementServiceImpl { - raft_node: Raft, -} - -impl ManagementServiceImpl { - /// Creates a new instance of the management service - /// - /// # Arguments - /// * `raft_node` - The Raft node instance this service will manage - pub fn new(raft_node: Raft) -> Self { - ManagementServiceImpl { raft_node } - } -} - -#[tonic::async_trait] -impl ManagementService for ManagementServiceImpl { - /// Initializes a new Raft cluster with the specified nodes - /// - /// # Arguments - /// * `request` - Contains the initial set of nodes for the cluster - /// - /// # Returns - /// * Success response with initialization details - /// * Error if initialization fails - async fn init(&self, request: Request) -> Result, Status> { - debug!("Initializing Raft cluster"); - let req = request.into_inner(); - - // Convert nodes into required format - let nodes_map: BTreeMap = req - .nodes - .into_iter() - .map(|node| { - (node.node_id, pb::Node { - rpc_addr: node.rpc_addr, - node_id: node.node_id, - }) - }) - .collect(); - - // Initialize the cluster - let result = self - .raft_node - .initialize(nodes_map) - .await - .map_err(|e| Status::internal(format!("Failed to initialize cluster: {}", e)))?; - - debug!("Cluster initialization successful"); - Ok(Response::new(result)) - } - - /// Adds a learner node to the Raft cluster - /// - /// # Arguments - /// * `request` - Contains the node information and blocking preference - /// - /// # Returns - /// * Success response with learner addition details - /// * Error if the operation fails - async fn add_learner( - &self, - request: Request, - ) -> Result, Status> { - let req = request.into_inner(); - - let node = req.node.ok_or_else(|| Status::internal("Node information is required"))?; - - debug!("Adding learner node {}", node.node_id); - - let raft_node = Node { - rpc_addr: node.rpc_addr.clone(), - node_id: node.node_id, - }; - - let result = self - .raft_node - .add_learner(node.node_id, raft_node, true) - .await - .map_err(|e| Status::internal(format!("Failed to add learner node: {}", e)))?; - - debug!("Successfully added learner node {}", node.node_id); - Ok(Response::new(result.into())) - } - - /// Changes the membership of the Raft cluster - /// - /// # Arguments - /// * `request` - Contains the new member set and retention policy - /// - /// # Returns - /// * Success response with membership change details - /// * Error if the operation fails - async fn change_membership( - &self, - request: Request, - ) -> Result, Status> { - let req = request.into_inner(); - - debug!( - "Changing membership. Members: {:?}, Retain: {}", - req.members, req.retain - ); - - let result = self - .raft_node - .change_membership(req.members, req.retain) - .await - .map_err(|e| Status::internal(format!("Failed to change membership: {}", e)))?; - - debug!("Successfully changed cluster membership"); - Ok(Response::new(result.into())) - } - - /// Retrieves metrics about the Raft node - async fn metrics(&self, _request: Request<()>) -> Result, Status> { - debug!("Collecting metrics"); - let metrics = self.raft_node.metrics().borrow().clone(); - let resp = metrics.to_string(); - Ok(Response::new(resp)) - } -} diff --git a/examples/raft-kv-memstore-grpc/src/grpc/mod.rs b/examples/raft-kv-memstore-grpc/src/grpc/mod.rs index 28edf36df..52dd0c8ea 100644 --- a/examples/raft-kv-memstore-grpc/src/grpc/mod.rs +++ b/examples/raft-kv-memstore-grpc/src/grpc/mod.rs @@ -1,3 +1,2 @@ pub mod app_service; -pub mod management_service; pub mod raft_service; diff --git a/examples/raft-kv-memstore-grpc/src/lib.rs b/examples/raft-kv-memstore-grpc/src/lib.rs index e160f524b..2ec55d210 100644 --- a/examples/raft-kv-memstore-grpc/src/lib.rs +++ b/examples/raft-kv-memstore-grpc/src/lib.rs @@ -1,5 +1,6 @@ #![allow(clippy::uninlined_format_args)] +pub mod app; pub mod grpc; pub mod network; pub mod store; diff --git a/examples/raft-kv-memstore-grpc/tests/test_cluster.rs b/examples/raft-kv-memstore-grpc/tests/test_cluster.rs new file mode 100644 index 000000000..5da539143 --- /dev/null +++ b/examples/raft-kv-memstore-grpc/tests/test_cluster.rs @@ -0,0 +1,230 @@ +use std::backtrace::Backtrace; +use std::panic::PanicHookInfo; +use std::thread; +use std::time::Duration; + +use maplit::btreemap; +use raft_kv_memstore_grpc::app::start_raft_app; +use raft_kv_memstore_grpc::protobuf as pb; +use raft_kv_memstore_grpc::protobuf::app_service_client::AppServiceClient; +use tokio::runtime::Runtime; +use tonic::transport::Channel; +use tracing_subscriber::EnvFilter; + +pub fn log_panic(panic: &PanicHookInfo) { + let backtrace = { format!("{:?}", Backtrace::force_capture()) }; + + eprintln!("{}", panic); + + if let Some(location) = panic.location() { + tracing::error!( + message = %panic, + backtrace = %backtrace, + panic.file = location.file(), + panic.line = location.line(), + panic.column = location.column(), + ); + eprintln!("{}:{}:{}", location.file(), location.line(), location.column()); + } else { + tracing::error!(message = %panic, backtrace = %backtrace); + } + + eprintln!("{}", backtrace); +} + +/// Set up a cluster of 3 nodes. +/// Write to it and read from it. +#[tokio::test(flavor = "multi_thread", worker_threads = 8)] +async fn test_cluster() -> anyhow::Result<()> { + std::panic::set_hook(Box::new(|panic| { + log_panic(panic); + })); + + tracing_subscriber::fmt() + .with_target(true) + .with_thread_ids(true) + .with_level(true) + .with_ansi(false) + .with_env_filter(EnvFilter::from_default_env()) + .init(); + + // --- Start 3 raft node in 3 threads. + + let _h1 = thread::spawn(|| { + let rt = Runtime::new().unwrap(); + let x = rt.block_on(start_raft_app(1, get_addr(1))); + println!("raft app exit result: {:?}", x); + }); + + let _h2 = thread::spawn(|| { + let rt = Runtime::new().unwrap(); + let x = rt.block_on(start_raft_app(2, get_addr(2))); + println!("raft app exit result: {:?}", x); + }); + + let _h3 = thread::spawn(|| { + let rt = Runtime::new().unwrap(); + let x = rt.block_on(start_raft_app(3, get_addr(3))); + println!("raft app exit result: {:?}", x); + }); + + // Wait for server to start up. + tokio::time::sleep(Duration::from_millis(200)).await; + + let mut client1 = new_client(get_addr(1)).await?; + + // --- Initialize the target node as a cluster of only one node. + // After init(), the single node cluster will be fully functional. + println!("=== init single node cluster"); + { + client1 + .init(pb::InitRequest { + nodes: vec![new_node(1)], + }) + .await?; + + let metrics = client1.metrics(()).await?.into_inner(); + println!("=== metrics after init: {:?}", metrics); + } + + println!( + "=== Add node 2, 3 to the cluster as learners, to let them start to receive log replication from the leader" + ); + { + println!("=== add-learner 2"); + client1 + .add_learner(pb::AddLearnerRequest { + node: Some(new_node(2)), + }) + .await?; + + println!("=== add-learner 3"); + client1 + .add_learner(pb::AddLearnerRequest { + node: Some(new_node(3)), + }) + .await?; + + let metrics = client1.metrics(()).await?.into_inner(); + println!("=== metrics after add-learner: {:?}", metrics); + assert_eq!( + vec![pb::NodeIdSet { + node_ids: btreemap! { 1 => ()} + }], + metrics.membership.clone().unwrap().configs + ); + assert_eq!( + btreemap! { + 1=>new_node(1), + 2=>new_node(2), + 3=>new_node(3), + }, + metrics.membership.unwrap().nodes + ); + } + + // --- Turn the two learners to members. + // A member node can vote or elect itself as leader. + + println!("=== change-membership to 1,2,3"); + { + client1 + .change_membership(pb::ChangeMembershipRequest { + members: vec![1, 2, 3], + retain: false, + }) + .await?; + + let metrics = client1.metrics(()).await?.into_inner(); + println!("=== metrics after change-member: {:?}", metrics); + assert_eq!( + vec![pb::NodeIdSet { + node_ids: btreemap! { 1=>(),2=>(),3=>()} + }], + metrics.membership.unwrap().configs + ); + } + + println!("=== write `foo=bar`"); + { + client1 + .set(pb::SetRequest { + key: "foo".to_string(), + value: "bar".to_string(), + }) + .await?; + + // --- Wait for a while to let the replication get done. + tokio::time::sleep(Duration::from_millis(1_000)).await; + } + + println!("=== read `foo` on every node"); + { + println!("=== read `foo` on node 1"); + { + let got = client1.get(pb::GetRequest { key: "foo".to_string() }).await?; + assert_eq!(Some("bar".to_string()), got.into_inner().value); + } + + println!("=== read `foo` on node 2"); + { + let mut client2 = new_client(get_addr(2)).await?; + let got = client2.get(pb::GetRequest { key: "foo".to_string() }).await?; + assert_eq!(Some("bar".to_string()), got.into_inner().value); + } + + println!("=== read `foo` on node 3"); + { + let mut client3 = new_client(get_addr(3)).await?; + let got = client3.get(pb::GetRequest { key: "foo".to_string() }).await?; + assert_eq!(Some("bar".to_string()), got.into_inner().value); + } + } + + println!("=== Remove node 1,2 by change-membership to {{3}}"); + { + client1 + .change_membership(pb::ChangeMembershipRequest { + members: vec![3], + retain: false, + }) + .await?; + + tokio::time::sleep(Duration::from_millis(2_000)).await; + + let metrics = client1.metrics(()).await?.into_inner(); + println!("=== metrics after change-membership to {{3}}: {:?}", metrics); + assert_eq!( + vec![pb::NodeIdSet { + node_ids: btreemap! { 3=>() } + }], + metrics.membership.unwrap().configs + ); + } + + Ok(()) +} + +async fn new_client(addr: String) -> Result, tonic::transport::Error> { + let channel = Channel::builder(format!("https://{}", addr).parse().unwrap()).connect().await?; + let client = AppServiceClient::new(channel); + Ok(client) +} + +fn new_node(node_id: u64) -> pb::Node { + pb::Node { + node_id, + rpc_addr: get_addr(node_id), + } +} + +fn get_addr(node_id: u64) -> String { + match node_id { + 1 => "127.0.0.1:21001".to_string(), + 2 => "127.0.0.1:21002".to_string(), + 3 => "127.0.0.1:21003".to_string(), + _ => { + unreachable!("node_id must be 1, 2, or 3"); + } + } +}