diff --git a/Cargo.lock b/Cargo.lock index 482825245..cbd618fa1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -8294,10 +8294,12 @@ dependencies = [ name = "polka-storage-provider" version = "0.1.0" dependencies = [ + "async-trait", "chrono", "clap", "jsonrpsee", "sc-cli", + "sealed", "serde", "serde_json", "subxt", @@ -8307,6 +8309,7 @@ dependencies = [ "tracing", "tracing-subscriber", "url", + "uuid", ] [[package]] @@ -9869,8 +9872,8 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "22505a5c94da8e3b7c2996394d1c933236c4d743e81a410bcca4e6989fc066a4" dependencies = [ "bytes", - "heck 0.4.1", - "itertools 0.10.5", + "heck 0.5.0", + "itertools 0.12.1", "log", "multimap", "once_cell", @@ -12250,6 +12253,18 @@ dependencies = [ "thiserror", ] +[[package]] +name = "sealed" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f4a8caec23b7800fb97971a1c6ae365b6239aaeddfb934d6265f8505e795699d" +dependencies = [ + "heck 0.4.1", + "proc-macro2", + "quote", + "syn 2.0.66", +] + [[package]] name = "sec1" version = "0.7.3" diff --git a/Cargo.toml b/Cargo.toml index 9989246f3..22bc0e144 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -35,6 +35,7 @@ substrate-wasm-builder = { git = "https://github.com/paritytech/polkadot-sdk", t async-channel = "2.3.1" async-stream = "0.3.5" +async-trait = "0.1.80" base64 = "0.22.1" bitflags = "2.5.0" blake2b_simd = { version = "1.0.2" } @@ -67,6 +68,7 @@ quote = { version = "1.0.33" } rand = "0.8.5" rocksdb = { version = "0.21" } scale-info = { version = "2.11.1", default-features = false } +sealed = "0.5" serde = { version = "1.0.197", default-features = false } serde-big-array = { version = "0.3.2" } serde_derive = { version = "1.0.117" } diff --git a/cli/polka-storage-provider/Cargo.toml b/cli/polka-storage-provider/Cargo.toml index 13d204756..2170a02af 100644 --- a/cli/polka-storage-provider/Cargo.toml +++ b/cli/polka-storage-provider/Cargo.toml @@ -8,10 +8,12 @@ repository.workspace = true version = "0.1.0" [dependencies] +async-trait = { workspace = true } chrono = { workspace = true, features = ["serde"] } clap = { workspace = true, features = ["derive"] } -jsonrpsee = { workspace = true, features = ["server"] } +jsonrpsee = { workspace = true, features = ["http-client", "server", "ws-client"] } sc-cli = { workspace = true } +sealed = { workspace = true } serde = { workspace = true } serde_json = { workspace = true } subxt = { workspace = true } @@ -21,6 +23,7 @@ tokio = { workspace = true } tracing = { workspace = true } tracing-subscriber = { workspace = true, features = ["env-filter"] } url = { workspace = true } +uuid = { workspace = true } [lints] workspace = true diff --git a/cli/polka-storage-provider/src/cli.rs b/cli/polka-storage-provider/src/cli.rs index f2d6dd5fd..b1f2beeef 100644 --- a/cli/polka-storage-provider/src/cli.rs +++ b/cli/polka-storage-provider/src/cli.rs @@ -1,6 +1,11 @@ use clap::Parser; +use thiserror::Error; +use url::Url; -use crate::commands::{InfoCommand, InitCommand, RunCommand, WalletCommand}; +use crate::{ + commands::{InfoCommand, InitCommand, RunCommand, WalletCommand}, + rpc::{server::RPC_SERVER_DEFAULT_BIND_ADDR, ClientError}, +}; /// A CLI application that facilitates management operations over a running full /// node and other components. @@ -9,6 +14,10 @@ use crate::commands::{InfoCommand, InitCommand, RunCommand, WalletCommand}; pub(crate) struct Cli { #[command(subcommand)] pub subcommand: SubCommand, + + /// URL of the providers RPC server. + #[arg(long, default_value_t = Url::parse(&format!("http://{RPC_SERVER_DEFAULT_BIND_ADDR}")).unwrap())] + pub rpc_server_url: Url, } /// Supported sub-commands. @@ -24,3 +33,25 @@ pub enum SubCommand { #[command(subcommand)] Wallet(WalletCommand), } + +/// CLI components error handling implementor. +#[derive(Debug, Error)] +pub enum CliError { + #[error("IO error: {0}")] + IoError(#[from] std::io::Error), + + #[error("FromEnv error: {0}")] + EnvError(#[from] tracing_subscriber::filter::FromEnvError), + + #[error("URL parse error: {0}")] + ParseUrl(#[from] url::ParseError), + + #[error("Substrate error: {0}")] + Substrate(#[from] subxt::Error), + + #[error(transparent)] + SubstrateCli(#[from] sc_cli::Error), + + #[error("Rpc Client error: {0}")] + RpcClient(#[from] ClientError), +} diff --git a/cli/polka-storage-provider/src/commands/info.rs b/cli/polka-storage-provider/src/commands/info.rs index 971b14132..38d84169e 100644 --- a/cli/polka-storage-provider/src/commands/info.rs +++ b/cli/polka-storage-provider/src/commands/info.rs @@ -1,15 +1,40 @@ +use std::fmt::{self, Display, Formatter}; + +use chrono::{DateTime, Utc}; use clap::Parser; -use crate::Error; +use crate::{ + cli::CliError, + rpc::{methods::common::InfoRequest, version::V0, Client}, +}; /// Command to display information about the storage provider. #[derive(Debug, Clone, Parser)] pub(crate) struct InfoCommand; impl InfoCommand { - pub async fn run(&self) -> Result<(), Error> { - // TODO(#66,@cernicc,31/05/2024): Print start time of the provider + pub async fn run(&self, client: &Client) -> Result<(), CliError> { // TODO(#67,@cernicc,07/06/2024): Print polkadot address used by the provider - unimplemented!() + + // Get server info + let server_info = client.execute(InfoRequest).await?; + + let node_status_info = NodeStatusInfo { + start_time: server_info.start_time, + }; + + println!("{}", node_status_info); + + Ok(()) + } +} + +struct NodeStatusInfo { + start_time: DateTime, +} + +impl Display for NodeStatusInfo { + fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { + writeln!(f, "Started at: {}", self.start_time) } } diff --git a/cli/polka-storage-provider/src/commands/init.rs b/cli/polka-storage-provider/src/commands/init.rs index 248aa293b..083c17090 100644 --- a/cli/polka-storage-provider/src/commands/init.rs +++ b/cli/polka-storage-provider/src/commands/init.rs @@ -1,14 +1,14 @@ use clap::Parser; use tracing::info; -use crate::Error; +use crate::cli::CliError; /// Command to initialize the storage provider. #[derive(Debug, Clone, Parser)] pub(crate) struct InitCommand; impl InitCommand { - pub async fn run(&self) -> Result<(), Error> { + pub async fn run(&self) -> Result<(), CliError> { info!("Initializing polka storage provider..."); // TODO(#64,@cernicc,31/05/2024): Init needed configurations. // TODO(#65,@cernicc,31/05/2024): Check if full node is synced diff --git a/cli/polka-storage-provider/src/commands/run.rs b/cli/polka-storage-provider/src/commands/run.rs index a27e5159f..5f31a656b 100644 --- a/cli/polka-storage-provider/src/commands/run.rs +++ b/cli/polka-storage-provider/src/commands/run.rs @@ -6,27 +6,27 @@ use tracing::info; use url::Url; use crate::{ - rpc::{start_rpc, RpcServerState}, - substrate, Error, + cli::CliError, + rpc::server::{start_rpc_server, RpcServerState, RPC_SERVER_DEFAULT_BIND_ADDR}, + substrate, }; -const SERVER_DEFAULT_BIND_ADDR: &str = "127.0.0.1:8000"; const FULL_NODE_DEFAULT_RPC_ADDR: &str = "ws://127.0.0.1:9944"; /// Command to start the storage provider. #[derive(Debug, Clone, Parser)] pub(crate) struct RunCommand { /// RPC API endpoint used by the parachain node. - #[arg(short = 'n', long, default_value = FULL_NODE_DEFAULT_RPC_ADDR)] - pub node_rpc_address: Url, - /// Address used for RPC. By default binds on localhost on port 8000. - #[arg(short = 'a', long, default_value = SERVER_DEFAULT_BIND_ADDR)] + #[arg(long, default_value = FULL_NODE_DEFAULT_RPC_ADDR)] + pub rpc_address: Url, + /// Address and port used for RPC server. + #[arg(long, default_value = RPC_SERVER_DEFAULT_BIND_ADDR)] pub listen_addr: SocketAddr, } impl RunCommand { - pub async fn run(&self) -> Result<(), Error> { - let substrate_client = substrate::init_client(self.node_rpc_address.as_str()).await?; + pub async fn run(&self) -> Result<(), CliError> { + let substrate_client = substrate::init_client(self.rpc_address.as_str()).await?; let state = Arc::new(RpcServerState { start_time: Utc::now(), @@ -34,7 +34,7 @@ impl RunCommand { }); // Start RPC server - let handle = start_rpc(state, self.listen_addr).await?; + let handle = start_rpc_server(state, self.listen_addr).await?; info!("RPC server started at {}", self.listen_addr); // Monitor shutdown diff --git a/cli/polka-storage-provider/src/commands/runner.rs b/cli/polka-storage-provider/src/commands/runner.rs index 6c9ec560d..408e212d1 100644 --- a/cli/polka-storage-provider/src/commands/runner.rs +++ b/cli/polka-storage-provider/src/commands/runner.rs @@ -1,18 +1,25 @@ use clap::Parser; use super::WalletCommand; -use crate::{cli::SubCommand, Cli, Error}; +use crate::{ + cli::{CliError, SubCommand}, + rpc::Client, + Cli, +}; -/// Parses command line arguments into the service configuration and runs the specified -/// command with it. -pub(crate) async fn run() -> Result<(), Error> { +/// Parses command line arguments into the service configuration and runs the +/// specified command with it. +pub(crate) async fn run() -> Result<(), CliError> { // CLI arguments parsed and mapped to the struct. let cli_arguments: Cli = Cli::parse(); + // RPC client used to interact with the full node + let rpc_client = Client::new(cli_arguments.rpc_server_url).await?; + match &cli_arguments.subcommand { SubCommand::Init(cmd) => cmd.run().await, SubCommand::Run(cmd) => cmd.run().await, - SubCommand::Info(cmd) => cmd.run().await, + SubCommand::Info(cmd) => cmd.run(&rpc_client).await, SubCommand::Wallet(cmd) => match cmd { WalletCommand::GenerateNodeKey(cmd) => Ok(cmd.run()?), WalletCommand::Generate(cmd) => Ok(cmd.run()?), diff --git a/cli/polka-storage-provider/src/error.rs b/cli/polka-storage-provider/src/error.rs deleted file mode 100644 index 738d1f4e6..000000000 --- a/cli/polka-storage-provider/src/error.rs +++ /dev/null @@ -1,17 +0,0 @@ -use thiserror::Error; - -/// CLI components error handling implementor. -#[derive(Debug, Error)] -pub enum Error { - #[error("IO error: {0}")] - IoError(#[from] std::io::Error), - - #[error("FromEnv error: {0}")] - EnvError(#[from] tracing_subscriber::filter::FromEnvError), - - #[error("Substrate error: {0}")] - Substrate(#[from] subxt::Error), - - #[error(transparent)] - SubstrateCli(#[from] sc_cli::Error), -} diff --git a/cli/polka-storage-provider/src/main.rs b/cli/polka-storage-provider/src/main.rs index bc437e3e2..9aa390eec 100644 --- a/cli/polka-storage-provider/src/main.rs +++ b/cli/polka-storage-provider/src/main.rs @@ -3,18 +3,17 @@ mod cli; pub(crate) mod commands; -mod error; mod rpc; mod substrate; pub(crate) use cli::Cli; +use cli::CliError; use commands::runner; -pub(crate) use error::Error; use tracing::level_filters::LevelFilter; use tracing_subscriber::{fmt, layer::SubscriberExt, util::SubscriberInitExt, EnvFilter}; #[tokio::main] -async fn main() -> Result<(), Error> { +async fn main() -> Result<(), CliError> { // Logger initialization. tracing_subscriber::registry() .with(fmt::layer()) diff --git a/cli/polka-storage-provider/src/rpc.rs b/cli/polka-storage-provider/src/rpc.rs index a3a826f0b..881892734 100644 --- a/cli/polka-storage-provider/src/rpc.rs +++ b/cli/polka-storage-provider/src/rpc.rs @@ -1,71 +1,6 @@ -use std::{future::Future, net::SocketAddr, sync::Arc}; - -use chrono::Utc; -use error::ServerError; -use jsonrpsee::{ - server::{Server, ServerHandle}, - types::Params, - RpcModule, -}; -use methods::create_module; -use serde::{Deserialize, Serialize}; - -use crate::{substrate, Error}; - -pub mod error; +mod client; pub mod methods; +pub mod server; +pub mod version; -/// A definition of an RPC method handler which can be registered with an [`RpcModule`]. -pub trait RpcMethod { - /// Method name. - const NAME: &'static str; - /// See [`ApiVersion`]. - const API_VERSION: ApiVersion; - /// Successful response type. - type Ok: Serialize; - - /// Logic for this method. - fn handle( - ctx: Arc, - params: Params, - ) -> impl Future> + Send; - - /// Register this method with an [`RpcModule`]. - fn register_async(module: &mut RpcModule) -> &mut jsonrpsee::MethodCallback - where - Self::Ok: Clone + 'static, - { - module - .register_async_method(Self::NAME, move |params, ctx| async move { - let ok = Self::handle(ctx, params).await?; - Result::<_, jsonrpsee::types::ErrorObjectOwned>::Ok(ok) - }) - .expect("method should be valid") // This is safe because we know the method registered is valid. - } -} - -/// Available API versions. -/// -/// These are significant because they are expressed in the URL path against -/// which RPC calls are made, e.g `rpc/v0` or `rpc/v1`. -#[derive(Debug, Clone, Copy, Hash, Eq, PartialEq, Ord, PartialOrd, Serialize, Deserialize)] -pub enum ApiVersion { - V0, -} - -pub struct RpcServerState { - pub start_time: chrono::DateTime, - pub substrate_client: substrate::Client, -} - -pub async fn start_rpc( - state: Arc, - listen_addr: SocketAddr, -) -> Result { - let server = Server::builder().build(listen_addr).await?; - - let module = create_module(state.clone()); - let server_handle = server.start(module); - - Ok(server_handle) -} +pub use client::{Client, ClientError}; diff --git a/cli/polka-storage-provider/src/rpc/client.rs b/cli/polka-storage-provider/src/rpc/client.rs new file mode 100644 index 000000000..915853a21 --- /dev/null +++ b/cli/polka-storage-provider/src/rpc/client.rs @@ -0,0 +1,205 @@ +use std::{ + fmt::{self, Debug}, + marker::PhantomData, +}; + +use jsonrpsee::{ + core::{ + client::{BatchResponse, ClientT, Subscription, SubscriptionClientT}, + params::{ArrayParams, BatchRequestBuilder, ObjectParams}, + traits::ToRpcParams, + }, + http_client::HttpClientBuilder, + ws_client::WsClientBuilder, +}; +use serde::de::DeserializeOwned; +use serde_json::Value; +use thiserror::Error; +use tracing::{debug, instrument}; +use url::Url; + +use super::{methods::RpcRequest, version::ApiVersion}; + +/// Errors that can occur when working with the client +#[derive(Debug, Error)] +pub enum ClientError { + #[error("Unsupported scheme error: {0}")] + UnsupportedUrlScheme(String), + + #[error("Invalid parameter type: {0}")] + InvalidParameter(Value), + + #[error(transparent)] + Url(#[from] url::ParseError), + + #[error(transparent)] + JsonRpcClient(#[from] jsonrpsee::core::ClientError), + + #[error(transparent)] + Json(#[from] serde_json::Error), +} + +/// Represents a single connection to the URL server +pub struct Client { + url: Url, + inner: ClientInner, + _version: PhantomData, +} + +impl Debug for Client { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("InnerClient") + .field("url", &self.url) + .finish_non_exhaustive() + } +} + +impl Client +where + Version: ApiVersion, +{ + /// Create a new client instance with the given base URL + pub async fn new(base_url: Url) -> Result { + let url = base_url.join(Version::version())?; + let inner = match url.scheme() { + "ws" | "wss" => ClientInner::Ws(WsClientBuilder::new().build(&url).await?), + "http" | "https" => ClientInner::Https(HttpClientBuilder::new().build(&url)?), + scheme => return Err(ClientError::UnsupportedUrlScheme(scheme.to_string())), + }; + + Ok(Self { + url, + inner, + _version: PhantomData, + }) + } + + /// Execute a JSON-RPC request + #[instrument(skip_all, fields(url = %self.url, method = %Request::NAME))] + pub async fn execute(&self, request: Request) -> Result + where + Request: RpcRequest, + Version: ApiVersion, + { + let method_name = Request::NAME; + let params = serde_json::to_value(request.params())?; + + let result = match params { + Value::Null => self.inner.request(method_name, ArrayParams::new()), + Value::Array(arr) => { + let mut params = ArrayParams::new(); + for param in arr { + params.insert(param)? + } + + self.inner.request(method_name, params) + } + Value::Object(obj) => { + let mut params = ObjectParams::new(); + for (name, param) in obj { + params.insert(&name, param)? + } + + self.inner.request(method_name, params) + } + param @ (Value::Bool(_) | Value::Number(_) | Value::String(_)) => { + return Err(ClientError::InvalidParameter(param)) + } + } + .await; + + debug!(?result, "response received"); + + // We cant return result directly because compiler needs some help to + // understand the types + Ok(result?) + } +} + +enum ClientInner { + Ws(jsonrpsee::ws_client::WsClient), + Https(jsonrpsee::http_client::HttpClient), +} + +#[async_trait::async_trait] +impl ClientT for ClientInner { + async fn notification( + &self, + method: &str, + params: Params, + ) -> Result<(), jsonrpsee::core::ClientError> + where + Params: ToRpcParams + Send, + { + match &self { + ClientInner::Ws(client) => client.notification(method, params).await, + ClientInner::Https(client) => client.notification(method, params).await, + } + } + + async fn request( + &self, + method: &str, + params: Params, + ) -> Result + where + R: DeserializeOwned, + Params: ToRpcParams + Send, + { + match &self { + ClientInner::Ws(client) => client.request(method, params).await, + ClientInner::Https(client) => client.request(method, params).await, + } + } + + async fn batch_request<'a, R>( + &self, + batch: BatchRequestBuilder<'a>, + ) -> Result, jsonrpsee::core::ClientError> + where + R: DeserializeOwned + fmt::Debug + 'a, + { + match &self { + ClientInner::Ws(client) => client.batch_request(batch).await, + ClientInner::Https(client) => client.batch_request(batch).await, + } + } +} + +#[async_trait::async_trait] +impl SubscriptionClientT for ClientInner { + async fn subscribe<'a, Notif, Params>( + &self, + subscribe_method: &'a str, + params: Params, + unsubscribe_method: &'a str, + ) -> Result, jsonrpsee::core::ClientError> + where + Params: ToRpcParams + Send, + Notif: DeserializeOwned, + { + match &self { + ClientInner::Ws(it) => { + it.subscribe(subscribe_method, params, unsubscribe_method) + .await + } + ClientInner::Https(it) => { + it.subscribe(subscribe_method, params, unsubscribe_method) + .await + } + } + } + + async fn subscribe_to_method<'a, Notif>( + &self, + method: &'a str, + ) -> Result, jsonrpsee::core::ClientError> + where + Notif: DeserializeOwned, + { + match &self { + ClientInner::Ws(it) => it.subscribe_to_method(method).await, + ClientInner::Https(it) => it.subscribe_to_method(method).await, + } + } +} diff --git a/cli/polka-storage-provider/src/rpc/error.rs b/cli/polka-storage-provider/src/rpc/error.rs deleted file mode 100644 index c90aa5e0b..000000000 --- a/cli/polka-storage-provider/src/rpc/error.rs +++ /dev/null @@ -1,42 +0,0 @@ -use std::fmt::Display; - -use jsonrpsee::types::{error::INTERNAL_ERROR_CODE, ErrorObjectOwned}; -use serde_json::Value; - -/// Error type for RPC server errors. -#[derive(Debug)] -pub struct ServerError { - inner: ErrorObjectOwned, -} - -impl std::error::Error for ServerError {} - -impl Display for ServerError { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!(f, "JSON-RPC error: {}", &self.inner) - } -} - -impl From for ErrorObjectOwned { - fn from(err: ServerError) -> Self { - err.inner - } -} - -impl ServerError { - pub fn new(code: i32, message: impl Display, data: impl Into>) -> Self { - Self { - inner: ErrorObjectOwned::owned(code, message.to_string(), data.into()), - } - } - - pub fn internal_error(message: impl Display, data: impl Into>) -> Self { - Self::new(INTERNAL_ERROR_CODE, message, data) - } -} - -impl From for ServerError { - fn from(err: subxt::Error) -> Self { - Self::internal_error(err, None) - } -} diff --git a/cli/polka-storage-provider/src/rpc/methods.rs b/cli/polka-storage-provider/src/rpc/methods.rs index e62f231b1..ade0a015a 100644 --- a/cli/polka-storage-provider/src/rpc/methods.rs +++ b/cli/polka-storage-provider/src/rpc/methods.rs @@ -1,17 +1,70 @@ -use std::sync::Arc; +use std::{fmt::Debug, future::Future, sync::Arc}; use jsonrpsee::RpcModule; +use serde::{de::DeserializeOwned, Serialize}; +use tracing::{debug, debug_span, error, Instrument}; +use uuid::Uuid; -use super::{RpcMethod, RpcServerState}; +use super::server::{RpcServerState, ServerError}; pub mod common; pub mod wallet; -pub fn create_module(state: Arc) -> RpcModule { - let mut module = RpcModule::from_arc(state); +/// A trait for defining a versioned RPC request. +pub trait RpcRequest { + /// Method name. + const NAME: &'static str; + /// Successful response type. + type Ok: Debug + Serialize + DeserializeOwned + Clone + 'static; + /// Parameters type. + type Params: Debug + Serialize + DeserializeOwned; - common::Info::register_async(&mut module); - wallet::WalletBalance::register_async(&mut module); + /// Get request parameters. + fn params(&self) -> Self::Params; + /// A definition of an RPC request handle which can be registered with an + /// [`RpcModule`]. This specifies how to handle some specific RPC request. + fn handle( + ctx: Arc, + params: Self::Params, + ) -> impl Future> + Send; +} + +/// Register the [`RpcRequest`] handle with the [`RpcModule`]. +/// +/// # Panics +/// +/// The function panics when the method name is already registered and if the +/// method name conflicts with the existing registered subscription name. +pub fn register_async( + module: &mut RpcModule, +) -> &mut jsonrpsee::MethodCallback +where + Request: RpcRequest, +{ module + .register_async_method(Request::NAME, move |params, ctx| async move { + // Try to deserialize the params + let span = + debug_span!("method", id = %Uuid::new_v4(), method = Request::NAME, params = ?params); + let params = params.parse().map_err(|err| { + error!(parent: span.clone(), ?err, ?params, "failed to parse params"); + ServerError::invalid_params("Failed to parse params", None) + })?; + + // Handle the method + let result = Request::handle(ctx, params).instrument(span.clone()).await; + + match &result { + Ok(ok) => { + debug!(parent: span, response = ?ok, "handled successfully"); + } + Err(err) => { + error!(parent: span, err = ?err, "error ocurred while handling") + } + } + + Result::<_, jsonrpsee::types::ErrorObjectOwned>::Ok(result?) + }) + .expect("method should be valid") // This is safe because we know the method registered is valid. } diff --git a/cli/polka-storage-provider/src/rpc/methods/common.rs b/cli/polka-storage-provider/src/rpc/methods/common.rs index 6232af8cf..42932c5a1 100644 --- a/cli/polka-storage-provider/src/rpc/methods/common.rs +++ b/cli/polka-storage-provider/src/rpc/methods/common.rs @@ -1,26 +1,28 @@ use std::sync::Arc; use chrono::{DateTime, Utc}; -use jsonrpsee::types::Params; use serde::{Deserialize, Serialize}; -use crate::rpc::{error::ServerError, ApiVersion, RpcMethod, RpcServerState}; +use super::RpcRequest; +use crate::rpc::{ + server::{RpcServerState, ServerError}, + version::V0, +}; /// This RPC method exposes the system information. #[derive(Debug)] -pub struct Info; +pub struct InfoRequest; -impl RpcMethod for Info { +impl RpcRequest for InfoRequest { const NAME: &'static str = "info"; - - const API_VERSION: ApiVersion = ApiVersion::V0; - type Ok = InfoResult; + type Params = (); + + fn params(&self) -> Self::Params { + () + } - async fn handle( - ctx: Arc, - _params: Params<'_>, - ) -> Result { + async fn handle(ctx: Arc, _: Self::Params) -> Result { Ok(InfoResult { start_time: ctx.start_time, }) diff --git a/cli/polka-storage-provider/src/rpc/methods/wallet.rs b/cli/polka-storage-provider/src/rpc/methods/wallet.rs index e5d93fd2f..76b085c54 100644 --- a/cli/polka-storage-provider/src/rpc/methods/wallet.rs +++ b/cli/polka-storage-provider/src/rpc/methods/wallet.rs @@ -1,27 +1,30 @@ use std::sync::Arc; -use jsonrpsee::types::Params; use serde::{Deserialize, Serialize}; use subxt_signer::sr25519::dev; +use super::RpcRequest; use crate::{ - rpc::{error::ServerError, ApiVersion, RpcMethod, RpcServerState}, + rpc::{ + server::{RpcServerState, ServerError}, + version::V0, + }, substrate::get_system_balances, }; /// This RPC method exposes getting the system balances for the particular /// account. -pub struct WalletBalance; -impl RpcMethod for WalletBalance { +pub struct WalletRequest; +impl RpcRequest for WalletRequest { const NAME: &'static str = "wallet_balance"; - const API_VERSION: ApiVersion = ApiVersion::V0; - type Ok = Option; + type Params = (); + + fn params(&self) -> Self::Params { + () + } - async fn handle( - ctx: Arc, - _params: Params<'_>, - ) -> Result { + async fn handle(ctx: Arc, _: Self::Params) -> Result { // TODO(#68,@cernicc,05/06/2024): Implement correctly. dev alice is used as a show case for now. let account = dev::alice().public_key().into(); let balance = get_system_balances(&ctx.substrate_client, &account).await?; @@ -34,6 +37,7 @@ impl RpcMethod for WalletBalance { } } +/// Result of the wallet balance request. #[derive(Debug, Clone, Serialize, Deserialize)] pub struct WalletBalanceResult { free: String, diff --git a/cli/polka-storage-provider/src/rpc/server.rs b/cli/polka-storage-provider/src/rpc/server.rs new file mode 100644 index 000000000..ea1213761 --- /dev/null +++ b/cli/polka-storage-provider/src/rpc/server.rs @@ -0,0 +1,100 @@ +use std::{ + fmt::{Debug, Display}, + net::SocketAddr, + sync::Arc, +}; + +use chrono::Utc; +use jsonrpsee::{ + server::{Server, ServerHandle}, + types::{ + error::{INTERNAL_ERROR_CODE, INVALID_PARAMS_CODE}, + ErrorObjectOwned, + }, + RpcModule, +}; +use serde_json::Value; + +use super::{ + methods::{common::InfoRequest, register_async, wallet::WalletRequest}, + version::V0, +}; +use crate::{substrate, CliError}; + +/// Default address to bind the RPC server to. +pub const RPC_SERVER_DEFAULT_BIND_ADDR: &str = "127.0.0.1:8000"; + +/// RPC server shared state. +pub struct RpcServerState { + pub start_time: chrono::DateTime, + pub substrate_client: substrate::Client, +} + +/// Start the RPC server. +pub async fn start_rpc_server( + state: Arc, + listen_addr: SocketAddr, +) -> Result { + let server = Server::builder().build(listen_addr).await?; + + let module = create_module(state.clone()); + let server_handle = server.start(module); + + Ok(server_handle) +} + +/// Initialize [`RpcModule`] and register the handlers +/// [`super::methods::RpcRequest::handle`] which are specifying how requests +/// should be processed. +pub fn create_module(state: Arc) -> RpcModule { + let mut module = RpcModule::from_arc(state); + + register_async::(&mut module); + register_async::(&mut module); + + module +} + +/// Error type for RPC server errors. +#[derive(Debug)] +pub struct ServerError { + inner: ErrorObjectOwned, +} + +impl std::error::Error for ServerError {} + +impl Display for ServerError { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "JSON-RPC error: {}", &self.inner) + } +} + +impl From for ErrorObjectOwned { + fn from(err: ServerError) -> Self { + err.inner + } +} + +impl ServerError { + pub fn new(code: i32, message: impl Display, data: impl Into>) -> Self { + Self { + inner: ErrorObjectOwned::owned(code, message.to_string(), data.into()), + } + } + + /// Construct an error with [`jsonrpsee::types::error::INTERNAL_ERROR_CODE`]. + pub fn internal_error(message: impl Display, data: impl Into>) -> Self { + Self::new(INTERNAL_ERROR_CODE, message, data) + } + + /// Construct an error with [`jsonrpsee::types::error::INVALID_PARAMS_CODE`]. + pub fn invalid_params(message: impl Display, data: impl Into>) -> Self { + Self::new(INVALID_PARAMS_CODE, message, data) + } +} + +impl From for ServerError { + fn from(err: subxt::Error) -> Self { + Self::internal_error(err, None) + } +} diff --git a/cli/polka-storage-provider/src/rpc/version.rs b/cli/polka-storage-provider/src/rpc/version.rs new file mode 100644 index 000000000..06218a03e --- /dev/null +++ b/cli/polka-storage-provider/src/rpc/version.rs @@ -0,0 +1,18 @@ +use sealed::sealed; + +/// RPC API version. +#[sealed] +pub trait ApiVersion { + /// Returns the version string. + fn version() -> &'static str; +} + +/// RPC API version v0. +pub enum V0 {} + +#[sealed] +impl ApiVersion for V0 { + fn version() -> &'static str { + "rpc/v0" + } +}