Skip to content

Commit

Permalink
Move NodeSvcHandle into core networking
Browse files Browse the repository at this point in the history
  • Loading branch information
AhmedSoliman committed Mar 3, 2025
1 parent ed92d66 commit babed1a
Show file tree
Hide file tree
Showing 7 changed files with 161 additions and 98 deletions.
57 changes: 57 additions & 0 deletions crates/core/src/network/grpc/connector.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
// Copyright (c) 2023 - 2025 Restate Software, Inc., Restate GmbH.
// All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use futures::Stream;
use tokio_stream::StreamExt;

use restate_types::GenerationalNodeId;
use restate_types::config::NetworkingOptions;
use restate_types::nodes_config::NodesConfiguration;
use restate_types::protobuf::node::Message;
use tracing::trace;

use super::MAX_MESSAGE_SIZE;
use crate::network::net_util::create_tonic_channel;
use crate::network::protobuf::core_node_svc::core_node_svc_client::CoreNodeSvcClient;
use crate::network::{NetworkError, ProtocolError, TransportConnect};

pub struct GrpcConnector {
networking_options: NetworkingOptions,
}

impl GrpcConnector {
pub fn new(networking_options: NetworkingOptions) -> Self {
Self { networking_options }
}
}

impl TransportConnect for GrpcConnector {
async fn connect(
&self,
node_id: GenerationalNodeId,
nodes_config: &NodesConfiguration,
output_stream: impl Stream<Item = Message> + Send + Unpin + 'static,
) -> Result<
impl Stream<Item = Result<Message, ProtocolError>> + Send + Unpin + 'static,
NetworkError,
> {
let address = nodes_config.find_node_by_id(node_id)?.address.clone();

trace!("Attempting to connect to node {} at {}", node_id, address);
let channel = create_tonic_channel(address, &self.networking_options);

// Establish the connection
let mut client = CoreNodeSvcClient::new(channel)
.max_decoding_message_size(MAX_MESSAGE_SIZE)
.max_decoding_message_size(MAX_MESSAGE_SIZE);
let incoming = client.create_connection(output_stream).await?.into_inner();
Ok(incoming.map(|x| x.map_err(ProtocolError::from)))
}
}
19 changes: 19 additions & 0 deletions crates/core/src/network/grpc/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
// Copyright (c) 2023 - 2025 Restate Software, Inc., Restate GmbH.
// All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

mod connector;
mod svc_handler;

pub use connector::GrpcConnector;
pub use svc_handler::CoreNodeSvcHandler;

/// The maximum size for a grpc message for core networking service.
/// This impacts the buffer limit for prost codec.
const MAX_MESSAGE_SIZE: usize = 32 * 1024 * 1024;
73 changes: 73 additions & 0 deletions crates/core/src/network/grpc/svc_handler.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
// Copyright (c) 2023 - 2025 Restate Software, Inc., Restate GmbH.
// All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use futures::stream::BoxStream;
use tokio_stream::StreamExt;
use tonic::codec::CompressionEncoding;
use tonic::{Request, Response, Status, Streaming};

use crate::network::protobuf::core_node_svc::core_node_svc_server::{
CoreNodeSvc, CoreNodeSvcServer,
};
use crate::network::{ConnectionManager, ProtocolError, TransportConnect};
use restate_types::protobuf::node::Message;

use super::MAX_MESSAGE_SIZE;

pub struct CoreNodeSvcHandler<T> {
connections: ConnectionManager<T>,
}

impl<T> CoreNodeSvcHandler<T> {
pub fn new(connections: ConnectionManager<T>) -> Self {
Self { connections }
}

pub fn into_server(self) -> CoreNodeSvcServer<Self> {
CoreNodeSvcServer::new(self)
.max_decoding_message_size(MAX_MESSAGE_SIZE)
.max_encoding_message_size(MAX_MESSAGE_SIZE)
.accept_compressed(CompressionEncoding::Gzip)
.send_compressed(CompressionEncoding::Gzip)
}
}

#[async_trait::async_trait]
impl<T> CoreNodeSvc for CoreNodeSvcHandler<T>
where
T: TransportConnect,
{
type CreateConnectionStream = BoxStream<'static, Result<Message, Status>>;

// Status codes returned in different scenarios:
// - DeadlineExceeded: No hello received within deadline
// - InvalidArgument: Header should always be set or any other missing required part of the
// handshake. This also happens if the client sent wrong message on handshake.
// - AlreadyExists: A node with a newer generation has been observed already
// - Cancelled: received an error from the client, or the client has dropped the stream during
// handshake.
// - Unavailable: This node is shutting down
async fn create_connection(
&self,
request: Request<Streaming<Message>>,
) -> Result<Response<Self::CreateConnectionStream>, Status> {
let incoming = request.into_inner();
let transformed = incoming.map(|x| x.map_err(ProtocolError::from));
let output_stream = self
.connections
.accept_incoming_connection(transformed)
.await?;

// For uniformity with outbound connections, we map all responses to Ok, we never rely on
// sending tonic::Status errors explicitly. We use ConnectionControl frames to communicate
// errors and/or drop the stream when necessary.
Ok(Response::new(Box::pin(output_stream.map(Ok))))
}
}
4 changes: 3 additions & 1 deletion crates/core/src/network/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
mod connection;
mod connection_manager;
mod error;
pub mod grpc;
mod handshake;
mod message_router;
pub(crate) mod metric_definitions;
Expand All @@ -29,11 +30,12 @@ mod types;
pub use connection::{OwnedConnection, WeakConnection};
pub use connection_manager::ConnectionManager;
pub use error::*;
pub use grpc::GrpcConnector;
pub use message_router::*;
pub use network_sender::*;
pub use networking::Networking;
pub use server_builder::NetworkServerBuilder;
pub use transport_connector::{GrpcConnector, TransportConnect};
pub use transport_connector::TransportConnect;
pub use types::*;

#[cfg(any(test, feature = "test-util"))]
Expand Down
40 changes: 1 addition & 39 deletions crates/core/src/network/transport_connector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,17 +10,13 @@

use std::future::Future;

use futures::{Stream, StreamExt};
use tracing::trace;
use futures::Stream;

use restate_types::GenerationalNodeId;
use restate_types::config::NetworkingOptions;
use restate_types::nodes_config::NodesConfiguration;
use restate_types::protobuf::node::Message;

use super::protobuf::core_node_svc::core_node_svc_client::CoreNodeSvcClient;
use super::{NetworkError, ProtocolError};
use crate::network::net_util::create_tonic_channel;

pub trait TransportConnect: Send + Sync + 'static {
fn connect(
Expand All @@ -36,40 +32,6 @@ pub trait TransportConnect: Send + Sync + 'static {
> + Send;
}

pub struct GrpcConnector {
networking_options: NetworkingOptions,
}

impl GrpcConnector {
pub fn new(networking_options: NetworkingOptions) -> Self {
Self { networking_options }
}
}

impl TransportConnect for GrpcConnector {
async fn connect(
&self,
node_id: GenerationalNodeId,
nodes_config: &NodesConfiguration,
output_stream: impl Stream<Item = Message> + Send + Unpin + 'static,
) -> Result<
impl Stream<Item = Result<Message, ProtocolError>> + Send + Unpin + 'static,
NetworkError,
> {
let address = nodes_config.find_node_by_id(node_id)?.address.clone();

trace!("Attempting to connect to node {} at {}", node_id, address);
let channel = create_tonic_channel(address, &self.networking_options);

// Establish the connection
let mut client = CoreNodeSvcClient::new(channel)
.max_decoding_message_size(32 * 1024 * 1024)
.max_encoding_message_size(32 * 1024 * 1024);
let incoming = client.create_connection(output_stream).await?.into_inner();
Ok(incoming.map(|x| x.map_err(ProtocolError::from)))
}
}

#[cfg(any(test, feature = "test-util"))]
pub mod test_util {
use super::*;
Expand Down
56 changes: 5 additions & 51 deletions crates/node/src/network_server/grpc_svc_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,15 +11,14 @@
use std::cmp::max_by_key;
use std::num::NonZeroU16;

use crate::{ClusterConfiguration, provision_cluster_metadata};
use anyhow::Context;
use bytes::BytesMut;
use enumset::EnumSet;
use futures::stream::BoxStream;
use tonic::{Request, Response, Status};
use tracing::debug;

use restate_core::metadata_store::MetadataStoreClient;
use restate_core::network::net_util::create_tonic_channel;
use restate_core::network::protobuf::core_node_svc::core_node_svc_server::CoreNodeSvc;
use restate_core::network::{ConnectionManager, ProtocolError, TransportConnect};
use restate_core::protobuf::node_ctl_svc::node_ctl_svc_server::NodeCtlSvc;
use restate_core::protobuf::node_ctl_svc::{
ClusterHealthResponse, EmbeddedMetadataClusterHealth, GetMetadataRequest, GetMetadataResponse,
Expand All @@ -33,12 +32,10 @@ use restate_types::config::Configuration;
use restate_types::logs::metadata::{NodeSetSize, ProviderConfiguration};
use restate_types::nodes_config::Role;
use restate_types::protobuf::cluster::ClusterConfiguration as ProtoClusterConfiguration;
use restate_types::protobuf::node::Message;
use restate_types::replication::ReplicationProperty;
use restate_types::storage::StorageCodec;
use tokio_stream::StreamExt;
use tonic::{Request, Response, Status, Streaming};
use tracing::debug;

use crate::{ClusterConfiguration, provision_cluster_metadata};

pub struct NodeCtlSvcHandler {
task_center: task_center::Handle,
Expand Down Expand Up @@ -283,46 +280,3 @@ impl NodeCtlSvc for NodeCtlSvcHandler {
Ok(Response::new(cluster_state_response))
}
}

pub struct CoreNodeSvcHandler<T> {
connections: ConnectionManager<T>,
}

impl<T> CoreNodeSvcHandler<T> {
pub fn new(connections: ConnectionManager<T>) -> Self {
Self { connections }
}
}

#[async_trait::async_trait]
impl<T> CoreNodeSvc for CoreNodeSvcHandler<T>
where
T: TransportConnect,
{
type CreateConnectionStream = BoxStream<'static, Result<Message, Status>>;

// Status codes returned in different scenarios:
// - DeadlineExceeded: No hello received within deadline
// - InvalidArgument: Header should always be set or any other missing required part of the
// handshake. This also happens if the client sent wrong message on handshake.
// - AlreadyExists: A node with a newer generation has been observed already
// - Cancelled: received an error from the client, or the client has dropped the stream during
// handshake.
// - Unavailable: This node is shutting down
async fn create_connection(
&self,
request: Request<Streaming<Message>>,
) -> Result<Response<Self::CreateConnectionStream>, Status> {
let incoming = request.into_inner();
let transformed = incoming.map(|x| x.map_err(ProtocolError::from));
let output_stream = self
.connections
.accept_incoming_connection(transformed)
.await?;

// For uniformity with outbound connections, we map all responses to Ok, we never rely on
// sending tonic::Status errors explicitly. We use ConnectionControl frames to communicate
// errors and/or drop the stream when necessary.
Ok(Response::new(Box::pin(output_stream.map(Ok))))
}
}
10 changes: 3 additions & 7 deletions crates/node/src/network_server/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,15 @@ use tonic::codec::CompressionEncoding;
use tracing::{debug, trace};

use restate_core::metadata_store::MetadataStoreClient;
use restate_core::network::protobuf::core_node_svc::core_node_svc_server::CoreNodeSvcServer;
use restate_core::network::grpc::CoreNodeSvcHandler;
use restate_core::network::tonic_service_filter::{TonicServiceFilter, WaitForReady};
use restate_core::network::{ConnectionManager, NetworkServerBuilder, TransportConnect};
use restate_core::protobuf::node_ctl_svc::node_ctl_svc_server::NodeCtlSvcServer;
use restate_core::{TaskCenter, TaskKind, cancellation_watcher};
use restate_types::config::CommonOptions;
use restate_types::protobuf::common::NodeStatus;

use super::grpc_svc_handler::{CoreNodeSvcHandler, NodeCtlSvcHandler};
use super::grpc_svc_handler::NodeCtlSvcHandler;
use super::pprof;
use crate::network_server::metrics::{install_global_prometheus_recorder, render_metrics};
use crate::network_server::state::NodeCtrlHandlerStateBuilder;
Expand Down Expand Up @@ -112,11 +112,7 @@ impl NetworkServer {

server_builder.register_grpc_service(
TonicServiceFilter::new(
CoreNodeSvcServer::new(CoreNodeSvcHandler::new(connection_manager))
.max_decoding_message_size(32 * 1024 * 1024)
.max_encoding_message_size(32 * 1024 * 1024)
.accept_compressed(CompressionEncoding::Gzip)
.send_compressed(CompressionEncoding::Gzip),
CoreNodeSvcHandler::new(connection_manager).into_server(),
WaitForReady::new(node_health, NodeStatus::Alive),
),
restate_core::network::protobuf::core_node_svc::FILE_DESCRIPTOR_SET,
Expand Down

0 comments on commit babed1a

Please sign in to comment.