From f5f7c24e3fa2b2a779b4c858b65ec1313be67353 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rok=20=C4=8Cerni=C4=8D?= Date: Tue, 18 Jun 2024 19:10:08 +0200 Subject: [PATCH 01/22] feat: provider rpc client wip --- Cargo.lock | 2 + Cargo.toml | 1 + cli/polka-storage-provider/Cargo.toml | 3 +- cli/polka-storage-provider/src/cli.rs | 7 +- .../src/commands/info.rs | 40 ++++- .../src/commands/run.rs | 3 +- .../src/commands/runner.rs | 15 +- cli/polka-storage-provider/src/rpc.rs | 12 ++ cli/polka-storage-provider/src/rpc/client.rs | 140 ++++++++++++++++++ primitives/cli/Cargo.toml | 1 + primitives/cli/src/error.rs | 3 + 11 files changed, 218 insertions(+), 9 deletions(-) create mode 100644 cli/polka-storage-provider/src/rpc/client.rs diff --git a/Cargo.lock b/Cargo.lock index 6efd49b3a..15a5e8332 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1547,6 +1547,7 @@ checksum = "98cc8fbded0c607b7ba9dd60cd98df59af97e84d24e49c8557331cfc26d301ce" name = "cli-primitives" version = "0.1.0" dependencies = [ + "jsonrpsee", "sc-cli", "subxt", "thiserror", @@ -8150,6 +8151,7 @@ dependencies = [ name = "polka-storage-provider" version = "0.1.0" dependencies = [ + "async-trait", "chrono", "clap", "cli-primitives", diff --git a/Cargo.toml b/Cargo.toml index 7b8bffbf4..5881e8f85 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -29,6 +29,7 @@ substrate-build-script-utils = { git = "https://github.com/paritytech/polkadot-s substrate-wasm-builder = { git = "https://github.com/paritytech/polkadot-sdk", tag = "polkadot-v1.11.0" } async-stream = "0.3.5" +async-trait = "0.1.80" bitflags = "2.5.0" byteorder = "1.5.0" bytes = "1.6.0" diff --git a/cli/polka-storage-provider/Cargo.toml b/cli/polka-storage-provider/Cargo.toml index ca5ccf413..63849307e 100644 --- a/cli/polka-storage-provider/Cargo.toml +++ b/cli/polka-storage-provider/Cargo.toml @@ -8,10 +8,11 @@ repository.workspace = true version = "0.1.0" [dependencies] +async-trait = { workspace = true } chrono = { workspace = true, features = ["serde"] } clap = { workspace = true, features = ["derive"] } cli-primitives = { workspace = true } -jsonrpsee = { workspace = true, features = ["server"] } +jsonrpsee = { workspace = true, features = ["http-client", "server", "ws-client"] } sc-cli = { workspace = true } serde = { workspace = true } serde_json = { workspace = true } diff --git a/cli/polka-storage-provider/src/cli.rs b/cli/polka-storage-provider/src/cli.rs index f2d6dd5fd..5882ef553 100644 --- a/cli/polka-storage-provider/src/cli.rs +++ b/cli/polka-storage-provider/src/cli.rs @@ -1,6 +1,9 @@ use clap::Parser; -use crate::commands::{InfoCommand, InitCommand, RunCommand, WalletCommand}; +use crate::{ + commands::{InfoCommand, InitCommand, RunCommand, WalletCommand}, + rpc::SERVER_DEFAULT_BIND_ADDR, +}; /// A CLI application that facilitates management operations over a running full /// node and other components. @@ -9,6 +12,8 @@ use crate::commands::{InfoCommand, InitCommand, RunCommand, WalletCommand}; pub(crate) struct Cli { #[command(subcommand)] pub subcommand: SubCommand, + #[arg(short, long, default_value = SERVER_DEFAULT_BIND_ADDR)] + pub rpc_server_address: String, } /// Supported sub-commands. diff --git a/cli/polka-storage-provider/src/commands/info.rs b/cli/polka-storage-provider/src/commands/info.rs index aa6f9f2bd..67c88e85b 100644 --- a/cli/polka-storage-provider/src/commands/info.rs +++ b/cli/polka-storage-provider/src/commands/info.rs @@ -1,14 +1,48 @@ +use std::fmt::{self, Display, Formatter}; + +use chrono::{DateTime, Local, Utc}; use clap::Parser; use cli_primitives::Error; +use crate::rpc::{methods::common::Info, RpcClient, RpcMethod}; + /// Command to display information about the storage provider. #[derive(Debug, Clone, Parser)] pub(crate) struct InfoCommand; impl InfoCommand { - pub async fn run(&self) -> Result<(), Error> { - // TODO(#66,@cernicc,31/05/2024): Print start time of the provider + pub async fn run(&self, client: &RpcClient) -> Result<(), Error> { // TODO(#67,@cernicc,07/06/2024): Print polkadot address used by the provider - unimplemented!() + + // Get server info + let server_info = Info::call(client, None).await?; + + let node_status_info = NodeStatusInfo { + start_time: server_info.start_time, + }; + + println!("{}", node_status_info); + + Ok(()) + } +} + +struct NodeStatusInfo { + start_time: DateTime, +} + +impl Display for NodeStatusInfo { + fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { + let uptime = { + let now = Utc::now(); + let uptime = now - self.start_time; + + format!( + "Uptime: {uptime} (Started at: {})", + self.start_time.with_timezone(&Local) + ) + }; + + writeln!(f, "{uptime}") } } diff --git a/cli/polka-storage-provider/src/commands/run.rs b/cli/polka-storage-provider/src/commands/run.rs index d7c92406f..1df8c158b 100644 --- a/cli/polka-storage-provider/src/commands/run.rs +++ b/cli/polka-storage-provider/src/commands/run.rs @@ -7,11 +7,10 @@ use tracing::info; use url::Url; use crate::{ - rpc::{start_rpc, RpcServerState}, + rpc::{start_rpc, RpcServerState, SERVER_DEFAULT_BIND_ADDR}, substrate, }; -const SERVER_DEFAULT_BIND_ADDR: &str = "127.0.0.1:8000"; const FULL_NODE_DEFAULT_RPC_ADDR: &str = "ws://127.0.0.1:9944"; /// Command to start the storage provider. diff --git a/cli/polka-storage-provider/src/commands/runner.rs b/cli/polka-storage-provider/src/commands/runner.rs index a2fa88013..ab71e1d3c 100644 --- a/cli/polka-storage-provider/src/commands/runner.rs +++ b/cli/polka-storage-provider/src/commands/runner.rs @@ -1,8 +1,15 @@ +use std::str::FromStr; + use clap::Parser; use cli_primitives::Error; +use url::Url; use super::WalletCommand; -use crate::{cli::SubCommand, Cli}; +use crate::{ + cli::SubCommand, + rpc::{RpcClient, SERVER_DEFAULT_BIND_ADDR}, + Cli, +}; /// Parses command line arguments into the service configuration and runs the specified /// command with it. @@ -10,10 +17,14 @@ pub(crate) async fn run() -> Result<(), Error> { // CLI arguments parsed and mapped to the struct. let cli_arguments: Cli = Cli::parse(); + // Rpc client used to interact with the node + let rpc_url = Url::from_str(&format!("http://{SERVER_DEFAULT_BIND_ADDR}")).unwrap(); + let rpc_client = RpcClient::new(rpc_url); + match &cli_arguments.subcommand { SubCommand::Init(cmd) => cmd.run().await, SubCommand::Run(cmd) => cmd.run().await, - SubCommand::Info(cmd) => cmd.run().await, + SubCommand::Info(cmd) => cmd.run(&rpc_client).await, SubCommand::Wallet(cmd) => match cmd { WalletCommand::GenerateNodeKey(cmd) => Ok(cmd.run()?), WalletCommand::Generate(cmd) => Ok(cmd.run()?), diff --git a/cli/polka-storage-provider/src/rpc.rs b/cli/polka-storage-provider/src/rpc.rs index 356a8b39d..c6a6fcdb1 100644 --- a/cli/polka-storage-provider/src/rpc.rs +++ b/cli/polka-storage-provider/src/rpc.rs @@ -4,6 +4,7 @@ use chrono::Utc; use cli_primitives::Error; use error::ServerError; use jsonrpsee::{ + core::ClientError, server::{Server, ServerHandle}, types::Params, RpcModule, @@ -13,9 +14,15 @@ use serde::{Deserialize, Serialize}; use crate::substrate; +mod client; pub mod error; pub mod methods; +pub use client::RpcClient; + +/// Default address to bind the RPC server to. +pub const SERVER_DEFAULT_BIND_ADDR: &str = "127.0.0.1:8000"; + /// A definition of an RPC method handler which can be registered with an [`RpcModule`]. pub trait RpcMethod { /// Method name. @@ -43,6 +50,11 @@ pub trait RpcMethod { }) .expect("method should be valid") // This is safe because we know the method registered is valid. } + + /// Call the rpc method with the provided client and params. + async fn call(client: &RpcClient, params: Option>) -> Result { + todo!() + } } /// Available API versions. diff --git a/cli/polka-storage-provider/src/rpc/client.rs b/cli/polka-storage-provider/src/rpc/client.rs new file mode 100644 index 000000000..d9aca12c7 --- /dev/null +++ b/cli/polka-storage-provider/src/rpc/client.rs @@ -0,0 +1,140 @@ +use std::{fmt, fmt::Debug}; + +use jsonrpsee::{ + core::{ + client::{BatchResponse, ClientT, Subscription, SubscriptionClientT}, + params::BatchRequestBuilder, + traits::ToRpcParams, + ClientError, + }, + http_client::HttpClientBuilder, + ws_client::WsClientBuilder, +}; +use serde::de::DeserializeOwned; +use tokio::sync::OnceCell; +use url::Url; + +pub struct RpcClient { + base_url: Url, + v0: OnceCell, +} + +impl RpcClient { + pub fn new(base_url: Url) -> Self { + Self { + base_url, + v0: OnceCell::new(), + } + } +} + +/// Represents a single connection to the URL server +struct InnerClient { + url: Url, + specific: ClientSpecific, +} + +impl Debug for InnerClient { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("InnerClient") + .field("url", &self.url) + .finish_non_exhaustive() + } +} + +impl InnerClient { + async fn new(url: Url) -> Result { + let specific = match url.scheme() { + "ws" | "wss" => ClientSpecific::Ws(WsClientBuilder::new().build(&url).await?), + "http" | "https" => ClientSpecific::Https(HttpClientBuilder::new().build(&url)?), + it => { + return Err(ClientError::Custom(format!( + "Unsupported URL scheme: {}", + it + ))) + } + }; + + Ok(Self { url, specific }) + } +} + +enum ClientSpecific { + Ws(jsonrpsee::ws_client::WsClient), + Https(jsonrpsee::http_client::HttpClient), +} + +#[async_trait::async_trait] +impl ClientT for InnerClient { + async fn notification(&self, method: &str, params: Params) -> Result<(), ClientError> + where + Params: ToRpcParams + Send, + { + match &self.specific { + ClientSpecific::Ws(client) => client.notification(method, params).await, + ClientSpecific::Https(client) => client.notification(method, params).await, + } + } + + async fn request(&self, method: &str, params: Params) -> Result + where + R: DeserializeOwned, + Params: ToRpcParams + Send, + { + match &self.specific { + ClientSpecific::Ws(client) => client.request(method, params).await, + ClientSpecific::Https(client) => client.request(method, params).await, + } + } + + async fn batch_request<'a, R>( + &self, + batch: BatchRequestBuilder<'a>, + ) -> Result, ClientError> + where + R: DeserializeOwned + fmt::Debug + 'a, + { + match &self.specific { + ClientSpecific::Ws(client) => client.batch_request(batch).await, + ClientSpecific::Https(client) => client.batch_request(batch).await, + } + } +} + +#[async_trait::async_trait] +impl SubscriptionClientT for InnerClient { + async fn subscribe<'a, Notif, Params>( + &self, + subscribe_method: &'a str, + params: Params, + unsubscribe_method: &'a str, + ) -> Result, ClientError> + where + Params: ToRpcParams + Send, + Notif: DeserializeOwned, + { + match &self.specific { + ClientSpecific::Ws(it) => { + it.subscribe(subscribe_method, params, unsubscribe_method) + .await + } + ClientSpecific::Https(it) => { + it.subscribe(subscribe_method, params, unsubscribe_method) + .await + } + } + } + + async fn subscribe_to_method<'a, Notif>( + &self, + method: &'a str, + ) -> Result, ClientError> + where + Notif: DeserializeOwned, + { + match &self.specific { + ClientSpecific::Ws(it) => it.subscribe_to_method(method).await, + ClientSpecific::Https(it) => it.subscribe_to_method(method).await, + } + } +} diff --git a/primitives/cli/Cargo.toml b/primitives/cli/Cargo.toml index 0bedf6d52..17898464b 100644 --- a/primitives/cli/Cargo.toml +++ b/primitives/cli/Cargo.toml @@ -8,6 +8,7 @@ repository.workspace = true version = "0.1.0" [dependencies] +jsonrpsee = { workspace = true } sc-cli = { workspace = true } subxt = { workspace = true } thiserror = { workspace = true } diff --git a/primitives/cli/src/error.rs b/primitives/cli/src/error.rs index 738d1f4e6..31f92859a 100644 --- a/primitives/cli/src/error.rs +++ b/primitives/cli/src/error.rs @@ -14,4 +14,7 @@ pub enum Error { #[error(transparent)] SubstrateCli(#[from] sc_cli::Error), + + #[error("JSON-RPC client error: {0}")] + JsonRpcClient(#[from] jsonrpsee::core::ClientError), } From 2cca5316340c6f5148444a27485ceb5c683c7aa8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rok=20=C4=8Cerni=C4=8D?= Date: Tue, 18 Jun 2024 19:20:24 +0200 Subject: [PATCH 02/22] fix: rename constant --- cli/polka-storage-provider/src/cli.rs | 4 ++-- cli/polka-storage-provider/src/commands/run.rs | 4 ++-- cli/polka-storage-provider/src/commands/runner.rs | 4 ++-- cli/polka-storage-provider/src/rpc.rs | 2 +- 4 files changed, 7 insertions(+), 7 deletions(-) diff --git a/cli/polka-storage-provider/src/cli.rs b/cli/polka-storage-provider/src/cli.rs index 5882ef553..cf350439b 100644 --- a/cli/polka-storage-provider/src/cli.rs +++ b/cli/polka-storage-provider/src/cli.rs @@ -2,7 +2,7 @@ use clap::Parser; use crate::{ commands::{InfoCommand, InitCommand, RunCommand, WalletCommand}, - rpc::SERVER_DEFAULT_BIND_ADDR, + rpc::RPC_SERVER_DEFAULT_BIND_ADDR, }; /// A CLI application that facilitates management operations over a running full @@ -12,7 +12,7 @@ use crate::{ pub(crate) struct Cli { #[command(subcommand)] pub subcommand: SubCommand, - #[arg(short, long, default_value = SERVER_DEFAULT_BIND_ADDR)] + #[arg(short, long, default_value = RPC_SERVER_DEFAULT_BIND_ADDR)] pub rpc_server_address: String, } diff --git a/cli/polka-storage-provider/src/commands/run.rs b/cli/polka-storage-provider/src/commands/run.rs index 1df8c158b..b109f6bef 100644 --- a/cli/polka-storage-provider/src/commands/run.rs +++ b/cli/polka-storage-provider/src/commands/run.rs @@ -7,7 +7,7 @@ use tracing::info; use url::Url; use crate::{ - rpc::{start_rpc, RpcServerState, SERVER_DEFAULT_BIND_ADDR}, + rpc::{start_rpc, RpcServerState, RPC_SERVER_DEFAULT_BIND_ADDR}, substrate, }; @@ -20,7 +20,7 @@ pub(crate) struct RunCommand { #[arg(short = 'n', long, default_value = FULL_NODE_DEFAULT_RPC_ADDR)] pub node_rpc_address: Url, /// Address used for RPC. By default binds on localhost on port 8000. - #[arg(short = 'a', long, default_value = SERVER_DEFAULT_BIND_ADDR)] + #[arg(short = 'a', long, default_value = RPC_SERVER_DEFAULT_BIND_ADDR)] pub listen_addr: SocketAddr, } diff --git a/cli/polka-storage-provider/src/commands/runner.rs b/cli/polka-storage-provider/src/commands/runner.rs index ab71e1d3c..b54d7dd76 100644 --- a/cli/polka-storage-provider/src/commands/runner.rs +++ b/cli/polka-storage-provider/src/commands/runner.rs @@ -7,7 +7,7 @@ use url::Url; use super::WalletCommand; use crate::{ cli::SubCommand, - rpc::{RpcClient, SERVER_DEFAULT_BIND_ADDR}, + rpc::{RpcClient, RPC_SERVER_DEFAULT_BIND_ADDR}, Cli, }; @@ -18,7 +18,7 @@ pub(crate) async fn run() -> Result<(), Error> { let cli_arguments: Cli = Cli::parse(); // Rpc client used to interact with the node - let rpc_url = Url::from_str(&format!("http://{SERVER_DEFAULT_BIND_ADDR}")).unwrap(); + let rpc_url = Url::from_str(&format!("http://{RPC_SERVER_DEFAULT_BIND_ADDR}")).unwrap(); let rpc_client = RpcClient::new(rpc_url); match &cli_arguments.subcommand { diff --git a/cli/polka-storage-provider/src/rpc.rs b/cli/polka-storage-provider/src/rpc.rs index c6a6fcdb1..34e30f930 100644 --- a/cli/polka-storage-provider/src/rpc.rs +++ b/cli/polka-storage-provider/src/rpc.rs @@ -21,7 +21,7 @@ pub mod methods; pub use client::RpcClient; /// Default address to bind the RPC server to. -pub const SERVER_DEFAULT_BIND_ADDR: &str = "127.0.0.1:8000"; +pub const RPC_SERVER_DEFAULT_BIND_ADDR: &str = "127.0.0.1:8000"; /// A definition of an RPC method handler which can be registered with an [`RpcModule`]. pub trait RpcMethod { From 52a6de8cf414b4a834d2cce109020ee5e1939ff2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rok=20=C4=8Cerni=C4=8D?= Date: Wed, 19 Jun 2024 17:24:03 +0200 Subject: [PATCH 03/22] feat: add method specific params --- .../src/commands/info.rs | 9 +- cli/polka-storage-provider/src/rpc.rs | 45 +++++++-- cli/polka-storage-provider/src/rpc/client.rs | 92 ++++++++++++++++++- cli/polka-storage-provider/src/rpc/error.rs | 9 +- cli/polka-storage-provider/src/rpc/methods.rs | 2 +- .../src/rpc/methods/common.rs | 9 +- .../src/rpc/methods/wallet.rs | 8 +- 7 files changed, 142 insertions(+), 32 deletions(-) diff --git a/cli/polka-storage-provider/src/commands/info.rs b/cli/polka-storage-provider/src/commands/info.rs index 67c88e85b..c5d532fe5 100644 --- a/cli/polka-storage-provider/src/commands/info.rs +++ b/cli/polka-storage-provider/src/commands/info.rs @@ -4,7 +4,7 @@ use chrono::{DateTime, Local, Utc}; use clap::Parser; use cli_primitives::Error; -use crate::rpc::{methods::common::Info, RpcClient, RpcMethod}; +use crate::rpc::{methods::common::Info, RpcClient, RpcMethodExt}; /// Command to display information about the storage provider. #[derive(Debug, Clone, Parser)] @@ -15,7 +15,7 @@ impl InfoCommand { // TODO(#67,@cernicc,07/06/2024): Print polkadot address used by the provider // Get server info - let server_info = Info::call(client, None).await?; + let server_info = Info::call(client, ()).await?; let node_status_info = NodeStatusInfo { start_time: server_info.start_time, @@ -37,10 +37,7 @@ impl Display for NodeStatusInfo { let now = Utc::now(); let uptime = now - self.start_time; - format!( - "Uptime: {uptime} (Started at: {})", - self.start_time.with_timezone(&Local) - ) + format!("Uptime: {uptime} (Started at: {})", self.start_time) }; writeln!(f, "{uptime}") diff --git a/cli/polka-storage-provider/src/rpc.rs b/cli/polka-storage-provider/src/rpc.rs index 34e30f930..ec3dceb92 100644 --- a/cli/polka-storage-provider/src/rpc.rs +++ b/cli/polka-storage-provider/src/rpc.rs @@ -1,16 +1,16 @@ -use std::{future::Future, net::SocketAddr, sync::Arc}; +use std::{fmt::Debug, future::Future, net::SocketAddr, sync::Arc}; use chrono::Utc; use cli_primitives::Error; +use client::Request; use error::ServerError; use jsonrpsee::{ core::ClientError, server::{Server, ServerHandle}, - types::Params, RpcModule, }; use methods::create_module; -use serde::{Deserialize, Serialize}; +use serde::{de::DeserializeOwned, Deserialize, Serialize}; use crate::substrate; @@ -30,14 +30,18 @@ pub trait RpcMethod { /// See [`ApiVersion`]. const API_VERSION: ApiVersion; /// Successful response type. - type Ok: Serialize; + type Ok: Debug + Serialize + DeserializeOwned; + /// Parameters type. + type Params: Debug + Serialize + DeserializeOwned; /// Logic for this method. fn handle( ctx: Arc, - params: Params, + params: Self::Params, ) -> impl Future> + Send; +} +pub trait RpcMethodExt: RpcMethod { /// Register this method with an [`RpcModule`]. fn register_async(module: &mut RpcModule) -> &mut jsonrpsee::MethodCallback where @@ -45,18 +49,43 @@ pub trait RpcMethod { { module .register_async_method(Self::NAME, move |params, ctx| async move { + // Try to deserialize the params + let params = params.parse().map_err(|e| { + tracing::error!("Failed to parse params: {:?}", e); + ServerError::invalid_params("Failed to parse params", None) + })?; + + // Handle the method let ok = Self::handle(ctx, params).await?; Result::<_, jsonrpsee::types::ErrorObjectOwned>::Ok(ok) }) .expect("method should be valid") // This is safe because we know the method registered is valid. } - /// Call the rpc method with the provided client and params. - async fn call(client: &RpcClient, params: Option>) -> Result { - todo!() + /// Create a request for this method. + /// + /// Returns [`Err`] if any of the parameters fail to serialize. + fn request(params: Self::Params) -> Result, serde_json::Error> { + let params = serde_json::to_value(params).expect("params should serialize"); + + Ok(Request { + method_name: Self::NAME, + params, + result_type: std::marker::PhantomData, + api_version: Self::API_VERSION, + }) + } + + /// Call the method with the provided client and params. + async fn call(client: &RpcClient, params: Self::Params) -> Result { + let response = client.call(Self::request(params)?).await?; + Ok(response) } } +/// Blanket implementation for all types that implement [`RpcMethod`]. +impl RpcMethodExt for T where T: RpcMethod {} + /// Available API versions. /// /// These are significant because they are expressed in the URL path against diff --git a/cli/polka-storage-provider/src/rpc/client.rs b/cli/polka-storage-provider/src/rpc/client.rs index d9aca12c7..272f00c09 100644 --- a/cli/polka-storage-provider/src/rpc/client.rs +++ b/cli/polka-storage-provider/src/rpc/client.rs @@ -1,9 +1,12 @@ -use std::{fmt, fmt::Debug}; +use std::{ + fmt::{self, Debug}, + marker::PhantomData, +}; use jsonrpsee::{ core::{ client::{BatchResponse, ClientT, Subscription, SubscriptionClientT}, - params::BatchRequestBuilder, + params::{ArrayParams, BatchRequestBuilder, ObjectParams}, traits::ToRpcParams, ClientError, }, @@ -12,11 +15,14 @@ use jsonrpsee::{ }; use serde::de::DeserializeOwned; use tokio::sync::OnceCell; +use tracing::{debug, Instrument}; use url::Url; +use super::ApiVersion; + pub struct RpcClient { base_url: Url, - v0: OnceCell, + v0: OnceCell, } impl RpcClient { @@ -26,6 +32,72 @@ impl RpcClient { v0: OnceCell::new(), } } + + pub async fn call( + &self, + req: Request, + ) -> Result { + let Request { + method_name, + params, + api_version, + .. + } = req; + + let client = self.get_or_init_client(api_version).await?; + let span = tracing::debug_span!("request", method = %method_name, url = %client.url); + + let work = async { + let result = match params { + serde_json::Value::Null => client.request(method_name, ArrayParams::new()), + serde_json::Value::Array(it) => { + let mut params = ArrayParams::new(); + for param in it { + params.insert(param)? + } + client.request(method_name, params) + } + serde_json::Value::Object(it) => { + let mut params = ObjectParams::new(); + for (name, param) in it { + params.insert(&name, param)? + } + client.request(method_name, params) + } + prim @ (serde_json::Value::Bool(_) + | serde_json::Value::Number(_) + | serde_json::Value::String(_)) => { + return Err(ClientError::Custom(format!( + "invalid parameter type: `{}`", + prim + ))) + } + } + .await; + debug!(?result); + + result + }; + + work.instrument(span.or_current()).await + } + + async fn get_or_init_client(&self, version: ApiVersion) -> Result<&InnerClient, ClientError> { + match version { + ApiVersion::V0 => &self.v0, + } + .get_or_try_init(|| async { + let version_part = match version { + ApiVersion::V0 => "rpc/v0", + }; + + let url = self.base_url.join(version_part).map_err(|it| { + ClientError::Custom(format!("creating url for endpoint failed: {}", it)) + })?; + InnerClient::new(url).await + }) + .await + } } /// Represents a single connection to the URL server @@ -138,3 +210,17 @@ impl SubscriptionClientT for InnerClient { } } } + +#[derive(Debug)] +pub struct Request { + pub method_name: &'static str, + pub params: serde_json::Value, + pub result_type: PhantomData, + pub api_version: ApiVersion, +} + +impl ToRpcParams for Request { + fn to_rpc_params(self) -> Result>, serde_json::Error> { + Ok(Some(serde_json::value::to_raw_value(&self.params)?)) + } +} diff --git a/cli/polka-storage-provider/src/rpc/error.rs b/cli/polka-storage-provider/src/rpc/error.rs index c90aa5e0b..af5578605 100644 --- a/cli/polka-storage-provider/src/rpc/error.rs +++ b/cli/polka-storage-provider/src/rpc/error.rs @@ -1,6 +1,9 @@ use std::fmt::Display; -use jsonrpsee::types::{error::INTERNAL_ERROR_CODE, ErrorObjectOwned}; +use jsonrpsee::types::{ + error::{INTERNAL_ERROR_CODE, INVALID_PARAMS_CODE}, + ErrorObjectOwned, +}; use serde_json::Value; /// Error type for RPC server errors. @@ -33,6 +36,10 @@ impl ServerError { pub fn internal_error(message: impl Display, data: impl Into>) -> Self { Self::new(INTERNAL_ERROR_CODE, message, data) } + + pub fn invalid_params(message: impl Display, data: impl Into>) -> Self { + Self::new(INVALID_PARAMS_CODE, message, data) + } } impl From for ServerError { diff --git a/cli/polka-storage-provider/src/rpc/methods.rs b/cli/polka-storage-provider/src/rpc/methods.rs index e62f231b1..ea11dd567 100644 --- a/cli/polka-storage-provider/src/rpc/methods.rs +++ b/cli/polka-storage-provider/src/rpc/methods.rs @@ -2,7 +2,7 @@ use std::sync::Arc; use jsonrpsee::RpcModule; -use super::{RpcMethod, RpcServerState}; +use super::{RpcMethodExt, RpcServerState}; pub mod common; pub mod wallet; diff --git a/cli/polka-storage-provider/src/rpc/methods/common.rs b/cli/polka-storage-provider/src/rpc/methods/common.rs index 6232af8cf..3279ca158 100644 --- a/cli/polka-storage-provider/src/rpc/methods/common.rs +++ b/cli/polka-storage-provider/src/rpc/methods/common.rs @@ -1,7 +1,6 @@ use std::sync::Arc; use chrono::{DateTime, Utc}; -use jsonrpsee::types::Params; use serde::{Deserialize, Serialize}; use crate::rpc::{error::ServerError, ApiVersion, RpcMethod, RpcServerState}; @@ -12,15 +11,11 @@ pub struct Info; impl RpcMethod for Info { const NAME: &'static str = "info"; - const API_VERSION: ApiVersion = ApiVersion::V0; - type Ok = InfoResult; + type Params = (); - async fn handle( - ctx: Arc, - _params: Params<'_>, - ) -> Result { + async fn handle(ctx: Arc, _: Self::Params) -> Result { Ok(InfoResult { start_time: ctx.start_time, }) diff --git a/cli/polka-storage-provider/src/rpc/methods/wallet.rs b/cli/polka-storage-provider/src/rpc/methods/wallet.rs index e5d93fd2f..d8c5bef53 100644 --- a/cli/polka-storage-provider/src/rpc/methods/wallet.rs +++ b/cli/polka-storage-provider/src/rpc/methods/wallet.rs @@ -1,6 +1,5 @@ use std::sync::Arc; -use jsonrpsee::types::Params; use serde::{Deserialize, Serialize}; use subxt_signer::sr25519::dev; @@ -15,13 +14,10 @@ pub struct WalletBalance; impl RpcMethod for WalletBalance { const NAME: &'static str = "wallet_balance"; const API_VERSION: ApiVersion = ApiVersion::V0; - type Ok = Option; + type Params = (); - async fn handle( - ctx: Arc, - _params: Params<'_>, - ) -> Result { + async fn handle(ctx: Arc, _: Self::Params) -> Result { // TODO(#68,@cernicc,05/06/2024): Implement correctly. dev alice is used as a show case for now. let account = dev::alice().public_key().into(); let balance = get_system_balances(&ctx.substrate_client, &account).await?; From a20e65d5919c8a0e2110ad9042dec8110c466af7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rok=20=C4=8Cerni=C4=8D?= Date: Wed, 19 Jun 2024 18:50:51 +0200 Subject: [PATCH 04/22] fix: remove unused dep --- cli/polka-storage-provider/src/commands/info.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cli/polka-storage-provider/src/commands/info.rs b/cli/polka-storage-provider/src/commands/info.rs index c5d532fe5..6199c3258 100644 --- a/cli/polka-storage-provider/src/commands/info.rs +++ b/cli/polka-storage-provider/src/commands/info.rs @@ -1,6 +1,6 @@ use std::fmt::{self, Display, Formatter}; -use chrono::{DateTime, Local, Utc}; +use chrono::{DateTime, Utc}; use clap::Parser; use cli_primitives::Error; From 10055681a33e635e61d830180d7edcabd38159d7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rok=20=C4=8Cerni=C4=8D?= Date: Thu, 20 Jun 2024 08:09:19 +0200 Subject: [PATCH 05/22] fix: taplo --- Cargo.toml | 15 +++------------ cli/polka-storage-provider/Cargo.toml | 6 +----- 2 files changed, 4 insertions(+), 17 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 65fd9bcb1..a68e4af15 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -6,14 +6,7 @@ license-file = "LICENSE" repository = "https://github.com/eigerco/polka-storage" [workspace] -members = [ - "cli/polka-storage-provider", - "node", - "primitives/cli", - "runtime", - "storage/mater", - "storage/polka-index", -] +members = ["cli/polka-storage-provider", "node", "primitives/cli", "runtime", "storage/mater", "storage/polka-index"] resolver = "2" # FIXME(#@jmg-duarte,#7,14/5/24): remove the patch once something >1.11.0 is released @@ -37,8 +30,8 @@ substrate-wasm-builder = { git = "https://github.com/paritytech/polkadot-sdk", t async-channel = "2.3.1" async-stream = "0.3.5" -base64 = "0.22.1" async-trait = "0.1.80" +base64 = "0.22.1" bitflags = "2.5.0" byteorder = "1.5.0" bytes = "1.6.0" @@ -132,9 +125,7 @@ substrate-prometheus-endpoint = { git = "https://github.com/paritytech/polkadot- # Polkadot pallet-xcm = { git = "https://github.com/paritytech/polkadot-sdk", tag = "polkadot-v1.11.0", default-features = false } -polkadot-cli = { git = "https://github.com/paritytech/polkadot-sdk", tag = "polkadot-v1.11.0", features = [ - "rococo-native", -] } +polkadot-cli = { git = "https://github.com/paritytech/polkadot-sdk", tag = "polkadot-v1.11.0", features = ["rococo-native"] } polkadot-parachain-primitives = { git = "https://github.com/paritytech/polkadot-sdk", tag = "polkadot-v1.11.0", default-features = false } polkadot-primitives = { git = "https://github.com/paritytech/polkadot-sdk", tag = "polkadot-v1.11.0" } polkadot-runtime-common = { git = "https://github.com/paritytech/polkadot-sdk", tag = "polkadot-v1.11.0", default-features = false } diff --git a/cli/polka-storage-provider/Cargo.toml b/cli/polka-storage-provider/Cargo.toml index aeca3ba10..4242e30f9 100644 --- a/cli/polka-storage-provider/Cargo.toml +++ b/cli/polka-storage-provider/Cargo.toml @@ -11,11 +11,7 @@ version = "0.1.0" async-trait = { workspace = true } chrono = { workspace = true, features = ["serde"] } clap = { workspace = true, features = ["derive"] } -jsonrpsee = { workspace = true, features = [ - "server", - "http-client", - "ws-client", -] } +jsonrpsee = { workspace = true, features = ["http-client", "server", "ws-client"] } sc-cli = { workspace = true } serde = { workspace = true } serde_json = { workspace = true } From e0c219602dc1d9e7c74fd1b36d0558a30e832abb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rok=20=C4=8Cerni=C4=8D?= Date: Thu, 20 Jun 2024 08:13:48 +0200 Subject: [PATCH 06/22] feat: remove uptimeme --- cli/polka-storage-provider/src/commands/info.rs | 16 +++++----------- 1 file changed, 5 insertions(+), 11 deletions(-) diff --git a/cli/polka-storage-provider/src/commands/info.rs b/cli/polka-storage-provider/src/commands/info.rs index e624196f9..bf7e0e538 100644 --- a/cli/polka-storage-provider/src/commands/info.rs +++ b/cli/polka-storage-provider/src/commands/info.rs @@ -3,9 +3,10 @@ use std::fmt::{self, Display, Formatter}; use chrono::{DateTime, Utc}; use clap::Parser; -use crate::Error; - -use crate::rpc::{methods::common::Info, RpcClient, RpcMethodExt}; +use crate::{ + rpc::{methods::common::Info, RpcClient, RpcMethodExt}, + Error, +}; /// Command to display information about the storage provider. #[derive(Debug, Clone, Parser)] @@ -34,13 +35,6 @@ struct NodeStatusInfo { impl Display for NodeStatusInfo { fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { - let uptime = { - let now = Utc::now(); - let uptime = now - self.start_time; - - format!("Uptime: {uptime} (Started at: {})", self.start_time) - }; - - writeln!(f, "{uptime}") + writeln!(f, "Started at: {}", self.start_time) } } From dcebcc64d8de762c3a734549bb6868f2ab5646dd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rok=20=C4=8Cerni=C4=8D?= Date: Thu, 20 Jun 2024 08:30:13 +0200 Subject: [PATCH 07/22] fix: use url from params --- cli/polka-storage-provider/src/cli.rs | 4 ++-- cli/polka-storage-provider/src/commands/runner.rs | 8 ++------ cli/polka-storage-provider/src/error.rs | 3 +++ 3 files changed, 7 insertions(+), 8 deletions(-) diff --git a/cli/polka-storage-provider/src/cli.rs b/cli/polka-storage-provider/src/cli.rs index cf350439b..9772fbd6b 100644 --- a/cli/polka-storage-provider/src/cli.rs +++ b/cli/polka-storage-provider/src/cli.rs @@ -12,8 +12,8 @@ use crate::{ pub(crate) struct Cli { #[command(subcommand)] pub subcommand: SubCommand, - #[arg(short, long, default_value = RPC_SERVER_DEFAULT_BIND_ADDR)] - pub rpc_server_address: String, + #[arg(short, long, default_value_t = format!("http://{RPC_SERVER_DEFAULT_BIND_ADDR}"))] + pub rpc_server_url: String, } /// Supported sub-commands. diff --git a/cli/polka-storage-provider/src/commands/runner.rs b/cli/polka-storage-provider/src/commands/runner.rs index c37ee8be7..f4d1e0d36 100644 --- a/cli/polka-storage-provider/src/commands/runner.rs +++ b/cli/polka-storage-provider/src/commands/runner.rs @@ -4,11 +4,7 @@ use clap::Parser; use url::Url; use super::WalletCommand; -use crate::{ - cli::SubCommand, - rpc::{RpcClient, RPC_SERVER_DEFAULT_BIND_ADDR}, - Cli, Error, -}; +use crate::{cli::SubCommand, rpc::RpcClient, Cli, Error}; /// Parses command line arguments into the service configuration and runs the specified /// command with it. @@ -17,7 +13,7 @@ pub(crate) async fn run() -> Result<(), Error> { let cli_arguments: Cli = Cli::parse(); // Rpc client used to interact with the node - let rpc_url = Url::from_str(&format!("http://{RPC_SERVER_DEFAULT_BIND_ADDR}")).unwrap(); + let rpc_url = Url::from_str(&cli_arguments.rpc_server_url)?; let rpc_client = RpcClient::new(rpc_url); match &cli_arguments.subcommand { diff --git a/cli/polka-storage-provider/src/error.rs b/cli/polka-storage-provider/src/error.rs index 31f92859a..b3c55346e 100644 --- a/cli/polka-storage-provider/src/error.rs +++ b/cli/polka-storage-provider/src/error.rs @@ -9,6 +9,9 @@ pub enum Error { #[error("FromEnv error: {0}")] EnvError(#[from] tracing_subscriber::filter::FromEnvError), + #[error("URL parse error: {0}")] + ParseUrl(#[from] url::ParseError), + #[error("Substrate error: {0}")] Substrate(#[from] subxt::Error), From 6a3990f2dc7a5e683286e0d031e2d4092dc048db Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rok=20=C4=8Cerni=C4=8D?= Date: Thu, 20 Jun 2024 10:31:23 +0200 Subject: [PATCH 08/22] fix: add logging --- Cargo.lock | 1 + cli/polka-storage-provider/Cargo.toml | 1 + cli/polka-storage-provider/src/rpc.rs | 24 ++++++++++++++++--- cli/polka-storage-provider/src/rpc/client.rs | 4 ++-- .../src/rpc/methods/common.rs | 1 + 5 files changed, 26 insertions(+), 5 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 6827c7ee4..0b2fc9cf2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -8254,6 +8254,7 @@ dependencies = [ "tracing", "tracing-subscriber", "url", + "uuid", ] [[package]] diff --git a/cli/polka-storage-provider/Cargo.toml b/cli/polka-storage-provider/Cargo.toml index 4242e30f9..df29048b8 100644 --- a/cli/polka-storage-provider/Cargo.toml +++ b/cli/polka-storage-provider/Cargo.toml @@ -22,6 +22,7 @@ tokio = { workspace = true } tracing = { workspace = true } tracing-subscriber = { workspace = true, features = ["env-filter"] } url = { workspace = true } +uuid = { workspace = true } [lints] workspace = true diff --git a/cli/polka-storage-provider/src/rpc.rs b/cli/polka-storage-provider/src/rpc.rs index 70e7d701a..7a270b244 100644 --- a/cli/polka-storage-provider/src/rpc.rs +++ b/cli/polka-storage-provider/src/rpc.rs @@ -10,6 +10,8 @@ use jsonrpsee::{ }; use methods::create_module; use serde::{de::DeserializeOwned, Deserialize, Serialize}; +use tracing::{debug, debug_span, error, Instrument}; +use uuid::Uuid; use crate::{substrate, Error}; @@ -45,18 +47,34 @@ pub trait RpcMethodExt: RpcMethod { fn register_async(module: &mut RpcModule) -> &mut jsonrpsee::MethodCallback where Self::Ok: Clone + 'static, + Self::Params: Debug + Send, { module .register_async_method(Self::NAME, move |params, ctx| async move { // Try to deserialize the params + let span = + debug_span!("rpc", id = %Uuid::new_v4(), method = Self::NAME, params = ?params); + let entered = span.enter(); + let params = params.parse().map_err(|e| { tracing::error!("Failed to parse params: {:?}", e); ServerError::invalid_params("Failed to parse params", None) })?; + drop(entered); // Handle the method - let ok = Self::handle(ctx, params).await?; - Result::<_, jsonrpsee::types::ErrorObjectOwned>::Ok(ok) + let result = Self::handle(ctx, params).instrument(span.clone()).await; + + match &result { + Ok(ok) => { + span.in_scope(|| debug!(response = ?ok, "handled successfully")); + } + Err(err) => { + span.in_scope(|| error!(err = ?err, "error ocurred while handling")); + } + } + + Result::<_, jsonrpsee::types::ErrorObjectOwned>::Ok(result?) }) .expect("method should be valid") // This is safe because we know the method registered is valid. } @@ -65,7 +83,7 @@ pub trait RpcMethodExt: RpcMethod { /// /// Returns [`Err`] if any of the parameters fail to serialize. fn request(params: Self::Params) -> Result, serde_json::Error> { - let params = serde_json::to_value(params).expect("params should serialize"); + let params = serde_json::to_value(params)?; Ok(Request { method_name: Self::NAME, diff --git a/cli/polka-storage-provider/src/rpc/client.rs b/cli/polka-storage-provider/src/rpc/client.rs index 272f00c09..70c2e9121 100644 --- a/cli/polka-storage-provider/src/rpc/client.rs +++ b/cli/polka-storage-provider/src/rpc/client.rs @@ -15,7 +15,7 @@ use jsonrpsee::{ }; use serde::de::DeserializeOwned; use tokio::sync::OnceCell; -use tracing::{debug, Instrument}; +use tracing::{debug, debug_span, Instrument}; use url::Url; use super::ApiVersion; @@ -45,7 +45,7 @@ impl RpcClient { } = req; let client = self.get_or_init_client(api_version).await?; - let span = tracing::debug_span!("request", method = %method_name, url = %client.url); + let span = debug_span!("request", method = %method_name, url = %client.url, ?params); let work = async { let result = match params { diff --git a/cli/polka-storage-provider/src/rpc/methods/common.rs b/cli/polka-storage-provider/src/rpc/methods/common.rs index 3279ca158..68ba34248 100644 --- a/cli/polka-storage-provider/src/rpc/methods/common.rs +++ b/cli/polka-storage-provider/src/rpc/methods/common.rs @@ -2,6 +2,7 @@ use std::sync::Arc; use chrono::{DateTime, Utc}; use serde::{Deserialize, Serialize}; +use tracing::debug; use crate::rpc::{error::ServerError, ApiVersion, RpcMethod, RpcServerState}; From cc34cf53c7a839dc9dfd699c1e5239b6e0e2f5ba Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rok=20=C4=8Cerni=C4=8D?= Date: Thu, 20 Jun 2024 10:39:55 +0200 Subject: [PATCH 09/22] fix: small changes --- .../src/commands/run.rs | 2 +- cli/polka-storage-provider/src/rpc/client.rs | 47 ++++++++++--------- 2 files changed, 26 insertions(+), 23 deletions(-) diff --git a/cli/polka-storage-provider/src/commands/run.rs b/cli/polka-storage-provider/src/commands/run.rs index 8fab1cb5c..ccd18dfd0 100644 --- a/cli/polka-storage-provider/src/commands/run.rs +++ b/cli/polka-storage-provider/src/commands/run.rs @@ -18,7 +18,7 @@ pub(crate) struct RunCommand { /// RPC API endpoint used by the parachain node. #[arg(short = 'n', long, default_value = FULL_NODE_DEFAULT_RPC_ADDR)] pub node_rpc_address: Url, - /// Address used for RPC. By default binds on localhost on port 8000. + /// Address and port used for RPC server. #[arg(short = 'a', long, default_value = RPC_SERVER_DEFAULT_BIND_ADDR)] pub listen_addr: SocketAddr, } diff --git a/cli/polka-storage-provider/src/rpc/client.rs b/cli/polka-storage-provider/src/rpc/client.rs index 70c2e9121..442821603 100644 --- a/cli/polka-storage-provider/src/rpc/client.rs +++ b/cli/polka-storage-provider/src/rpc/client.rs @@ -22,10 +22,11 @@ use super::ApiVersion; pub struct RpcClient { base_url: Url, - v0: OnceCell, + v0: OnceCell, } impl RpcClient { + /// Create a new RPC client with the given base URL. pub fn new(base_url: Url) -> Self { Self { base_url, @@ -33,6 +34,7 @@ impl RpcClient { } } + /// Call an RPC server with the given request. pub async fn call( &self, req: Request, @@ -82,7 +84,8 @@ impl RpcClient { work.instrument(span.or_current()).await } - async fn get_or_init_client(&self, version: ApiVersion) -> Result<&InnerClient, ClientError> { + /// Get or initialize a client for the given API version. + async fn get_or_init_client(&self, version: ApiVersion) -> Result<&Client, ClientError> { match version { ApiVersion::V0 => &self.v0, } @@ -94,19 +97,19 @@ impl RpcClient { let url = self.base_url.join(version_part).map_err(|it| { ClientError::Custom(format!("creating url for endpoint failed: {}", it)) })?; - InnerClient::new(url).await + Client::new(url).await }) .await } } /// Represents a single connection to the URL server -struct InnerClient { +struct Client { url: Url, - specific: ClientSpecific, + specific: ClientInner, } -impl Debug for InnerClient { +impl Debug for Client { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.debug_struct("InnerClient") .field("url", &self.url) @@ -114,11 +117,11 @@ impl Debug for InnerClient { } } -impl InnerClient { +impl Client { async fn new(url: Url) -> Result { let specific = match url.scheme() { - "ws" | "wss" => ClientSpecific::Ws(WsClientBuilder::new().build(&url).await?), - "http" | "https" => ClientSpecific::Https(HttpClientBuilder::new().build(&url)?), + "ws" | "wss" => ClientInner::Ws(WsClientBuilder::new().build(&url).await?), + "http" | "https" => ClientInner::Https(HttpClientBuilder::new().build(&url)?), it => { return Err(ClientError::Custom(format!( "Unsupported URL scheme: {}", @@ -131,20 +134,20 @@ impl InnerClient { } } -enum ClientSpecific { +enum ClientInner { Ws(jsonrpsee::ws_client::WsClient), Https(jsonrpsee::http_client::HttpClient), } #[async_trait::async_trait] -impl ClientT for InnerClient { +impl ClientT for Client { async fn notification(&self, method: &str, params: Params) -> Result<(), ClientError> where Params: ToRpcParams + Send, { match &self.specific { - ClientSpecific::Ws(client) => client.notification(method, params).await, - ClientSpecific::Https(client) => client.notification(method, params).await, + ClientInner::Ws(client) => client.notification(method, params).await, + ClientInner::Https(client) => client.notification(method, params).await, } } @@ -154,8 +157,8 @@ impl ClientT for InnerClient { Params: ToRpcParams + Send, { match &self.specific { - ClientSpecific::Ws(client) => client.request(method, params).await, - ClientSpecific::Https(client) => client.request(method, params).await, + ClientInner::Ws(client) => client.request(method, params).await, + ClientInner::Https(client) => client.request(method, params).await, } } @@ -167,14 +170,14 @@ impl ClientT for InnerClient { R: DeserializeOwned + fmt::Debug + 'a, { match &self.specific { - ClientSpecific::Ws(client) => client.batch_request(batch).await, - ClientSpecific::Https(client) => client.batch_request(batch).await, + ClientInner::Ws(client) => client.batch_request(batch).await, + ClientInner::Https(client) => client.batch_request(batch).await, } } } #[async_trait::async_trait] -impl SubscriptionClientT for InnerClient { +impl SubscriptionClientT for Client { async fn subscribe<'a, Notif, Params>( &self, subscribe_method: &'a str, @@ -186,11 +189,11 @@ impl SubscriptionClientT for InnerClient { Notif: DeserializeOwned, { match &self.specific { - ClientSpecific::Ws(it) => { + ClientInner::Ws(it) => { it.subscribe(subscribe_method, params, unsubscribe_method) .await } - ClientSpecific::Https(it) => { + ClientInner::Https(it) => { it.subscribe(subscribe_method, params, unsubscribe_method) .await } @@ -205,8 +208,8 @@ impl SubscriptionClientT for InnerClient { Notif: DeserializeOwned, { match &self.specific { - ClientSpecific::Ws(it) => it.subscribe_to_method(method).await, - ClientSpecific::Https(it) => it.subscribe_to_method(method).await, + ClientInner::Ws(it) => it.subscribe_to_method(method).await, + ClientInner::Https(it) => it.subscribe_to_method(method).await, } } } From 221d8855d8ae777f60b8d322deefd0d761cda2e6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rok=20=C4=8Cerni=C4=8D?= Date: Thu, 20 Jun 2024 10:41:39 +0200 Subject: [PATCH 10/22] chore: add comment --- cli/polka-storage-provider/src/rpc/client.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/cli/polka-storage-provider/src/rpc/client.rs b/cli/polka-storage-provider/src/rpc/client.rs index 442821603..7880543b0 100644 --- a/cli/polka-storage-provider/src/rpc/client.rs +++ b/cli/polka-storage-provider/src/rpc/client.rs @@ -214,6 +214,7 @@ impl SubscriptionClientT for Client { } } +/// Represents a single RPC request. #[derive(Debug)] pub struct Request { pub method_name: &'static str, From 92d5756e27e88928e1d08e43821f81f8f675f74e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rok=20=C4=8Cerni=C4=8D?= Date: Thu, 20 Jun 2024 21:12:11 +0200 Subject: [PATCH 11/22] fix: remove unused import --- cli/polka-storage-provider/src/rpc/methods/common.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/cli/polka-storage-provider/src/rpc/methods/common.rs b/cli/polka-storage-provider/src/rpc/methods/common.rs index 68ba34248..3279ca158 100644 --- a/cli/polka-storage-provider/src/rpc/methods/common.rs +++ b/cli/polka-storage-provider/src/rpc/methods/common.rs @@ -2,7 +2,6 @@ use std::sync::Arc; use chrono::{DateTime, Utc}; use serde::{Deserialize, Serialize}; -use tracing::debug; use crate::rpc::{error::ServerError, ApiVersion, RpcMethod, RpcServerState}; From 1ac85ee0d882e6286a7cd3154e6398cde042e323 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rok=20=C4=8Cerni=C4=8D?= Date: Fri, 21 Jun 2024 11:42:27 +0200 Subject: [PATCH 12/22] fix: pr suggestions --- cli/polka-storage-provider/src/rpc.rs | 15 +++- cli/polka-storage-provider/src/rpc/client.rs | 85 ++++++++++---------- 2 files changed, 57 insertions(+), 43 deletions(-) diff --git a/cli/polka-storage-provider/src/rpc.rs b/cli/polka-storage-provider/src/rpc.rs index 7a270b244..b892185e2 100644 --- a/cli/polka-storage-provider/src/rpc.rs +++ b/cli/polka-storage-provider/src/rpc.rs @@ -1,4 +1,9 @@ -use std::{fmt::Debug, future::Future, net::SocketAddr, sync::Arc}; +use std::{ + fmt::{self, Debug, Display}, + future::Future, + net::SocketAddr, + sync::Arc, +}; use chrono::Utc; use client::Request; @@ -112,6 +117,14 @@ pub enum ApiVersion { V0, } +impl Display for ApiVersion { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + Self::V0 => write!(f, "rpc/v0"), + } + } +} + pub struct RpcServerState { pub start_time: chrono::DateTime, pub substrate_client: substrate::Client, diff --git a/cli/polka-storage-provider/src/rpc/client.rs b/cli/polka-storage-provider/src/rpc/client.rs index 7880543b0..1cfe49e61 100644 --- a/cli/polka-storage-provider/src/rpc/client.rs +++ b/cli/polka-storage-provider/src/rpc/client.rs @@ -14,8 +14,9 @@ use jsonrpsee::{ ws_client::WsClientBuilder, }; use serde::de::DeserializeOwned; +use serde_json::Value; use tokio::sync::OnceCell; -use tracing::{debug, debug_span, Instrument}; +use tracing::{debug, instrument}; use url::Url; use super::ApiVersion; @@ -35,10 +36,10 @@ impl RpcClient { } /// Call an RPC server with the given request. - pub async fn call( - &self, - req: Request, - ) -> Result { + pub async fn call(&self, req: Request) -> Result + where + T: DeserializeOwned + Debug, + { let Request { method_name, params, @@ -47,41 +48,44 @@ impl RpcClient { } = req; let client = self.get_or_init_client(api_version).await?; - let span = debug_span!("request", method = %method_name, url = %client.url, ?params); - - let work = async { - let result = match params { - serde_json::Value::Null => client.request(method_name, ArrayParams::new()), - serde_json::Value::Array(it) => { - let mut params = ArrayParams::new(); - for param in it { - params.insert(param)? - } - client.request(method_name, params) - } - serde_json::Value::Object(it) => { - let mut params = ObjectParams::new(); - for (name, param) in it { - params.insert(&name, param)? - } - client.request(method_name, params) + Self::request(client, method_name, params).await + } + + #[instrument(skip_all, fields(url = %client.url, params = ?params))] + async fn request(client: &Client, method_name: &str, params: Value) -> Result + where + T: DeserializeOwned + Debug, + { + let result = match params { + Value::Null => client.request(method_name, ArrayParams::new()), + Value::Array(it) => { + let mut params = ArrayParams::new(); + for param in it { + params.insert(param)? } - prim @ (serde_json::Value::Bool(_) - | serde_json::Value::Number(_) - | serde_json::Value::String(_)) => { - return Err(ClientError::Custom(format!( - "invalid parameter type: `{}`", - prim - ))) + + client.request(method_name, params) + } + Value::Object(it) => { + let mut params = ObjectParams::new(); + for (name, param) in it { + params.insert(&name, param)? } + + client.request(method_name, params) + } + prim @ (Value::Bool(_) | Value::Number(_) | Value::String(_)) => { + return Err(ClientError::Custom(format!( + "invalid parameter type: `{}`", + prim + ))) } - .await; - debug!(?result); + } + .await; - result - }; + debug!(?result, "request completed"); - work.instrument(span.or_current()).await + result } /// Get or initialize a client for the given API version. @@ -90,13 +94,10 @@ impl RpcClient { ApiVersion::V0 => &self.v0, } .get_or_try_init(|| async { - let version_part = match version { - ApiVersion::V0 => "rpc/v0", - }; - - let url = self.base_url.join(version_part).map_err(|it| { + let url = self.base_url.join(&version.to_string()).map_err(|it| { ClientError::Custom(format!("creating url for endpoint failed: {}", it)) })?; + Client::new(url).await }) .await @@ -216,9 +217,9 @@ impl SubscriptionClientT for Client { /// Represents a single RPC request. #[derive(Debug)] -pub struct Request { +pub struct Request { pub method_name: &'static str, - pub params: serde_json::Value, + pub params: Value, pub result_type: PhantomData, pub api_version: ApiVersion, } From 92d7ce044ef7249b7d42156fbfc337d5b71f2110 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rok=20=C4=8Cerni=C4=8D?= Date: Wed, 26 Jun 2024 11:48:22 +0200 Subject: [PATCH 13/22] fix: pr suggestions --- cli/polka-storage-provider/src/rpc.rs | 4 +- cli/polka-storage-provider/src/rpc/client.rs | 80 ++++++++++---------- 2 files changed, 42 insertions(+), 42 deletions(-) diff --git a/cli/polka-storage-provider/src/rpc.rs b/cli/polka-storage-provider/src/rpc.rs index b892185e2..989ac981a 100644 --- a/cli/polka-storage-provider/src/rpc.rs +++ b/cli/polka-storage-provider/src/rpc.rs @@ -72,10 +72,10 @@ pub trait RpcMethodExt: RpcMethod { match &result { Ok(ok) => { - span.in_scope(|| debug!(response = ?ok, "handled successfully")); + debug!(parent: span, response = ?ok, "handled successfully"); } Err(err) => { - span.in_scope(|| error!(err = ?err, "error ocurred while handling")); + error!(parent: span, err = ?err, "error ocurred while handling") } } diff --git a/cli/polka-storage-provider/src/rpc/client.rs b/cli/polka-storage-provider/src/rpc/client.rs index 1cfe49e61..ce5f670b4 100644 --- a/cli/polka-storage-provider/src/rpc/client.rs +++ b/cli/polka-storage-provider/src/rpc/client.rs @@ -48,44 +48,7 @@ impl RpcClient { } = req; let client = self.get_or_init_client(api_version).await?; - Self::request(client, method_name, params).await - } - - #[instrument(skip_all, fields(url = %client.url, params = ?params))] - async fn request(client: &Client, method_name: &str, params: Value) -> Result - where - T: DeserializeOwned + Debug, - { - let result = match params { - Value::Null => client.request(method_name, ArrayParams::new()), - Value::Array(it) => { - let mut params = ArrayParams::new(); - for param in it { - params.insert(param)? - } - - client.request(method_name, params) - } - Value::Object(it) => { - let mut params = ObjectParams::new(); - for (name, param) in it { - params.insert(&name, param)? - } - - client.request(method_name, params) - } - prim @ (Value::Bool(_) | Value::Number(_) | Value::String(_)) => { - return Err(ClientError::Custom(format!( - "invalid parameter type: `{}`", - prim - ))) - } - } - .await; - - debug!(?result, "request completed"); - - result + request(client, method_name, params).await } /// Get or initialize a client for the given API version. @@ -104,6 +67,43 @@ impl RpcClient { } } +#[instrument(skip_all, fields(url = %client.url, params = ?params))] +async fn request(client: &Client, method_name: &str, params: Value) -> Result +where + T: DeserializeOwned + Debug, +{ + let result = match params { + Value::Null => client.request(method_name, ArrayParams::new()), + Value::Array(it) => { + let mut params = ArrayParams::new(); + for param in it { + params.insert(param)? + } + + client.request(method_name, params) + } + Value::Object(it) => { + let mut params = ObjectParams::new(); + for (name, param) in it { + params.insert(&name, param)? + } + + client.request(method_name, params) + } + param @ (Value::Bool(_) | Value::Number(_) | Value::String(_)) => { + return Err(ClientError::Custom(format!( + "invalid parameter type: `{}`", + param + ))) + } + } + .await; + + debug!(?result, "request completed"); + + result +} + /// Represents a single connection to the URL server struct Client { url: Url, @@ -217,10 +217,10 @@ impl SubscriptionClientT for Client { /// Represents a single RPC request. #[derive(Debug)] -pub struct Request { +pub struct Request { pub method_name: &'static str, pub params: Value, - pub result_type: PhantomData, + pub result_type: PhantomData, pub api_version: ApiVersion, } From cdfa1939c29f060cffe02a62e94273e1430f2913 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rok=20=C4=8Cerni=C4=8D?= Date: Wed, 26 Jun 2024 14:11:43 +0200 Subject: [PATCH 14/22] fix: change request params to generic --- cli/polka-storage-provider/src/rpc.rs | 4 +--- cli/polka-storage-provider/src/rpc/client.rs | 14 +++++++++----- 2 files changed, 10 insertions(+), 8 deletions(-) diff --git a/cli/polka-storage-provider/src/rpc.rs b/cli/polka-storage-provider/src/rpc.rs index 989ac981a..f85665730 100644 --- a/cli/polka-storage-provider/src/rpc.rs +++ b/cli/polka-storage-provider/src/rpc.rs @@ -87,9 +87,7 @@ pub trait RpcMethodExt: RpcMethod { /// Create a request for this method. /// /// Returns [`Err`] if any of the parameters fail to serialize. - fn request(params: Self::Params) -> Result, serde_json::Error> { - let params = serde_json::to_value(params)?; - + fn request(params: Self::Params) -> Result, serde_json::Error> { Ok(Request { method_name: Self::NAME, params, diff --git a/cli/polka-storage-provider/src/rpc/client.rs b/cli/polka-storage-provider/src/rpc/client.rs index ce5f670b4..53f608adb 100644 --- a/cli/polka-storage-provider/src/rpc/client.rs +++ b/cli/polka-storage-provider/src/rpc/client.rs @@ -13,7 +13,7 @@ use jsonrpsee::{ http_client::HttpClientBuilder, ws_client::WsClientBuilder, }; -use serde::de::DeserializeOwned; +use serde::{de::DeserializeOwned, Serialize}; use serde_json::Value; use tokio::sync::OnceCell; use tracing::{debug, instrument}; @@ -36,9 +36,10 @@ impl RpcClient { } /// Call an RPC server with the given request. - pub async fn call(&self, req: Request) -> Result + pub async fn call(&self, req: Request) -> Result where T: DeserializeOwned + Debug, + P: Serialize + Debug, { let Request { method_name, @@ -68,10 +69,13 @@ impl RpcClient { } #[instrument(skip_all, fields(url = %client.url, params = ?params))] -async fn request(client: &Client, method_name: &str, params: Value) -> Result +async fn request(client: &Client, method_name: &str, params: P) -> Result where T: DeserializeOwned + Debug, + P: Serialize + Debug, { + let params = serde_json::to_value(params)?; + let result = match params { Value::Null => client.request(method_name, ArrayParams::new()), Value::Array(it) => { @@ -217,9 +221,9 @@ impl SubscriptionClientT for Client { /// Represents a single RPC request. #[derive(Debug)] -pub struct Request { +pub struct Request { pub method_name: &'static str, - pub params: Value, + pub params: Params, pub result_type: PhantomData, pub api_version: ApiVersion, } From 7188abc0df50d1b5113587299687ebdc6615fd19 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rok=20=C4=8Cerni=C4=8D?= Date: Thu, 27 Jun 2024 11:12:11 +0200 Subject: [PATCH 15/22] feat: refactor --- cli/polka-storage-provider/src/cli.rs | 2 +- .../src/commands/info.rs | 6 +- .../src/commands/run.rs | 16 +- .../src/commands/runner.rs | 10 +- cli/polka-storage-provider/src/rpc.rs | 141 +------------ cli/polka-storage-provider/src/rpc/client.rs | 185 +++++++----------- cli/polka-storage-provider/src/rpc/error.rs | 49 ----- cli/polka-storage-provider/src/rpc/methods.rs | 59 +++++- .../src/rpc/methods/common.rs | 15 +- .../src/rpc/methods/wallet.rs | 16 +- cli/polka-storage-provider/src/rpc/server.rs | 96 +++++++++ cli/polka-storage-provider/src/rpc/version.rs | 23 +++ 12 files changed, 282 insertions(+), 336 deletions(-) delete mode 100644 cli/polka-storage-provider/src/rpc/error.rs create mode 100644 cli/polka-storage-provider/src/rpc/server.rs create mode 100644 cli/polka-storage-provider/src/rpc/version.rs diff --git a/cli/polka-storage-provider/src/cli.rs b/cli/polka-storage-provider/src/cli.rs index 9772fbd6b..0216dbc3a 100644 --- a/cli/polka-storage-provider/src/cli.rs +++ b/cli/polka-storage-provider/src/cli.rs @@ -2,7 +2,7 @@ use clap::Parser; use crate::{ commands::{InfoCommand, InitCommand, RunCommand, WalletCommand}, - rpc::RPC_SERVER_DEFAULT_BIND_ADDR, + rpc::server::RPC_SERVER_DEFAULT_BIND_ADDR, }; /// A CLI application that facilitates management operations over a running full diff --git a/cli/polka-storage-provider/src/commands/info.rs b/cli/polka-storage-provider/src/commands/info.rs index bf7e0e538..2e85abe3e 100644 --- a/cli/polka-storage-provider/src/commands/info.rs +++ b/cli/polka-storage-provider/src/commands/info.rs @@ -4,7 +4,7 @@ use chrono::{DateTime, Utc}; use clap::Parser; use crate::{ - rpc::{methods::common::Info, RpcClient, RpcMethodExt}, + rpc::{methods::common::InfoRequest, ClientV0}, Error, }; @@ -13,11 +13,11 @@ use crate::{ pub(crate) struct InfoCommand; impl InfoCommand { - pub async fn run(&self, client: &RpcClient) -> Result<(), Error> { + pub async fn run(&self, client: &ClientV0) -> Result<(), Error> { // TODO(#67,@cernicc,07/06/2024): Print polkadot address used by the provider // Get server info - let server_info = Info::call(client, ()).await?; + let server_info = client.execute(InfoRequest).await?; let node_status_info = NodeStatusInfo { start_time: server_info.start_time, diff --git a/cli/polka-storage-provider/src/commands/run.rs b/cli/polka-storage-provider/src/commands/run.rs index ccd18dfd0..50ad70dac 100644 --- a/cli/polka-storage-provider/src/commands/run.rs +++ b/cli/polka-storage-provider/src/commands/run.rs @@ -6,7 +6,7 @@ use tracing::info; use url::Url; use crate::{ - rpc::{start_rpc, RpcServerState, RPC_SERVER_DEFAULT_BIND_ADDR}, + rpc::server::{start_rpc_server, RpcServerState, RPC_SERVER_DEFAULT_BIND_ADDR}, substrate, Error, }; @@ -16,16 +16,16 @@ const FULL_NODE_DEFAULT_RPC_ADDR: &str = "ws://127.0.0.1:9944"; #[derive(Debug, Clone, Parser)] pub(crate) struct RunCommand { /// RPC API endpoint used by the parachain node. - #[arg(short = 'n', long, default_value = FULL_NODE_DEFAULT_RPC_ADDR)] - pub node_rpc_address: Url, + #[arg(short = 'p', long, default_value = FULL_NODE_DEFAULT_RPC_ADDR)] + pub parachain_rpc_address: Url, /// Address and port used for RPC server. - #[arg(short = 'a', long, default_value = RPC_SERVER_DEFAULT_BIND_ADDR)] - pub listen_addr: SocketAddr, + #[arg(short = 'n', long, default_value = RPC_SERVER_DEFAULT_BIND_ADDR)] + pub full_node_listen_addr: SocketAddr, } impl RunCommand { pub async fn run(&self) -> Result<(), Error> { - let substrate_client = substrate::init_client(self.node_rpc_address.as_str()).await?; + let substrate_client = substrate::init_client(self.parachain_rpc_address.as_str()).await?; let state = Arc::new(RpcServerState { start_time: Utc::now(), @@ -33,8 +33,8 @@ impl RunCommand { }); // Start RPC server - let handle = start_rpc(state, self.listen_addr).await?; - info!("RPC server started at {}", self.listen_addr); + let handle = start_rpc_server(state, self.full_node_listen_addr).await?; + info!("RPC server started at {}", self.full_node_listen_addr); // Monitor shutdown tokio::signal::ctrl_c().await?; diff --git a/cli/polka-storage-provider/src/commands/runner.rs b/cli/polka-storage-provider/src/commands/runner.rs index f4d1e0d36..312e3b4c1 100644 --- a/cli/polka-storage-provider/src/commands/runner.rs +++ b/cli/polka-storage-provider/src/commands/runner.rs @@ -4,17 +4,17 @@ use clap::Parser; use url::Url; use super::WalletCommand; -use crate::{cli::SubCommand, rpc::RpcClient, Cli, Error}; +use crate::{cli::SubCommand, rpc::Client, Cli, Error}; -/// Parses command line arguments into the service configuration and runs the specified -/// command with it. +/// Parses command line arguments into the service configuration and runs the +/// specified command with it. pub(crate) async fn run() -> Result<(), Error> { // CLI arguments parsed and mapped to the struct. let cli_arguments: Cli = Cli::parse(); - // Rpc client used to interact with the node + // RPC client used to interact with the full node let rpc_url = Url::from_str(&cli_arguments.rpc_server_url)?; - let rpc_client = RpcClient::new(rpc_url); + let rpc_client = Client::new(rpc_url).await?; match &cli_arguments.subcommand { SubCommand::Init(cmd) => cmd.run().await, diff --git a/cli/polka-storage-provider/src/rpc.rs b/cli/polka-storage-provider/src/rpc.rs index f85665730..22caadc6d 100644 --- a/cli/polka-storage-provider/src/rpc.rs +++ b/cli/polka-storage-provider/src/rpc.rs @@ -1,141 +1,6 @@ -use std::{ - fmt::{self, Debug, Display}, - future::Future, - net::SocketAddr, - sync::Arc, -}; - -use chrono::Utc; -use client::Request; -use error::ServerError; -use jsonrpsee::{ - core::ClientError, - server::{Server, ServerHandle}, - RpcModule, -}; -use methods::create_module; -use serde::{de::DeserializeOwned, Deserialize, Serialize}; -use tracing::{debug, debug_span, error, Instrument}; -use uuid::Uuid; - -use crate::{substrate, Error}; - mod client; -pub mod error; pub mod methods; +pub mod server; +pub mod version; -pub use client::RpcClient; - -/// Default address to bind the RPC server to. -pub const RPC_SERVER_DEFAULT_BIND_ADDR: &str = "127.0.0.1:8000"; - -/// A definition of an RPC method handler which can be registered with an [`RpcModule`]. -pub trait RpcMethod { - /// Method name. - const NAME: &'static str; - /// See [`ApiVersion`]. - const API_VERSION: ApiVersion; - /// Successful response type. - type Ok: Debug + Serialize + DeserializeOwned; - /// Parameters type. - type Params: Debug + Serialize + DeserializeOwned; - - /// Logic for this method. - fn handle( - ctx: Arc, - params: Self::Params, - ) -> impl Future> + Send; -} - -pub trait RpcMethodExt: RpcMethod { - /// Register this method with an [`RpcModule`]. - fn register_async(module: &mut RpcModule) -> &mut jsonrpsee::MethodCallback - where - Self::Ok: Clone + 'static, - Self::Params: Debug + Send, - { - module - .register_async_method(Self::NAME, move |params, ctx| async move { - // Try to deserialize the params - let span = - debug_span!("rpc", id = %Uuid::new_v4(), method = Self::NAME, params = ?params); - let entered = span.enter(); - - let params = params.parse().map_err(|e| { - tracing::error!("Failed to parse params: {:?}", e); - ServerError::invalid_params("Failed to parse params", None) - })?; - drop(entered); - - // Handle the method - let result = Self::handle(ctx, params).instrument(span.clone()).await; - - match &result { - Ok(ok) => { - debug!(parent: span, response = ?ok, "handled successfully"); - } - Err(err) => { - error!(parent: span, err = ?err, "error ocurred while handling") - } - } - - Result::<_, jsonrpsee::types::ErrorObjectOwned>::Ok(result?) - }) - .expect("method should be valid") // This is safe because we know the method registered is valid. - } - - /// Create a request for this method. - /// - /// Returns [`Err`] if any of the parameters fail to serialize. - fn request(params: Self::Params) -> Result, serde_json::Error> { - Ok(Request { - method_name: Self::NAME, - params, - result_type: std::marker::PhantomData, - api_version: Self::API_VERSION, - }) - } - - /// Call the method with the provided client and params. - async fn call(client: &RpcClient, params: Self::Params) -> Result { - let response = client.call(Self::request(params)?).await?; - Ok(response) - } -} - -/// Blanket implementation for all types that implement [`RpcMethod`]. -impl RpcMethodExt for T where T: RpcMethod {} - -/// Available API versions. -/// -/// These are significant because they are expressed in the URL path against -/// which RPC calls are made, e.g `rpc/v0` or `rpc/v1`. -#[derive(Debug, Clone, Copy, Hash, Eq, PartialEq, Ord, PartialOrd, Serialize, Deserialize)] -pub enum ApiVersion { - V0, -} - -impl Display for ApiVersion { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - match self { - Self::V0 => write!(f, "rpc/v0"), - } - } -} - -pub struct RpcServerState { - pub start_time: chrono::DateTime, - pub substrate_client: substrate::Client, -} - -pub async fn start_rpc( - state: Arc, - listen_addr: SocketAddr, -) -> Result { - let server = Server::builder().build(listen_addr).await?; - - let module = create_module(state.clone()); - let server_handle = server.start(module); - - Ok(server_handle) -} +pub use client::{Client, ClientV0}; diff --git a/cli/polka-storage-provider/src/rpc/client.rs b/cli/polka-storage-provider/src/rpc/client.rs index 53f608adb..e87de0c0c 100644 --- a/cli/polka-storage-provider/src/rpc/client.rs +++ b/cli/polka-storage-provider/src/rpc/client.rs @@ -13,108 +13,27 @@ use jsonrpsee::{ http_client::HttpClientBuilder, ws_client::WsClientBuilder, }; -use serde::{de::DeserializeOwned, Serialize}; +use serde::de::DeserializeOwned; use serde_json::Value; -use tokio::sync::OnceCell; use tracing::{debug, instrument}; use url::Url; -use super::ApiVersion; - -pub struct RpcClient { - base_url: Url, - v0: OnceCell, -} - -impl RpcClient { - /// Create a new RPC client with the given base URL. - pub fn new(base_url: Url) -> Self { - Self { - base_url, - v0: OnceCell::new(), - } - } - - /// Call an RPC server with the given request. - pub async fn call(&self, req: Request) -> Result - where - T: DeserializeOwned + Debug, - P: Serialize + Debug, - { - let Request { - method_name, - params, - api_version, - .. - } = req; - - let client = self.get_or_init_client(api_version).await?; - request(client, method_name, params).await - } - - /// Get or initialize a client for the given API version. - async fn get_or_init_client(&self, version: ApiVersion) -> Result<&Client, ClientError> { - match version { - ApiVersion::V0 => &self.v0, - } - .get_or_try_init(|| async { - let url = self.base_url.join(&version.to_string()).map_err(|it| { - ClientError::Custom(format!("creating url for endpoint failed: {}", it)) - })?; - - Client::new(url).await - }) - .await - } -} - -#[instrument(skip_all, fields(url = %client.url, params = ?params))] -async fn request(client: &Client, method_name: &str, params: P) -> Result -where - T: DeserializeOwned + Debug, - P: Serialize + Debug, -{ - let params = serde_json::to_value(params)?; - - let result = match params { - Value::Null => client.request(method_name, ArrayParams::new()), - Value::Array(it) => { - let mut params = ArrayParams::new(); - for param in it { - params.insert(param)? - } - - client.request(method_name, params) - } - Value::Object(it) => { - let mut params = ObjectParams::new(); - for (name, param) in it { - params.insert(&name, param)? - } - - client.request(method_name, params) - } - param @ (Value::Bool(_) | Value::Number(_) | Value::String(_)) => { - return Err(ClientError::Custom(format!( - "invalid parameter type: `{}`", - param - ))) - } - } - .await; - - debug!(?result, "request completed"); +use super::{ + methods::RpcRequest, + version::{ApiVersion, V0}, +}; - result -} +/// Type alias for the V0 client instance +pub type ClientV0 = Client; /// Represents a single connection to the URL server -struct Client { +pub struct Client { url: Url, - specific: ClientInner, + inner: ClientInner, + _version: PhantomData, } -impl Debug for Client { +impl Debug for Client { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.debug_struct("InnerClient") .field("url", &self.url) @@ -122,9 +41,9 @@ impl Debug for Client { } } -impl Client { - async fn new(url: Url) -> Result { - let specific = match url.scheme() { +impl Client { + pub async fn new(url: Url) -> Result { + let inner = match url.scheme() { "ws" | "wss" => ClientInner::Ws(WsClientBuilder::new().build(&url).await?), "http" | "https" => ClientInner::Https(HttpClientBuilder::new().build(&url)?), it => { @@ -135,7 +54,52 @@ impl Client { } }; - Ok(Self { url, specific }) + Ok(Self { + url, + inner, + _version: PhantomData, + }) + } + + #[instrument(skip_all, fields(url = %self.url, method = %Request::NAME))] + pub async fn execute(&self, request: Request) -> Result + where + Request: RpcRequest, + Version: ApiVersion, + { + let method_name = Request::NAME; + let params = serde_json::to_value(request.get_params())?; + + let result = match params { + Value::Null => self.inner.request(method_name, ArrayParams::new()), + Value::Array(it) => { + let mut params = ArrayParams::new(); + for param in it { + params.insert(param)? + } + + self.inner.request(method_name, params) + } + Value::Object(it) => { + let mut params = ObjectParams::new(); + for (name, param) in it { + params.insert(&name, param)? + } + + self.inner.request(method_name, params) + } + param @ (Value::Bool(_) | Value::Number(_) | Value::String(_)) => { + return Err(ClientError::Custom(format!( + "invalid parameter type: `{}`", + param + ))) + } + } + .await; + + debug!(?result, "response received"); + + result } } @@ -145,12 +109,12 @@ enum ClientInner { } #[async_trait::async_trait] -impl ClientT for Client { +impl ClientT for ClientInner { async fn notification(&self, method: &str, params: Params) -> Result<(), ClientError> where Params: ToRpcParams + Send, { - match &self.specific { + match &self { ClientInner::Ws(client) => client.notification(method, params).await, ClientInner::Https(client) => client.notification(method, params).await, } @@ -161,7 +125,7 @@ impl ClientT for Client { R: DeserializeOwned, Params: ToRpcParams + Send, { - match &self.specific { + match &self { ClientInner::Ws(client) => client.request(method, params).await, ClientInner::Https(client) => client.request(method, params).await, } @@ -174,7 +138,7 @@ impl ClientT for Client { where R: DeserializeOwned + fmt::Debug + 'a, { - match &self.specific { + match &self { ClientInner::Ws(client) => client.batch_request(batch).await, ClientInner::Https(client) => client.batch_request(batch).await, } @@ -182,7 +146,7 @@ impl ClientT for Client { } #[async_trait::async_trait] -impl SubscriptionClientT for Client { +impl SubscriptionClientT for ClientInner { async fn subscribe<'a, Notif, Params>( &self, subscribe_method: &'a str, @@ -193,7 +157,7 @@ impl SubscriptionClientT for Client { Params: ToRpcParams + Send, Notif: DeserializeOwned, { - match &self.specific { + match &self { ClientInner::Ws(it) => { it.subscribe(subscribe_method, params, unsubscribe_method) .await @@ -212,24 +176,9 @@ impl SubscriptionClientT for Client { where Notif: DeserializeOwned, { - match &self.specific { + match &self { ClientInner::Ws(it) => it.subscribe_to_method(method).await, ClientInner::Https(it) => it.subscribe_to_method(method).await, } } } - -/// Represents a single RPC request. -#[derive(Debug)] -pub struct Request { - pub method_name: &'static str, - pub params: Params, - pub result_type: PhantomData, - pub api_version: ApiVersion, -} - -impl ToRpcParams for Request { - fn to_rpc_params(self) -> Result>, serde_json::Error> { - Ok(Some(serde_json::value::to_raw_value(&self.params)?)) - } -} diff --git a/cli/polka-storage-provider/src/rpc/error.rs b/cli/polka-storage-provider/src/rpc/error.rs deleted file mode 100644 index af5578605..000000000 --- a/cli/polka-storage-provider/src/rpc/error.rs +++ /dev/null @@ -1,49 +0,0 @@ -use std::fmt::Display; - -use jsonrpsee::types::{ - error::{INTERNAL_ERROR_CODE, INVALID_PARAMS_CODE}, - ErrorObjectOwned, -}; -use serde_json::Value; - -/// Error type for RPC server errors. -#[derive(Debug)] -pub struct ServerError { - inner: ErrorObjectOwned, -} - -impl std::error::Error for ServerError {} - -impl Display for ServerError { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!(f, "JSON-RPC error: {}", &self.inner) - } -} - -impl From for ErrorObjectOwned { - fn from(err: ServerError) -> Self { - err.inner - } -} - -impl ServerError { - pub fn new(code: i32, message: impl Display, data: impl Into>) -> Self { - Self { - inner: ErrorObjectOwned::owned(code, message.to_string(), data.into()), - } - } - - pub fn internal_error(message: impl Display, data: impl Into>) -> Self { - Self::new(INTERNAL_ERROR_CODE, message, data) - } - - pub fn invalid_params(message: impl Display, data: impl Into>) -> Self { - Self::new(INVALID_PARAMS_CODE, message, data) - } -} - -impl From for ServerError { - fn from(err: subxt::Error) -> Self { - Self::internal_error(err, None) - } -} diff --git a/cli/polka-storage-provider/src/rpc/methods.rs b/cli/polka-storage-provider/src/rpc/methods.rs index ea11dd567..e4b7f705c 100644 --- a/cli/polka-storage-provider/src/rpc/methods.rs +++ b/cli/polka-storage-provider/src/rpc/methods.rs @@ -1,17 +1,64 @@ -use std::sync::Arc; +use std::{fmt::Debug, future::Future, sync::Arc}; use jsonrpsee::RpcModule; +use serde::{de::DeserializeOwned, Serialize}; +use tracing::{debug, debug_span, error, Instrument}; +use uuid::Uuid; -use super::{RpcMethodExt, RpcServerState}; +use super::server::{RpcServerState, ServerError}; pub mod common; pub mod wallet; -pub fn create_module(state: Arc) -> RpcModule { - let mut module = RpcModule::from_arc(state); +/// A definition of an RPC method handler which can be registered with an [`RpcModule`]. +pub trait RpcRequest { + /// Method name. + const NAME: &'static str; + /// Successful response type. + type Ok: Clone + Debug + Serialize + DeserializeOwned + Send + Sync + 'static; + /// Parameters type. + type Params: Clone + Debug + Serialize + DeserializeOwned + Send + Sync; - common::Info::register_async(&mut module); - wallet::WalletBalance::register_async(&mut module); + /// Get request parameters. + fn get_params(&self) -> Self::Params; + /// Logic for handling this request. + fn handle( + ctx: Arc, + params: Self::Params, + ) -> impl Future> + Send; +} + +/// Register the RpcRequest handle with the [`RpcModule`]. +pub fn register_async( + module: &mut RpcModule, +) -> &mut jsonrpsee::MethodCallback +where + Request: RpcRequest, +{ module + .register_async_method(Request::NAME, move |params, ctx| async move { + // Try to deserialize the params + let span = + debug_span!("method", id = %Uuid::new_v4(), method = Request::NAME, params = ?params); + let params = params.parse().map_err(|err| { + error!(parent: span.clone(), ?err, ?params, "failed to parse params"); + ServerError::invalid_params("Failed to parse params", None) + })?; + + // Handle the method + let result = Request::handle(ctx, params).instrument(span.clone()).await; + + match &result { + Ok(ok) => { + debug!(parent: span, response = ?ok, "handled successfully"); + } + Err(err) => { + error!(parent: span, err = ?err, "error ocurred while handling") + } + } + + Result::<_, jsonrpsee::types::ErrorObjectOwned>::Ok(result?) + }) + .expect("method should be valid") // This is safe because we know the method registered is valid. } diff --git a/cli/polka-storage-provider/src/rpc/methods/common.rs b/cli/polka-storage-provider/src/rpc/methods/common.rs index 3279ca158..ac046c9ac 100644 --- a/cli/polka-storage-provider/src/rpc/methods/common.rs +++ b/cli/polka-storage-provider/src/rpc/methods/common.rs @@ -3,18 +3,25 @@ use std::sync::Arc; use chrono::{DateTime, Utc}; use serde::{Deserialize, Serialize}; -use crate::rpc::{error::ServerError, ApiVersion, RpcMethod, RpcServerState}; +use super::RpcRequest; +use crate::rpc::{ + server::{RpcServerState, ServerError}, + version::V0, +}; /// This RPC method exposes the system information. #[derive(Debug)] -pub struct Info; +pub struct InfoRequest; -impl RpcMethod for Info { +impl RpcRequest for InfoRequest { const NAME: &'static str = "info"; - const API_VERSION: ApiVersion = ApiVersion::V0; type Ok = InfoResult; type Params = (); + fn get_params(&self) -> Self::Params { + () + } + async fn handle(ctx: Arc, _: Self::Params) -> Result { Ok(InfoResult { start_time: ctx.start_time, diff --git a/cli/polka-storage-provider/src/rpc/methods/wallet.rs b/cli/polka-storage-provider/src/rpc/methods/wallet.rs index d8c5bef53..f5425a503 100644 --- a/cli/polka-storage-provider/src/rpc/methods/wallet.rs +++ b/cli/polka-storage-provider/src/rpc/methods/wallet.rs @@ -3,20 +3,27 @@ use std::sync::Arc; use serde::{Deserialize, Serialize}; use subxt_signer::sr25519::dev; +use super::RpcRequest; use crate::{ - rpc::{error::ServerError, ApiVersion, RpcMethod, RpcServerState}, + rpc::{ + server::{RpcServerState, ServerError}, + version::V0, + }, substrate::get_system_balances, }; /// This RPC method exposes getting the system balances for the particular /// account. -pub struct WalletBalance; -impl RpcMethod for WalletBalance { +pub struct WalletRequest; +impl RpcRequest for WalletRequest { const NAME: &'static str = "wallet_balance"; - const API_VERSION: ApiVersion = ApiVersion::V0; type Ok = Option; type Params = (); + fn get_params(&self) -> Self::Params { + () + } + async fn handle(ctx: Arc, _: Self::Params) -> Result { // TODO(#68,@cernicc,05/06/2024): Implement correctly. dev alice is used as a show case for now. let account = dev::alice().public_key().into(); @@ -30,6 +37,7 @@ impl RpcMethod for WalletBalance { } } +/// Result of the wallet balance request. #[derive(Debug, Clone, Serialize, Deserialize)] pub struct WalletBalanceResult { free: String, diff --git a/cli/polka-storage-provider/src/rpc/server.rs b/cli/polka-storage-provider/src/rpc/server.rs new file mode 100644 index 000000000..4ffff1742 --- /dev/null +++ b/cli/polka-storage-provider/src/rpc/server.rs @@ -0,0 +1,96 @@ +use std::{ + fmt::{Debug, Display}, + net::SocketAddr, + sync::Arc, +}; + +use chrono::Utc; +use jsonrpsee::{ + server::{Server, ServerHandle}, + types::{ + error::{INTERNAL_ERROR_CODE, INVALID_PARAMS_CODE}, + ErrorObjectOwned, + }, + RpcModule, +}; +use serde_json::Value; + +use super::{ + methods::{common::InfoRequest, register_async, wallet::WalletRequest}, + version::V0, +}; +use crate::{substrate, Error}; + +/// Default address to bind the RPC server to. +pub const RPC_SERVER_DEFAULT_BIND_ADDR: &str = "127.0.0.1:8000"; + +/// RPC server shared state. +pub struct RpcServerState { + pub start_time: chrono::DateTime, + pub substrate_client: substrate::Client, +} + +/// Start the RPC server. +pub async fn start_rpc_server( + state: Arc, + listen_addr: SocketAddr, +) -> Result { + let server = Server::builder().build(listen_addr).await?; + + let module = create_module(state.clone()); + let server_handle = server.start(module); + + Ok(server_handle) +} + +/// Initialize [`RpcModule`]. +pub fn create_module(state: Arc) -> RpcModule { + let mut module = RpcModule::from_arc(state); + + register_async::(&mut module); + register_async::(&mut module); + + module +} + +/// Error type for RPC server errors. +#[derive(Debug)] +pub struct ServerError { + inner: ErrorObjectOwned, +} + +impl std::error::Error for ServerError {} + +impl Display for ServerError { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "JSON-RPC error: {}", &self.inner) + } +} + +impl From for ErrorObjectOwned { + fn from(err: ServerError) -> Self { + err.inner + } +} + +impl ServerError { + pub fn new(code: i32, message: impl Display, data: impl Into>) -> Self { + Self { + inner: ErrorObjectOwned::owned(code, message.to_string(), data.into()), + } + } + + pub fn internal_error(message: impl Display, data: impl Into>) -> Self { + Self::new(INTERNAL_ERROR_CODE, message, data) + } + + pub fn invalid_params(message: impl Display, data: impl Into>) -> Self { + Self::new(INVALID_PARAMS_CODE, message, data) + } +} + +impl From for ServerError { + fn from(err: subxt::Error) -> Self { + Self::internal_error(err, None) + } +} diff --git a/cli/polka-storage-provider/src/rpc/version.rs b/cli/polka-storage-provider/src/rpc/version.rs new file mode 100644 index 000000000..b473b5e15 --- /dev/null +++ b/cli/polka-storage-provider/src/rpc/version.rs @@ -0,0 +1,23 @@ +use private::Sealed; + +/// Sealed trait to prevent external implementations. +mod private { + pub trait Sealed {} +} + +/// RPC API version. +pub trait ApiVersion: Sealed { + /// Returns the version string. + fn version() -> &'static str; +} + +/// RPC API version v0. +pub struct V0; + +impl Sealed for V0 {} + +impl ApiVersion for V0 { + fn version() -> &'static str { + "rpc/v0" + } +} From b3e1c9f56e0368bdd603eb1a157bf58f554a54f2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rok=20=C4=8Cerni=C4=8D?= Date: Thu, 27 Jun 2024 11:19:03 +0200 Subject: [PATCH 16/22] fix: after merge --- Cargo.lock | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index f4c8b7f15..53c6ae2f3 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -9859,8 +9859,8 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "22505a5c94da8e3b7c2996394d1c933236c4d743e81a410bcca4e6989fc066a4" dependencies = [ "bytes", - "heck 0.4.1", - "itertools 0.10.5", + "heck 0.5.0", + "itertools 0.12.1", "log", "multimap", "once_cell", From 3bccbff7a47889c25e5780c6b0983034d338e000 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rok=20=C4=8Cerni=C4=8D?= Date: Thu, 27 Jun 2024 12:01:10 +0200 Subject: [PATCH 17/22] feat: add custom rpc client error --- cli/polka-storage-provider/src/error.rs | 6 +- cli/polka-storage-provider/src/rpc.rs | 2 +- cli/polka-storage-provider/src/rpc/client.rs | 63 ++++++++++++++------ 3 files changed, 49 insertions(+), 22 deletions(-) diff --git a/cli/polka-storage-provider/src/error.rs b/cli/polka-storage-provider/src/error.rs index b3c55346e..144d07ea9 100644 --- a/cli/polka-storage-provider/src/error.rs +++ b/cli/polka-storage-provider/src/error.rs @@ -1,5 +1,7 @@ use thiserror::Error; +use crate::rpc::ClientError; + /// CLI components error handling implementor. #[derive(Debug, Error)] pub enum Error { @@ -18,6 +20,6 @@ pub enum Error { #[error(transparent)] SubstrateCli(#[from] sc_cli::Error), - #[error("JSON-RPC client error: {0}")] - JsonRpcClient(#[from] jsonrpsee::core::ClientError), + #[error("Rpc Client error: {0}")] + RpcClient(#[from] ClientError), } diff --git a/cli/polka-storage-provider/src/rpc.rs b/cli/polka-storage-provider/src/rpc.rs index 22caadc6d..a79f0eb00 100644 --- a/cli/polka-storage-provider/src/rpc.rs +++ b/cli/polka-storage-provider/src/rpc.rs @@ -3,4 +3,4 @@ pub mod methods; pub mod server; pub mod version; -pub use client::{Client, ClientV0}; +pub use client::{Client, ClientError, ClientV0}; diff --git a/cli/polka-storage-provider/src/rpc/client.rs b/cli/polka-storage-provider/src/rpc/client.rs index e87de0c0c..d6e8dddd9 100644 --- a/cli/polka-storage-provider/src/rpc/client.rs +++ b/cli/polka-storage-provider/src/rpc/client.rs @@ -8,13 +8,13 @@ use jsonrpsee::{ client::{BatchResponse, ClientT, Subscription, SubscriptionClientT}, params::{ArrayParams, BatchRequestBuilder, ObjectParams}, traits::ToRpcParams, - ClientError, }, http_client::HttpClientBuilder, ws_client::WsClientBuilder, }; use serde::de::DeserializeOwned; use serde_json::Value; +use thiserror::Error; use tracing::{debug, instrument}; use url::Url; @@ -26,6 +26,25 @@ use super::{ /// Type alias for the V0 client instance pub type ClientV0 = Client; +/// Errors that can occur when working with the client +#[derive(Debug, Error)] +pub enum ClientError { + #[error("Unsupported scheme error: {0}")] + UnsupportedUrlScheme(String), + + #[error("Invalid parameter type: {0}")] + InvalidParameter(Value), + + #[error(transparent)] + Url(#[from] url::ParseError), + + #[error(transparent)] + JsonRpcClient(#[from] jsonrpsee::core::ClientError), + + #[error(transparent)] + Json(#[from] serde_json::Error), +} + /// Represents a single connection to the URL server pub struct Client { url: Url, @@ -41,17 +60,16 @@ impl Debug for Client { } } -impl Client { - pub async fn new(url: Url) -> Result { +impl Client +where + Version: ApiVersion, +{ + pub async fn new(base_url: Url) -> Result { + let url = base_url.join(Version::version())?; let inner = match url.scheme() { "ws" | "wss" => ClientInner::Ws(WsClientBuilder::new().build(&url).await?), "http" | "https" => ClientInner::Https(HttpClientBuilder::new().build(&url)?), - it => { - return Err(ClientError::Custom(format!( - "Unsupported URL scheme: {}", - it - ))) - } + it => return Err(ClientError::UnsupportedUrlScheme(it.to_string())), }; Ok(Self { @@ -89,17 +107,16 @@ impl Client { self.inner.request(method_name, params) } param @ (Value::Bool(_) | Value::Number(_) | Value::String(_)) => { - return Err(ClientError::Custom(format!( - "invalid parameter type: `{}`", - param - ))) + return Err(ClientError::InvalidParameter(param)) } } .await; debug!(?result, "response received"); - result + // We cant return result directly because compiler needs some help to + // understand the types + Ok(result?) } } @@ -110,7 +127,11 @@ enum ClientInner { #[async_trait::async_trait] impl ClientT for ClientInner { - async fn notification(&self, method: &str, params: Params) -> Result<(), ClientError> + async fn notification( + &self, + method: &str, + params: Params, + ) -> Result<(), jsonrpsee::core::ClientError> where Params: ToRpcParams + Send, { @@ -120,7 +141,11 @@ impl ClientT for ClientInner { } } - async fn request(&self, method: &str, params: Params) -> Result + async fn request( + &self, + method: &str, + params: Params, + ) -> Result where R: DeserializeOwned, Params: ToRpcParams + Send, @@ -134,7 +159,7 @@ impl ClientT for ClientInner { async fn batch_request<'a, R>( &self, batch: BatchRequestBuilder<'a>, - ) -> Result, ClientError> + ) -> Result, jsonrpsee::core::ClientError> where R: DeserializeOwned + fmt::Debug + 'a, { @@ -152,7 +177,7 @@ impl SubscriptionClientT for ClientInner { subscribe_method: &'a str, params: Params, unsubscribe_method: &'a str, - ) -> Result, ClientError> + ) -> Result, jsonrpsee::core::ClientError> where Params: ToRpcParams + Send, Notif: DeserializeOwned, @@ -172,7 +197,7 @@ impl SubscriptionClientT for ClientInner { async fn subscribe_to_method<'a, Notif>( &self, method: &'a str, - ) -> Result, ClientError> + ) -> Result, jsonrpsee::core::ClientError> where Notif: DeserializeOwned, { From ab60f49a68252a9cae125b450c11b238606d275d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rok=20=C4=8Cerni=C4=8D?= Date: Thu, 27 Jun 2024 12:10:53 +0200 Subject: [PATCH 18/22] chore: some comments --- cli/polka-storage-provider/src/rpc/methods.rs | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/cli/polka-storage-provider/src/rpc/methods.rs b/cli/polka-storage-provider/src/rpc/methods.rs index e4b7f705c..f0c630b48 100644 --- a/cli/polka-storage-provider/src/rpc/methods.rs +++ b/cli/polka-storage-provider/src/rpc/methods.rs @@ -10,7 +10,7 @@ use super::server::{RpcServerState, ServerError}; pub mod common; pub mod wallet; -/// A definition of an RPC method handler which can be registered with an [`RpcModule`]. +/// A trait for defining a versioned RPC request. pub trait RpcRequest { /// Method name. const NAME: &'static str; @@ -22,14 +22,15 @@ pub trait RpcRequest { /// Get request parameters. fn get_params(&self) -> Self::Params; - /// Logic for handling this request. + /// A definition of an RPC request handle which can be registered with an + /// [`RpcModule`]. This specifies how to handle some specific RPC request. fn handle( ctx: Arc, params: Self::Params, ) -> impl Future> + Send; } -/// Register the RpcRequest handle with the [`RpcModule`]. +/// Register the [`RpcRequest`] handle with the [`RpcModule`]. pub fn register_async( module: &mut RpcModule, ) -> &mut jsonrpsee::MethodCallback From e1494bf1096dd5de10e962be501a308227f3581e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rok=20=C4=8Cerni=C4=8D?= Date: Thu, 27 Jun 2024 12:19:24 +0200 Subject: [PATCH 19/22] fix: remove unused bounds --- cli/polka-storage-provider/src/rpc/methods.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/cli/polka-storage-provider/src/rpc/methods.rs b/cli/polka-storage-provider/src/rpc/methods.rs index f0c630b48..ee5b32f26 100644 --- a/cli/polka-storage-provider/src/rpc/methods.rs +++ b/cli/polka-storage-provider/src/rpc/methods.rs @@ -15,9 +15,9 @@ pub trait RpcRequest { /// Method name. const NAME: &'static str; /// Successful response type. - type Ok: Clone + Debug + Serialize + DeserializeOwned + Send + Sync + 'static; + type Ok: Debug + Clone + Serialize + DeserializeOwned + 'static; /// Parameters type. - type Params: Clone + Debug + Serialize + DeserializeOwned + Send + Sync; + type Params: Debug + Serialize + DeserializeOwned; /// Get request parameters. fn get_params(&self) -> Self::Params; From 98f687293c2857fa705a794b690d16fd91a62b46 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rok=20=C4=8Cerni=C4=8D?= Date: Thu, 27 Jun 2024 14:48:44 +0200 Subject: [PATCH 20/22] fix: some pr suggestions --- Cargo.lock | 13 +++++++++++++ Cargo.toml | 1 + cli/polka-storage-provider/Cargo.toml | 1 + cli/polka-storage-provider/src/cli.rs | 7 +++++-- cli/polka-storage-provider/src/commands/run.rs | 14 +++++++------- cli/polka-storage-provider/src/commands/runner.rs | 6 +----- cli/polka-storage-provider/src/rpc/version.rs | 13 ++++--------- 7 files changed, 32 insertions(+), 23 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 53c6ae2f3..3330cddb3 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -8298,6 +8298,7 @@ dependencies = [ "clap", "jsonrpsee", "sc-cli", + "sealed", "serde", "serde_json", "subxt", @@ -12240,6 +12241,18 @@ dependencies = [ "thiserror", ] +[[package]] +name = "sealed" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f4a8caec23b7800fb97971a1c6ae365b6239aaeddfb934d6265f8505e795699d" +dependencies = [ + "heck 0.4.1", + "proc-macro2", + "quote", + "syn 2.0.66", +] + [[package]] name = "sec1" version = "0.7.3" diff --git a/Cargo.toml b/Cargo.toml index 41b6f5ecd..8377ef686 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -59,6 +59,7 @@ quote = { version = "1.0.33" } rand = "0.8.5" rocksdb = { version = "0.21" } scale-info = { version = "2.11.1", default-features = false } +sealed = "0.5" serde = { version = "1.0.197", default-features = false } serde-big-array = { version = "0.3.2" } serde_derive = { version = "1.0.117" } diff --git a/cli/polka-storage-provider/Cargo.toml b/cli/polka-storage-provider/Cargo.toml index df29048b8..2170a02af 100644 --- a/cli/polka-storage-provider/Cargo.toml +++ b/cli/polka-storage-provider/Cargo.toml @@ -13,6 +13,7 @@ chrono = { workspace = true, features = ["serde"] } clap = { workspace = true, features = ["derive"] } jsonrpsee = { workspace = true, features = ["http-client", "server", "ws-client"] } sc-cli = { workspace = true } +sealed = { workspace = true } serde = { workspace = true } serde_json = { workspace = true } subxt = { workspace = true } diff --git a/cli/polka-storage-provider/src/cli.rs b/cli/polka-storage-provider/src/cli.rs index 0216dbc3a..904a090d2 100644 --- a/cli/polka-storage-provider/src/cli.rs +++ b/cli/polka-storage-provider/src/cli.rs @@ -1,4 +1,5 @@ use clap::Parser; +use url::Url; use crate::{ commands::{InfoCommand, InitCommand, RunCommand, WalletCommand}, @@ -12,8 +13,10 @@ use crate::{ pub(crate) struct Cli { #[command(subcommand)] pub subcommand: SubCommand, - #[arg(short, long, default_value_t = format!("http://{RPC_SERVER_DEFAULT_BIND_ADDR}"))] - pub rpc_server_url: String, + + /// URL of the providers RPC server. + #[arg(long, default_value_t = Url::parse(&format!("http://{RPC_SERVER_DEFAULT_BIND_ADDR}")).unwrap())] + pub rpc_server_url: Url, } /// Supported sub-commands. diff --git a/cli/polka-storage-provider/src/commands/run.rs b/cli/polka-storage-provider/src/commands/run.rs index 50ad70dac..7ba742dc5 100644 --- a/cli/polka-storage-provider/src/commands/run.rs +++ b/cli/polka-storage-provider/src/commands/run.rs @@ -16,16 +16,16 @@ const FULL_NODE_DEFAULT_RPC_ADDR: &str = "ws://127.0.0.1:9944"; #[derive(Debug, Clone, Parser)] pub(crate) struct RunCommand { /// RPC API endpoint used by the parachain node. - #[arg(short = 'p', long, default_value = FULL_NODE_DEFAULT_RPC_ADDR)] - pub parachain_rpc_address: Url, + #[arg(long, default_value = FULL_NODE_DEFAULT_RPC_ADDR)] + pub rpc_address: Url, /// Address and port used for RPC server. - #[arg(short = 'n', long, default_value = RPC_SERVER_DEFAULT_BIND_ADDR)] - pub full_node_listen_addr: SocketAddr, + #[arg(long, default_value = RPC_SERVER_DEFAULT_BIND_ADDR)] + pub listen_addr: SocketAddr, } impl RunCommand { pub async fn run(&self) -> Result<(), Error> { - let substrate_client = substrate::init_client(self.parachain_rpc_address.as_str()).await?; + let substrate_client = substrate::init_client(self.rpc_address.as_str()).await?; let state = Arc::new(RpcServerState { start_time: Utc::now(), @@ -33,8 +33,8 @@ impl RunCommand { }); // Start RPC server - let handle = start_rpc_server(state, self.full_node_listen_addr).await?; - info!("RPC server started at {}", self.full_node_listen_addr); + let handle = start_rpc_server(state, self.listen_addr).await?; + info!("RPC server started at {}", self.listen_addr); // Monitor shutdown tokio::signal::ctrl_c().await?; diff --git a/cli/polka-storage-provider/src/commands/runner.rs b/cli/polka-storage-provider/src/commands/runner.rs index 312e3b4c1..af733ed0f 100644 --- a/cli/polka-storage-provider/src/commands/runner.rs +++ b/cli/polka-storage-provider/src/commands/runner.rs @@ -1,7 +1,4 @@ -use std::str::FromStr; - use clap::Parser; -use url::Url; use super::WalletCommand; use crate::{cli::SubCommand, rpc::Client, Cli, Error}; @@ -13,8 +10,7 @@ pub(crate) async fn run() -> Result<(), Error> { let cli_arguments: Cli = Cli::parse(); // RPC client used to interact with the full node - let rpc_url = Url::from_str(&cli_arguments.rpc_server_url)?; - let rpc_client = Client::new(rpc_url).await?; + let rpc_client = Client::new(cli_arguments.rpc_server_url).await?; match &cli_arguments.subcommand { SubCommand::Init(cmd) => cmd.run().await, diff --git a/cli/polka-storage-provider/src/rpc/version.rs b/cli/polka-storage-provider/src/rpc/version.rs index b473b5e15..5a7644ba1 100644 --- a/cli/polka-storage-provider/src/rpc/version.rs +++ b/cli/polka-storage-provider/src/rpc/version.rs @@ -1,12 +1,8 @@ -use private::Sealed; - -/// Sealed trait to prevent external implementations. -mod private { - pub trait Sealed {} -} +use sealed::sealed; /// RPC API version. -pub trait ApiVersion: Sealed { +#[sealed] +pub trait ApiVersion { /// Returns the version string. fn version() -> &'static str; } @@ -14,8 +10,7 @@ pub trait ApiVersion: Sealed { /// RPC API version v0. pub struct V0; -impl Sealed for V0 {} - +#[sealed] impl ApiVersion for V0 { fn version() -> &'static str { "rpc/v0" From 4a07a9b0b69bde387cb5f1055ce4ed1f7cf31598 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rok=20=C4=8Cerni=C4=8D?= Date: Thu, 27 Jun 2024 15:55:40 +0200 Subject: [PATCH 21/22] fix: pr related --- cli/polka-storage-provider/src/cli.rs | 25 ++++++++++++++++++- .../src/commands/info.rs | 6 ++--- .../src/commands/init.rs | 4 +-- .../src/commands/run.rs | 5 ++-- .../src/commands/runner.rs | 8 ++++-- cli/polka-storage-provider/src/error.rs | 25 ------------------- cli/polka-storage-provider/src/main.rs | 5 ++-- cli/polka-storage-provider/src/rpc.rs | 2 +- cli/polka-storage-provider/src/rpc/client.rs | 22 +++++++--------- cli/polka-storage-provider/src/rpc/methods.rs | 9 +++++-- .../src/rpc/methods/common.rs | 2 +- .../src/rpc/methods/wallet.rs | 2 +- cli/polka-storage-provider/src/rpc/server.rs | 10 +++++--- 13 files changed, 66 insertions(+), 59 deletions(-) delete mode 100644 cli/polka-storage-provider/src/error.rs diff --git a/cli/polka-storage-provider/src/cli.rs b/cli/polka-storage-provider/src/cli.rs index 904a090d2..b1f2beeef 100644 --- a/cli/polka-storage-provider/src/cli.rs +++ b/cli/polka-storage-provider/src/cli.rs @@ -1,9 +1,10 @@ use clap::Parser; +use thiserror::Error; use url::Url; use crate::{ commands::{InfoCommand, InitCommand, RunCommand, WalletCommand}, - rpc::server::RPC_SERVER_DEFAULT_BIND_ADDR, + rpc::{server::RPC_SERVER_DEFAULT_BIND_ADDR, ClientError}, }; /// A CLI application that facilitates management operations over a running full @@ -32,3 +33,25 @@ pub enum SubCommand { #[command(subcommand)] Wallet(WalletCommand), } + +/// CLI components error handling implementor. +#[derive(Debug, Error)] +pub enum CliError { + #[error("IO error: {0}")] + IoError(#[from] std::io::Error), + + #[error("FromEnv error: {0}")] + EnvError(#[from] tracing_subscriber::filter::FromEnvError), + + #[error("URL parse error: {0}")] + ParseUrl(#[from] url::ParseError), + + #[error("Substrate error: {0}")] + Substrate(#[from] subxt::Error), + + #[error(transparent)] + SubstrateCli(#[from] sc_cli::Error), + + #[error("Rpc Client error: {0}")] + RpcClient(#[from] ClientError), +} diff --git a/cli/polka-storage-provider/src/commands/info.rs b/cli/polka-storage-provider/src/commands/info.rs index 2e85abe3e..38d84169e 100644 --- a/cli/polka-storage-provider/src/commands/info.rs +++ b/cli/polka-storage-provider/src/commands/info.rs @@ -4,8 +4,8 @@ use chrono::{DateTime, Utc}; use clap::Parser; use crate::{ - rpc::{methods::common::InfoRequest, ClientV0}, - Error, + cli::CliError, + rpc::{methods::common::InfoRequest, version::V0, Client}, }; /// Command to display information about the storage provider. @@ -13,7 +13,7 @@ use crate::{ pub(crate) struct InfoCommand; impl InfoCommand { - pub async fn run(&self, client: &ClientV0) -> Result<(), Error> { + pub async fn run(&self, client: &Client) -> Result<(), CliError> { // TODO(#67,@cernicc,07/06/2024): Print polkadot address used by the provider // Get server info diff --git a/cli/polka-storage-provider/src/commands/init.rs b/cli/polka-storage-provider/src/commands/init.rs index 248aa293b..083c17090 100644 --- a/cli/polka-storage-provider/src/commands/init.rs +++ b/cli/polka-storage-provider/src/commands/init.rs @@ -1,14 +1,14 @@ use clap::Parser; use tracing::info; -use crate::Error; +use crate::cli::CliError; /// Command to initialize the storage provider. #[derive(Debug, Clone, Parser)] pub(crate) struct InitCommand; impl InitCommand { - pub async fn run(&self) -> Result<(), Error> { + pub async fn run(&self) -> Result<(), CliError> { info!("Initializing polka storage provider..."); // TODO(#64,@cernicc,31/05/2024): Init needed configurations. // TODO(#65,@cernicc,31/05/2024): Check if full node is synced diff --git a/cli/polka-storage-provider/src/commands/run.rs b/cli/polka-storage-provider/src/commands/run.rs index 7ba742dc5..5f31a656b 100644 --- a/cli/polka-storage-provider/src/commands/run.rs +++ b/cli/polka-storage-provider/src/commands/run.rs @@ -6,8 +6,9 @@ use tracing::info; use url::Url; use crate::{ + cli::CliError, rpc::server::{start_rpc_server, RpcServerState, RPC_SERVER_DEFAULT_BIND_ADDR}, - substrate, Error, + substrate, }; const FULL_NODE_DEFAULT_RPC_ADDR: &str = "ws://127.0.0.1:9944"; @@ -24,7 +25,7 @@ pub(crate) struct RunCommand { } impl RunCommand { - pub async fn run(&self) -> Result<(), Error> { + pub async fn run(&self) -> Result<(), CliError> { let substrate_client = substrate::init_client(self.rpc_address.as_str()).await?; let state = Arc::new(RpcServerState { diff --git a/cli/polka-storage-provider/src/commands/runner.rs b/cli/polka-storage-provider/src/commands/runner.rs index af733ed0f..408e212d1 100644 --- a/cli/polka-storage-provider/src/commands/runner.rs +++ b/cli/polka-storage-provider/src/commands/runner.rs @@ -1,11 +1,15 @@ use clap::Parser; use super::WalletCommand; -use crate::{cli::SubCommand, rpc::Client, Cli, Error}; +use crate::{ + cli::{CliError, SubCommand}, + rpc::Client, + Cli, +}; /// Parses command line arguments into the service configuration and runs the /// specified command with it. -pub(crate) async fn run() -> Result<(), Error> { +pub(crate) async fn run() -> Result<(), CliError> { // CLI arguments parsed and mapped to the struct. let cli_arguments: Cli = Cli::parse(); diff --git a/cli/polka-storage-provider/src/error.rs b/cli/polka-storage-provider/src/error.rs deleted file mode 100644 index 144d07ea9..000000000 --- a/cli/polka-storage-provider/src/error.rs +++ /dev/null @@ -1,25 +0,0 @@ -use thiserror::Error; - -use crate::rpc::ClientError; - -/// CLI components error handling implementor. -#[derive(Debug, Error)] -pub enum Error { - #[error("IO error: {0}")] - IoError(#[from] std::io::Error), - - #[error("FromEnv error: {0}")] - EnvError(#[from] tracing_subscriber::filter::FromEnvError), - - #[error("URL parse error: {0}")] - ParseUrl(#[from] url::ParseError), - - #[error("Substrate error: {0}")] - Substrate(#[from] subxt::Error), - - #[error(transparent)] - SubstrateCli(#[from] sc_cli::Error), - - #[error("Rpc Client error: {0}")] - RpcClient(#[from] ClientError), -} diff --git a/cli/polka-storage-provider/src/main.rs b/cli/polka-storage-provider/src/main.rs index bc437e3e2..9aa390eec 100644 --- a/cli/polka-storage-provider/src/main.rs +++ b/cli/polka-storage-provider/src/main.rs @@ -3,18 +3,17 @@ mod cli; pub(crate) mod commands; -mod error; mod rpc; mod substrate; pub(crate) use cli::Cli; +use cli::CliError; use commands::runner; -pub(crate) use error::Error; use tracing::level_filters::LevelFilter; use tracing_subscriber::{fmt, layer::SubscriberExt, util::SubscriberInitExt, EnvFilter}; #[tokio::main] -async fn main() -> Result<(), Error> { +async fn main() -> Result<(), CliError> { // Logger initialization. tracing_subscriber::registry() .with(fmt::layer()) diff --git a/cli/polka-storage-provider/src/rpc.rs b/cli/polka-storage-provider/src/rpc.rs index a79f0eb00..881892734 100644 --- a/cli/polka-storage-provider/src/rpc.rs +++ b/cli/polka-storage-provider/src/rpc.rs @@ -3,4 +3,4 @@ pub mod methods; pub mod server; pub mod version; -pub use client::{Client, ClientError, ClientV0}; +pub use client::{Client, ClientError}; diff --git a/cli/polka-storage-provider/src/rpc/client.rs b/cli/polka-storage-provider/src/rpc/client.rs index d6e8dddd9..915853a21 100644 --- a/cli/polka-storage-provider/src/rpc/client.rs +++ b/cli/polka-storage-provider/src/rpc/client.rs @@ -18,13 +18,7 @@ use thiserror::Error; use tracing::{debug, instrument}; use url::Url; -use super::{ - methods::RpcRequest, - version::{ApiVersion, V0}, -}; - -/// Type alias for the V0 client instance -pub type ClientV0 = Client; +use super::{methods::RpcRequest, version::ApiVersion}; /// Errors that can occur when working with the client #[derive(Debug, Error)] @@ -64,12 +58,13 @@ impl Client where Version: ApiVersion, { + /// Create a new client instance with the given base URL pub async fn new(base_url: Url) -> Result { let url = base_url.join(Version::version())?; let inner = match url.scheme() { "ws" | "wss" => ClientInner::Ws(WsClientBuilder::new().build(&url).await?), "http" | "https" => ClientInner::Https(HttpClientBuilder::new().build(&url)?), - it => return Err(ClientError::UnsupportedUrlScheme(it.to_string())), + scheme => return Err(ClientError::UnsupportedUrlScheme(scheme.to_string())), }; Ok(Self { @@ -79,6 +74,7 @@ where }) } + /// Execute a JSON-RPC request #[instrument(skip_all, fields(url = %self.url, method = %Request::NAME))] pub async fn execute(&self, request: Request) -> Result where @@ -86,21 +82,21 @@ where Version: ApiVersion, { let method_name = Request::NAME; - let params = serde_json::to_value(request.get_params())?; + let params = serde_json::to_value(request.params())?; let result = match params { Value::Null => self.inner.request(method_name, ArrayParams::new()), - Value::Array(it) => { + Value::Array(arr) => { let mut params = ArrayParams::new(); - for param in it { + for param in arr { params.insert(param)? } self.inner.request(method_name, params) } - Value::Object(it) => { + Value::Object(obj) => { let mut params = ObjectParams::new(); - for (name, param) in it { + for (name, param) in obj { params.insert(&name, param)? } diff --git a/cli/polka-storage-provider/src/rpc/methods.rs b/cli/polka-storage-provider/src/rpc/methods.rs index ee5b32f26..ade0a015a 100644 --- a/cli/polka-storage-provider/src/rpc/methods.rs +++ b/cli/polka-storage-provider/src/rpc/methods.rs @@ -15,12 +15,12 @@ pub trait RpcRequest { /// Method name. const NAME: &'static str; /// Successful response type. - type Ok: Debug + Clone + Serialize + DeserializeOwned + 'static; + type Ok: Debug + Serialize + DeserializeOwned + Clone + 'static; /// Parameters type. type Params: Debug + Serialize + DeserializeOwned; /// Get request parameters. - fn get_params(&self) -> Self::Params; + fn params(&self) -> Self::Params; /// A definition of an RPC request handle which can be registered with an /// [`RpcModule`]. This specifies how to handle some specific RPC request. @@ -31,6 +31,11 @@ pub trait RpcRequest { } /// Register the [`RpcRequest`] handle with the [`RpcModule`]. +/// +/// # Panics +/// +/// The function panics when the method name is already registered and if the +/// method name conflicts with the existing registered subscription name. pub fn register_async( module: &mut RpcModule, ) -> &mut jsonrpsee::MethodCallback diff --git a/cli/polka-storage-provider/src/rpc/methods/common.rs b/cli/polka-storage-provider/src/rpc/methods/common.rs index ac046c9ac..42932c5a1 100644 --- a/cli/polka-storage-provider/src/rpc/methods/common.rs +++ b/cli/polka-storage-provider/src/rpc/methods/common.rs @@ -18,7 +18,7 @@ impl RpcRequest for InfoRequest { type Ok = InfoResult; type Params = (); - fn get_params(&self) -> Self::Params { + fn params(&self) -> Self::Params { () } diff --git a/cli/polka-storage-provider/src/rpc/methods/wallet.rs b/cli/polka-storage-provider/src/rpc/methods/wallet.rs index f5425a503..76b085c54 100644 --- a/cli/polka-storage-provider/src/rpc/methods/wallet.rs +++ b/cli/polka-storage-provider/src/rpc/methods/wallet.rs @@ -20,7 +20,7 @@ impl RpcRequest for WalletRequest { type Ok = Option; type Params = (); - fn get_params(&self) -> Self::Params { + fn params(&self) -> Self::Params { () } diff --git a/cli/polka-storage-provider/src/rpc/server.rs b/cli/polka-storage-provider/src/rpc/server.rs index 4ffff1742..ea1213761 100644 --- a/cli/polka-storage-provider/src/rpc/server.rs +++ b/cli/polka-storage-provider/src/rpc/server.rs @@ -19,7 +19,7 @@ use super::{ methods::{common::InfoRequest, register_async, wallet::WalletRequest}, version::V0, }; -use crate::{substrate, Error}; +use crate::{substrate, CliError}; /// Default address to bind the RPC server to. pub const RPC_SERVER_DEFAULT_BIND_ADDR: &str = "127.0.0.1:8000"; @@ -34,7 +34,7 @@ pub struct RpcServerState { pub async fn start_rpc_server( state: Arc, listen_addr: SocketAddr, -) -> Result { +) -> Result { let server = Server::builder().build(listen_addr).await?; let module = create_module(state.clone()); @@ -43,7 +43,9 @@ pub async fn start_rpc_server( Ok(server_handle) } -/// Initialize [`RpcModule`]. +/// Initialize [`RpcModule`] and register the handlers +/// [`super::methods::RpcRequest::handle`] which are specifying how requests +/// should be processed. pub fn create_module(state: Arc) -> RpcModule { let mut module = RpcModule::from_arc(state); @@ -80,10 +82,12 @@ impl ServerError { } } + /// Construct an error with [`jsonrpsee::types::error::INTERNAL_ERROR_CODE`]. pub fn internal_error(message: impl Display, data: impl Into>) -> Self { Self::new(INTERNAL_ERROR_CODE, message, data) } + /// Construct an error with [`jsonrpsee::types::error::INVALID_PARAMS_CODE`]. pub fn invalid_params(message: impl Display, data: impl Into>) -> Self { Self::new(INVALID_PARAMS_CODE, message, data) } From a6890228a9026396fbb2761f192d692418761e56 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rok=20=C4=8Cerni=C4=8D?= Date: Thu, 27 Jun 2024 16:54:07 +0200 Subject: [PATCH 22/22] fix: V0 enum --- cli/polka-storage-provider/src/rpc/version.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cli/polka-storage-provider/src/rpc/version.rs b/cli/polka-storage-provider/src/rpc/version.rs index 5a7644ba1..06218a03e 100644 --- a/cli/polka-storage-provider/src/rpc/version.rs +++ b/cli/polka-storage-provider/src/rpc/version.rs @@ -8,7 +8,7 @@ pub trait ApiVersion { } /// RPC API version v0. -pub struct V0; +pub enum V0 {} #[sealed] impl ApiVersion for V0 {