diff --git a/.github/CODEOWNERS b/.github/CODEOWNERS index 3c7a8ca1b..2db762e86 100644 --- a/.github/CODEOWNERS +++ b/.github/CODEOWNERS @@ -1,2 +1,3 @@ # Any change in the repo requests review from the dev team -* @cernicc @th7nder @serg-temchenko @jmg-duarte @aidan46 + +- @cernicc @th7nder @jmg-duarte @aidan46 diff --git a/Cargo.lock b/Cargo.lock index da83b1939..91d29f0bd 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -837,6 +837,75 @@ version = "1.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0c4b4d0bd25bd0b74681c0ad21497610ce1b7c91b1022cd21c80c6fbdd9476b0" +[[package]] +name = "axum" +version = "0.7.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3a6c9af12842a67734c9a2e355436e5d03b22383ed60cf13cd0c18fbfe3dcbcf" +dependencies = [ + "async-trait", + "axum-core", + "axum-macros", + "bytes", + "futures-util", + "http 1.1.0", + "http-body 1.0.0", + "http-body-util", + "hyper 1.3.1", + "hyper-util", + "itoa", + "matchit", + "memchr", + "mime", + "multer", + "percent-encoding", + "pin-project-lite 0.2.14", + "rustversion", + "serde", + "serde_json", + "serde_path_to_error", + "serde_urlencoded", + "sync_wrapper 1.0.1", + "tokio", + "tower", + "tower-layer", + "tower-service", + "tracing", +] + +[[package]] +name = "axum-core" +version = "0.4.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a15c63fd72d41492dc4f497196f5da1fb04fb7529e631d73630d1b491e47a2e3" +dependencies = [ + "async-trait", + "bytes", + "futures-util", + "http 1.1.0", + "http-body 1.0.0", + "http-body-util", + "mime", + "pin-project-lite 0.2.14", + "rustversion", + "sync_wrapper 0.1.2", + "tower-layer", + "tower-service", + "tracing", +] + +[[package]] +name = "axum-macros" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "00c055ee2d014ae5981ce1016374e8213682aa14d9bf40e48ab48b5f3ef20eaa" +dependencies = [ + "heck 0.4.1", + "proc-macro2", + "quote", + "syn 2.0.66", +] + [[package]] name = "backtrace" version = "0.3.72" @@ -3159,6 +3228,15 @@ version = "0.3.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a357d28ed41a50f9c765dbfe56cbc04a64e53e5fc58ba79fbc34c10ef3df831f" +[[package]] +name = "encoding_rs" +version = "0.8.34" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b45de904aa0b010bce2ab45264d0631681847fa7b6f2eaa7dab7619943bc4f59" +dependencies = [ + "cfg-if", +] + [[package]] name = "enum-as-inner" version = "0.5.1" @@ -4197,7 +4275,7 @@ dependencies = [ "futures-core", "futures-sink", "futures-util", - "http", + "http 0.2.12", "indexmap 2.2.6", "slab", "tokio", @@ -4385,6 +4463,17 @@ dependencies = [ "itoa", ] +[[package]] +name = "http" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "21b9ddb458710bc376481b842f5da65cdf31522de232c1ca8146abce2a358258" +dependencies = [ + "bytes", + "fnv", + "itoa", +] + [[package]] name = "http-body" version = "0.4.6" @@ -4392,7 +4481,30 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7ceab25649e9960c0311ea418d17bee82c0dcec1bd053b5f9a66e265a693bed2" dependencies = [ "bytes", - "http", + "http 0.2.12", + "pin-project-lite 0.2.14", +] + +[[package]] +name = "http-body" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1cac85db508abc24a2e48553ba12a996e87244a0395ce011e62b37158745d643" +dependencies = [ + "bytes", + "http 1.1.0", +] + +[[package]] +name = "http-body-util" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "793429d76616a256bcb62c2a2ec2bed781c8307e797e2598c50010f2bee2544f" +dependencies = [ + "bytes", + "futures-util", + "http 1.1.0", + "http-body 1.0.0", "pin-project-lite 0.2.14", ] @@ -4431,8 +4543,8 @@ dependencies = [ "futures-core", "futures-util", "h2", - "http", - "http-body", + "http 0.2.12", + "http-body 0.4.6", "httparse", "httpdate", "itoa", @@ -4444,6 +4556,25 @@ dependencies = [ "want", ] +[[package]] +name = "hyper" +version = "1.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fe575dd17d0862a9a33781c8c4696a55c320909004a67a00fb286ba8b1bc496d" +dependencies = [ + "bytes", + "futures-channel", + "futures-util", + "http 1.1.0", + "http-body 1.0.0", + "httparse", + "httpdate", + "itoa", + "pin-project-lite 0.2.14", + "smallvec", + "tokio", +] + [[package]] name = "hyper-rustls" version = "0.24.2" @@ -4451,8 +4582,8 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ec3efd23720e2049821a693cbc7e65ea87c72f1c58ff2f9522ff332b1491e590" dependencies = [ "futures-util", - "http", - "hyper", + "http 0.2.12", + "hyper 0.14.28", "log", "rustls 0.21.12", "rustls-native-certs 0.6.3", @@ -4460,6 +4591,21 @@ dependencies = [ "tokio-rustls 0.24.1", ] +[[package]] +name = "hyper-util" +version = "0.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7b875924a60b96e5d7b9ae7b066540b1dd1cbd90d1828f54c92e02a283351c56" +dependencies = [ + "bytes", + "futures-util", + "http 1.1.0", + "http-body 1.0.0", + "hyper 1.3.1", + "pin-project-lite 0.2.14", + "tokio", +] + [[package]] name = "iana-time-zone" version = "0.1.60" @@ -4826,7 +4972,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4978087a58c3ab02efc5b07c5e5e2803024536106fd5506f558db172c889b3aa" dependencies = [ "futures-util", - "http", + "http 0.2.12", "jsonrpsee-core", "pin-project", "rustls-native-certs 0.7.0", @@ -4851,7 +4997,7 @@ dependencies = [ "beef", "futures-timer", "futures-util", - "hyper", + "hyper 0.14.28", "jsonrpsee-types", "parking_lot 0.12.3", "pin-project", @@ -4872,7 +5018,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1ccf93fc4a0bfe05d851d37d7c32b7f370fe94336b52a2f0efc5f1981895c2e5" dependencies = [ "async-trait", - "hyper", + "hyper 0.14.28", "hyper-rustls", "jsonrpsee-core", "jsonrpsee-types", @@ -4905,8 +5051,8 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "12d8b6a9674422a8572e0b0abb12feeb3f2aeda86528c80d0350c2bd0923ab41" dependencies = [ "futures-util", - "http", - "hyper", + "http 0.2.12", + "hyper 0.14.28", "jsonrpsee-core", "jsonrpsee-types", "pin-project", @@ -4941,7 +5087,7 @@ version = "0.22.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "58b9db2dfd5bb1194b0ce921504df9ceae210a345bc2f6c5a61432089bbab070" dependencies = [ - "http", + "http 0.2.12", "jsonrpsee-client-transport", "jsonrpsee-core", "jsonrpsee-types", @@ -5799,6 +5945,12 @@ version = "0.1.10" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2532096657941c2fea9c289d370a250971c689d4f143798ff67113ec042024a5" +[[package]] +name = "matchit" +version = "0.7.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0e7465ac9959cc2b1404e8e2367b43684a6d13790fe23056cc8c6c5a6b7bcb94" + [[package]] name = "mater" version = "0.1.0" @@ -5910,6 +6062,12 @@ dependencies = [ "thrift", ] +[[package]] +name = "mime" +version = "0.3.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6877bb514081ee2a7ff5ef9de3281f14a4dd4bceac4c09388074a6b5df8a139a" + [[package]] name = "minimal-lexical" version = "0.2.1" @@ -6049,6 +6207,23 @@ dependencies = [ "syn 2.0.66", ] +[[package]] +name = "multer" +version = "3.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "83e87776546dc87511aa5ee218730c92b666d7264ab6ed41f9d215af9cd5224b" +dependencies = [ + "bytes", + "encoding_rs", + "futures-util", + "http 1.1.0", + "httparse", + "memchr", + "mime", + "spin 0.9.8", + "version_check", +] + [[package]] name = "multiaddr" version = "0.17.1" @@ -8315,17 +8490,23 @@ name = "polka-storage-provider" version = "0.1.0" dependencies = [ "async-trait", + "axum", "chrono", "clap", + "futures", "jsonrpsee", + "mater", "sc-cli", "sealed", "serde", "serde_json", "subxt", "subxt-signer", + "tempfile", "thiserror", "tokio", + "tokio-util", + "tower-http 0.5.2", "tracing", "tracing-subscriber", "url", @@ -11660,7 +11841,7 @@ dependencies = [ "fnv", "futures", "futures-timer", - "hyper", + "hyper 0.14.28", "hyper-rustls", "libp2p", "log", @@ -11754,8 +11935,8 @@ dependencies = [ "forwarded-header-value", "futures", "governor", - "http", - "hyper", + "http 0.2.12", + "hyper 0.14.28", "ip_network", "jsonrpsee", "log", @@ -11763,7 +11944,7 @@ dependencies = [ "substrate-prometheus-endpoint", "tokio", "tower", - "tower-http", + "tower-http 0.4.4", ] [[package]] @@ -12436,6 +12617,16 @@ dependencies = [ "serde", ] +[[package]] +name = "serde_path_to_error" +version = "0.1.16" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "af99884400da37c88f5e9146b7f1fd0fbcae8f6eec4e9da38b67d05486f814a6" +dependencies = [ + "itoa", + "serde", +] + [[package]] name = "serde_spanned" version = "0.6.6" @@ -12445,6 +12636,18 @@ dependencies = [ "serde", ] +[[package]] +name = "serde_urlencoded" +version = "0.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d3491c14715ca2294c4d6a88f15e84739788c1d030eed8c110436aafdaa2f3fd" +dependencies = [ + "form_urlencoded", + "itoa", + "ryu", + "serde", +] + [[package]] name = "serdect" version = "0.2.0" @@ -12924,7 +13127,7 @@ dependencies = [ "bytes", "flate2", "futures", - "http", + "http 0.2.12", "httparse", "log", "rand 0.8.5", @@ -14091,7 +14294,7 @@ name = "substrate-prometheus-endpoint" version = "0.17.0" source = "git+https://github.com/paritytech/polkadot-sdk?tag=polkadot-v1.13.0#d5160c1d567cc73c7df6c816d41e21aa3adb188d" dependencies = [ - "hyper", + "hyper 0.14.28", "log", "prometheus", "thiserror", @@ -14325,6 +14528,18 @@ dependencies = [ "unicode-ident", ] +[[package]] +name = "sync_wrapper" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2047c6ded9c721764247e62cd3b03c09ffc529b2ba5b10ec482ae507a4a70160" + +[[package]] +name = "sync_wrapper" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a7065abeca94b6a8a577f9bd45aa0867a2238b74e8eb67cf10d492bc39351394" + [[package]] name = "synstructure" version = "0.12.6" @@ -14738,6 +14953,7 @@ dependencies = [ "futures-util", "pin-project", "pin-project-lite 0.2.14", + "tokio", "tower-layer", "tower-service", "tracing", @@ -14753,14 +14969,31 @@ dependencies = [ "bytes", "futures-core", "futures-util", - "http", - "http-body", + "http 0.2.12", + "http-body 0.4.6", "http-range-header", "pin-project-lite 0.2.14", "tower-layer", "tower-service", ] +[[package]] +name = "tower-http" +version = "0.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1e9cd434a998747dd2c4276bc96ee2e0c7a2eadf3cae88e52be55a05fa9053f5" +dependencies = [ + "bitflags 2.5.0", + "bytes", + "http 1.1.0", + "http-body 1.0.0", + "http-body-util", + "pin-project-lite 0.2.14", + "tower-layer", + "tower-service", + "tracing", +] + [[package]] name = "tower-layer" version = "0.3.2" @@ -15003,7 +15236,7 @@ dependencies = [ "byteorder", "bytes", "data-encoding", - "http", + "http 0.2.12", "httparse", "log", "rand 0.8.5", diff --git a/Cargo.toml b/Cargo.toml index 2cb378633..1967152d3 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -37,6 +37,7 @@ substrate-wasm-builder = { git = "https://github.com/paritytech/polkadot-sdk", t async-channel = "2.3.1" async-stream = "0.3.5" async-trait = "0.1.80" +axum = "0.7.5" base64 = "0.22.1" bitflags = "2.5.0" blake2b_simd = { version = "1.0.2" } @@ -86,6 +87,7 @@ thiserror = { version = "1.0.48" } tokio = "1.37.0" tokio-stream = "0.1.15" tokio-util = "0.7.11" +tower-http = "0.5.2" tracing = "0.1.40" tracing-subscriber = "0.3.18" url = "2.5.0" @@ -94,6 +96,7 @@ uuid = "1.8.0" # Local cli-primitives = { path = "primitives/cli" } +mater = { path = "storage/mater" } pallet-market = { path = "pallets/market", default-features = false } pallet-storage-provider = { path = "pallets/storage-provider", default-features = false } polka-storage-runtime = { path = "runtime" } diff --git a/cli/polka-storage-provider/Cargo.toml b/cli/polka-storage-provider/Cargo.toml index 2170a02af..a9ffce5ca 100644 --- a/cli/polka-storage-provider/Cargo.toml +++ b/cli/polka-storage-provider/Cargo.toml @@ -9,21 +9,27 @@ version = "0.1.0" [dependencies] async-trait = { workspace = true } +axum = { workspace = true, features = ["macros", "multipart"] } chrono = { workspace = true, features = ["serde"] } clap = { workspace = true, features = ["derive"] } +futures = { workspace = true } jsonrpsee = { workspace = true, features = ["http-client", "server", "ws-client"] } +mater = { workspace = true } sc-cli = { workspace = true } sealed = { workspace = true } serde = { workspace = true } serde_json = { workspace = true } subxt = { workspace = true } subxt-signer = { workspace = true } +tempfile = { workspace = true } thiserror = { workspace = true } tokio = { workspace = true } +tokio-util = { workspace = true } +tower-http = { workspace = true, features = ["trace"] } tracing = { workspace = true } tracing-subscriber = { workspace = true, features = ["env-filter"] } url = { workspace = true } -uuid = { workspace = true } +uuid = { workspace = true, features = ["v4"] } [lints] workspace = true diff --git a/cli/polka-storage-provider/src/cli.rs b/cli/polka-storage-provider/src/cli.rs index b1f2beeef..0af871ecf 100644 --- a/cli/polka-storage-provider/src/cli.rs +++ b/cli/polka-storage-provider/src/cli.rs @@ -3,7 +3,7 @@ use thiserror::Error; use url::Url; use crate::{ - commands::{InfoCommand, InitCommand, RunCommand, WalletCommand}, + commands::{InfoCommand, InitCommand, RunCommand, StorageCommand, WalletCommand}, rpc::{server::RPC_SERVER_DEFAULT_BIND_ADDR, ClientError}, }; @@ -27,6 +27,8 @@ pub enum SubCommand { Init(InitCommand), /// Start a polka storage provider Run(RunCommand), + /// Command to start storage server. + Storage(StorageCommand), /// Info command to display information about the storage provider. Info(InfoCommand), /// Command to manage wallet operations. diff --git a/cli/polka-storage-provider/src/commands.rs b/cli/polka-storage-provider/src/commands.rs index b3724cc1c..4e8c8b336 100644 --- a/cli/polka-storage-provider/src/commands.rs +++ b/cli/polka-storage-provider/src/commands.rs @@ -1,6 +1,7 @@ mod info; mod init; mod run; +mod storage; mod wallet; pub(crate) mod runner; @@ -8,4 +9,5 @@ pub(crate) mod runner; pub(crate) use info::InfoCommand; pub(crate) use init::InitCommand; pub(crate) use run::RunCommand; +pub(crate) use storage::StorageCommand; pub(crate) use wallet::WalletCommand; diff --git a/cli/polka-storage-provider/src/commands/run.rs b/cli/polka-storage-provider/src/commands/run.rs index 5f31a656b..baf52df92 100644 --- a/cli/polka-storage-provider/src/commands/run.rs +++ b/cli/polka-storage-provider/src/commands/run.rs @@ -2,6 +2,7 @@ use std::{net::SocketAddr, sync::Arc}; use chrono::Utc; use clap::Parser; +use tokio::{signal, sync::oneshot}; use tracing::info; use url::Url; @@ -11,6 +12,7 @@ use crate::{ substrate, }; +/// Default RPC API endpoint used by the parachain node. const FULL_NODE_DEFAULT_RPC_ADDR: &str = "ws://127.0.0.1:9944"; /// Command to start the storage provider. @@ -33,20 +35,26 @@ impl RunCommand { substrate_client, }); - // Start RPC server - let handle = start_rpc_server(state, self.listen_addr).await?; - info!("RPC server started at {}", self.listen_addr); + // Setup shutdown channel + let (notify_shutdown_tx, notify_shutdown_rx) = oneshot::channel(); - // Monitor shutdown - tokio::signal::ctrl_c().await?; + // Start the server in the background + let rpc_handler = tokio::spawn(start_rpc_server( + state.clone(), + self.listen_addr, + notify_shutdown_rx, + )); - // Stop the Server - let _ = handle.stop(); + // Wait for SIGTERM on the main thread and once received "unblock" + signal::ctrl_c().await.expect("failed to listen for event"); + // Send the shutdown signal + let _ = notify_shutdown_tx.send(()); - // Wait for the server to stop - handle.stopped().await; - info!("RPC server stopped"); + // Give server some time to finish + info!("shutting down server, killing it in 10sec"); + let _ = tokio::time::timeout(std::time::Duration::from_secs(10), rpc_handler).await; + info!("storage provider stopped"); Ok(()) } } diff --git a/cli/polka-storage-provider/src/commands/runner.rs b/cli/polka-storage-provider/src/commands/runner.rs index 408e212d1..831d9832b 100644 --- a/cli/polka-storage-provider/src/commands/runner.rs +++ b/cli/polka-storage-provider/src/commands/runner.rs @@ -19,6 +19,7 @@ pub(crate) async fn run() -> Result<(), CliError> { match &cli_arguments.subcommand { SubCommand::Init(cmd) => cmd.run().await, SubCommand::Run(cmd) => cmd.run().await, + SubCommand::Storage(cmd) => cmd.run().await, SubCommand::Info(cmd) => cmd.run(&rpc_client).await, SubCommand::Wallet(cmd) => match cmd { WalletCommand::GenerateNodeKey(cmd) => Ok(cmd.run()?), diff --git a/cli/polka-storage-provider/src/commands/storage.rs b/cli/polka-storage-provider/src/commands/storage.rs new file mode 100644 index 000000000..b71130010 --- /dev/null +++ b/cli/polka-storage-provider/src/commands/storage.rs @@ -0,0 +1,61 @@ +use std::{env, net::SocketAddr, path::PathBuf, sync::Arc}; + +use clap::Parser; +use tokio::{signal, sync::oneshot}; +use tracing::info; + +use crate::{ + cli::CliError, + storage::{start_upload_server, StorageServerState}, +}; + +/// Creates a path relative to the current directory in the format `./uploads` +fn default_storage_path() -> PathBuf { + let mut current_dir = env::current_dir().expect("failed to get current directory"); + current_dir.push("uploads"); + current_dir +} + +/// Default address to bind the storage server to. +pub const STORAGE_SERVER_DEFAULT_BIND_ADDR: &str = "127.0.0.1:9000"; + +/// Command to start the storage provider. +#[derive(Debug, Clone, Parser)] +pub(crate) struct StorageCommand { + /// Address and port used for storage server. + #[arg(long, default_value = STORAGE_SERVER_DEFAULT_BIND_ADDR)] + pub listen_addr: SocketAddr, + /// Directory where uploaded files are stored. + #[arg(long, default_value = default_storage_path().into_os_string())] + pub storage_dir: PathBuf, +} + +impl StorageCommand { + pub async fn run(&self) -> Result<(), CliError> { + let state = Arc::new(StorageServerState { + storage_dir: self.storage_dir.clone(), + }); + + // Setup shutdown channel + let (notify_shutdown_tx, notify_shutdown_rx) = oneshot::channel(); + + // Start the server in the background + let upload_handler = tokio::spawn(start_upload_server( + state.clone(), + self.listen_addr, + notify_shutdown_rx, + )); + + // Wait for SIGTERM on the main thread and once received "unblock" + signal::ctrl_c().await.expect("failed to listen for event"); + // Send the shutdown signal + let _ = notify_shutdown_tx.send(()); + + // Give uploads some time to finish + info!("shutting down server, killing it in 30sec"); + let _ = tokio::time::timeout(std::time::Duration::from_secs(30), upload_handler).await; + + info!("storage provider server stopped"); + Ok(()) + } +} diff --git a/cli/polka-storage-provider/src/main.rs b/cli/polka-storage-provider/src/main.rs index 9aa390eec..5c4c8b502 100644 --- a/cli/polka-storage-provider/src/main.rs +++ b/cli/polka-storage-provider/src/main.rs @@ -1,9 +1,11 @@ //! A CLI application that facilitates management operations over a running full node and other components. #![deny(unused_crate_dependencies)] +#![deny(clippy::unwrap_used)] mod cli; pub(crate) mod commands; mod rpc; +mod storage; mod substrate; pub(crate) use cli::Cli; diff --git a/cli/polka-storage-provider/src/rpc/server.rs b/cli/polka-storage-provider/src/rpc/server.rs index ea1213761..dde9c62e9 100644 --- a/cli/polka-storage-provider/src/rpc/server.rs +++ b/cli/polka-storage-provider/src/rpc/server.rs @@ -6,7 +6,7 @@ use std::{ use chrono::Utc; use jsonrpsee::{ - server::{Server, ServerHandle}, + server::Server, types::{ error::{INTERNAL_ERROR_CODE, INVALID_PARAMS_CODE}, ErrorObjectOwned, @@ -14,6 +14,8 @@ use jsonrpsee::{ RpcModule, }; use serde_json::Value; +use tokio::sync::oneshot::Receiver; +use tracing::{info, instrument}; use super::{ methods::{common::InfoRequest, register_async, wallet::WalletRequest}, @@ -31,16 +33,30 @@ pub struct RpcServerState { } /// Start the RPC server. +#[instrument(skip_all)] pub async fn start_rpc_server( state: Arc, listen_addr: SocketAddr, -) -> Result { + notify_shutdown_rx: Receiver<()>, +) -> Result<(), CliError> { let server = Server::builder().build(listen_addr).await?; - let module = create_module(state.clone()); + let module = create_module(state); let server_handle = server.start(module); + info!("RPC server started at {}", listen_addr); - Ok(server_handle) + // Wait for shutdown signal. No need to handle the error. We stop the server + // in any case. + let _ = notify_shutdown_rx.await; + + // Stop returns an error if the server has already been stopped. + // PRE-COND: the server is only shutdown by receiving from `notify_shutdown_rx` + let _ = server_handle.stop(); + + // Wait for server to be stopped + server_handle.stopped().await; + + Ok(()) } /// Initialize [`RpcModule`] and register the handlers diff --git a/cli/polka-storage-provider/src/storage.rs b/cli/polka-storage-provider/src/storage.rs new file mode 100644 index 000000000..2b55124ce --- /dev/null +++ b/cli/polka-storage-provider/src/storage.rs @@ -0,0 +1,188 @@ +use std::{io, net::SocketAddr, path::PathBuf, str::FromStr, sync::Arc}; + +use axum::{ + body::Body, + extract::{MatchedPath, Path, Request, State}, + http::{header, StatusCode}, + response::{IntoResponse, Response}, + routing::{get, post}, + Router, +}; +use futures::TryStreamExt; +use mater::{create_filestore, Cid, Config}; +use tempfile::tempdir_in; +use tokio::{ + fs::{self, File}, + io::{AsyncRead, BufWriter}, + sync::oneshot::Receiver, +}; +use tokio_util::io::{ReaderStream, StreamReader}; +use tower_http::trace::TraceLayer; +use tracing::{error, info, info_span, instrument}; +use uuid::Uuid; + +use crate::cli::CliError; + +/// Shared state of the storage server. +pub struct StorageServerState { + pub storage_dir: PathBuf, +} + +#[instrument(skip_all)] +pub async fn start_upload_server( + state: Arc, + listen_addr: SocketAddr, + notify_shutdown_rx: Receiver<()>, +) -> Result<(), CliError> { + // Configure router + let router = configure_router(state); + let listener = tokio::net::TcpListener::bind(listen_addr).await?; + + // Start server + info!("upload server started at: {listen_addr}"); + axum::serve(listener, router) + .with_graceful_shutdown(async move { + let _ = notify_shutdown_rx.await; + }) + .await?; + + Ok(()) +} + +fn configure_router(state: Arc) -> Router { + Router::new() + .route("/upload", post(upload)) + .route("/download/:cid", get(download)) + .with_state(state) + // Tracing layer + .layer( + TraceLayer::new_for_http().make_span_with(|request: &Request<_>| { + // Log the matched route's path (with placeholders not filled in). + // Use request.uri() or OriginalUri if you want the real path. + let matched_path = request + .extensions() + .get::() + .map(MatchedPath::as_str); + + info_span!( + "request", + method = ?request.method(), + matched_path, + request_id = %Uuid::new_v4() + ) + }), + ) +} + +/// Handler for the upload endpoint. It receives a stream of bytes, coverts them +/// to a CAR file and returns the CID of the CAR file to the user. +async fn upload( + State(state): State>, + request: Request, +) -> Result { + // Body reader + let body_reader = StreamReader::new( + request + .into_body() + .into_data_stream() + .map_err(|err| io::Error::new(io::ErrorKind::Other, err)), + ); + + stream_contents_to_car(&state.storage_dir, body_reader) + .await + .map_err(|err| { + error!(?err, "failed to create a CAR file"); + ( + StatusCode::INTERNAL_SERVER_ERROR, + "failed to create a CAR file".to_string(), + ) + }) + .map(|cid| cid.to_string()) +} + +/// Handler for the download endpoint. It receives a CID and streams the CAR +/// file back to the user. +async fn download( + State(state): State>, + Path(cid): Path, +) -> Result { + // Path to a CAR file + let cid = Cid::from_str(&cid).map_err(|e| { + error!(?e, cid, "cid incorrect format"); + (StatusCode::BAD_REQUEST, "cid incorrect format".to_string()) + })?; + + let (file_name, path) = content_path(&state.storage_dir, cid); + info!(?path, "file requested"); + + // Check if the file exists + if !path.exists() { + error!(?path, "file not found"); + return Err((StatusCode::NOT_FOUND, "file not found".to_string())); + } + + // Open car file + let file = File::open(&path).await.map_err(|e| { + error!(?e, ?path, "failed to open file"); + ( + StatusCode::INTERNAL_SERVER_ERROR, + "failed to open file".to_string(), + ) + })?; + + // Convert the `AsyncRead` into a `Stream` + let stream = ReaderStream::new(file); + // Convert the `Stream` into the Body + let body = Body::from_stream(stream); + // Response headers + let headers = [ + (header::CONTENT_TYPE, "application/octet-stream"), + ( + header::CONTENT_DISPOSITION, + &format!("attachment; filename=\"{:?}\"", file_name), + ), + ]; + + Ok((headers, body).into_response()) +} + +/// Reads bytes from the source and writes them to a CAR file. +async fn stream_contents_to_car( + folder: &std::path::Path, + source: R, +) -> Result> +where + R: AsyncRead + Unpin, +{ + // Create a storage folder if it doesn't exist. + if !folder.exists() { + info!(?folder, "creating storage folder"); + fs::create_dir_all(folder).await?; + } + + // Temp file which will be used to store the CAR file content. The temp + // director has a randomized name and is created in the same folder as the + // finalized uploads are stored. + let temp_dir = tempdir_in(folder)?; + let temp_file_path = temp_dir.path().join("temp.car"); + + // Stream the body from source to the temp file. + let file = File::create(&temp_file_path).await?; + let writer = BufWriter::new(file); + let cid = create_filestore(source, writer, Config::default()).await?; + + // If the file is successfully written, we can now move it to the final + // location. + let (_, final_content_path) = content_path(folder, cid); + fs::rename(temp_file_path, &final_content_path).await?; + info!(?final_content_path, "CAR file created"); + + Ok(cid) +} + +/// Returns the tuple of file name and path for a specified Cid. +fn content_path(folder: &std::path::Path, cid: Cid) -> (String, PathBuf) { + let name = format!("{cid}.car"); + let path = folder.join(&name); + (name, path) +} diff --git a/storage/mater/benches/benchmark.rs b/storage/mater/benches/benchmark.rs index 8ec3f4c97..6b55e4954 100644 --- a/storage/mater/benches/benchmark.rs +++ b/storage/mater/benches/benchmark.rs @@ -195,7 +195,10 @@ fn prepare_source_file(content: &[u8]) -> (TempDir, PathBuf) { /// Create a filestore. This function is benchmarked. async fn create_filestore_benched(source: &Path, target: &Path) { - create_filestore(source, target, Config::default()) + let source_file = File::open(source).await.unwrap(); + let output_file = File::create(target).await.unwrap(); + + create_filestore(source_file, output_file, Config::default()) .await .unwrap(); } diff --git a/storage/mater/src/async_varint.rs b/storage/mater/src/async_varint.rs new file mode 100644 index 000000000..3920cc174 --- /dev/null +++ b/storage/mater/src/async_varint.rs @@ -0,0 +1,123 @@ +/// Utility functions for the mater crate. The contents were mostly borrowed +/// from the . +/// +/// The original issue why we needed to borrow the implantation of the reader +/// and writer is +/// . +/// This specifies the Send bound as optional. The side effect of this choice is +/// that all futures using the writer or reader are non Send and there is no way +/// to make them Send. +/// +/// The second crate researched was +/// . Issue with that +/// crate is that it only implements AsyncRead and AsyncWrite from the futures +/// crate and not tokio. For the future reference we could probably used +/// `unsigned-varint` with the tokio and use +/// +/// as the compatibility layer. +use std::{io, mem::size_of}; + +use integer_encoding::VarInt; +use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt}; + +/// Write a VarInt integer to an asynchronous writer. +/// +/// Borrowed from: +/// +pub(crate) async fn write_varint(writer: &mut W, n: VI) -> Result +where + W: AsyncWrite + Unpin, + VI: VarInt, +{ + let mut buf = [0 as u8; 10]; + let b = n.encode_var(&mut buf); + writer.write_all(&buf[0..b]).await?; + Ok(b) +} + +/// Returns either the decoded integer, or an error. +/// +/// In general, this always reads a whole varint. If the encoded varint's value +/// is bigger than the valid value range of `VI`, then the value is truncated. +/// +/// On EOF, an io::Error with io::ErrorKind::UnexpectedEof is returned. +/// +/// Borrowed from: +/// +pub(crate) async fn read_varint(reader: &mut R) -> Result +where + R: AsyncRead + Unpin, + VI: VarInt, +{ + let mut buf = [0_u8; 1]; + let mut p = VarIntProcessor::new::(); + + while !p.finished() { + let read = reader.read(&mut buf).await?; + + // EOF + if read == 0 && p.i == 0 { + return Err(io::Error::new(io::ErrorKind::UnexpectedEof, "Reached EOF")); + } + if read == 0 { + break; + } + + p.push(buf[0])?; + } + + p.decode() + .ok_or_else(|| io::Error::new(io::ErrorKind::UnexpectedEof, "Reached EOF")) +} + +/// Most-significant byte, == 0x80 +const MSB: u8 = 0b1000_0000; + +/// VarIntProcessor encapsulates the logic for decoding a VarInt byte-by-byte. +/// +/// Borrowed from +/// +#[derive(Default)] +struct VarIntProcessor { + buf: [u8; 10], + maxsize: usize, + i: usize, +} + +impl VarIntProcessor { + fn new() -> VarIntProcessor { + VarIntProcessor { + maxsize: VI::varint_max_size(), + ..VarIntProcessor::default() + } + } + fn push(&mut self, b: u8) -> Result<(), io::Error> { + if self.i >= self.maxsize { + return Err(io::Error::new( + io::ErrorKind::InvalidData, + "Unterminated varint", + )); + } + self.buf[self.i] = b; + self.i += 1; + Ok(()) + } + fn finished(&self) -> bool { + self.i > 0 && (self.buf[self.i - 1] & MSB == 0) + } + fn decode(&self) -> Option { + Some(VI::decode_var(&self.buf[0..self.i])?.0) + } +} + +/// Borrowed from +/// +pub(crate) trait VarIntMaxSize { + fn varint_max_size() -> usize; +} + +impl VarIntMaxSize for VI { + fn varint_max_size() -> usize { + (size_of::() * 8 + 7) / 7 + } +} diff --git a/storage/mater/src/lib.rs b/storage/mater/src/lib.rs index 175134cd5..b4a97292c 100644 --- a/storage/mater/src/lib.rs +++ b/storage/mater/src/lib.rs @@ -10,13 +10,14 @@ #![deny(rustdoc::private_intra_doc_links)] #![deny(unsafe_code)] +mod async_varint; mod multicodec; mod stores; mod unixfs; mod v1; mod v2; -// We need to expose this because `read_block` returns `(Cid, Vec)`. +// We need to re-expose this because `read_block` returns `(Cid, Vec)`. pub use ipld_core::cid::Cid; pub use stores::{create_filestore, Blockstore, Config}; pub use v1::{Header as CarV1Header, Reader as CarV1Reader, Writer as CarV1Writer}; diff --git a/storage/mater/src/stores/blockstore.rs b/storage/mater/src/stores/blockstore.rs index f8a5ce56a..69e6a29c1 100644 --- a/storage/mater/src/stores/blockstore.rs +++ b/storage/mater/src/stores/blockstore.rs @@ -85,7 +85,7 @@ impl Blockstore { /// converting the contents into a CARv2 file. pub async fn read(&mut self, reader: R) -> Result<(), Error> where - R: AsyncRead + Unpin + Send, + R: AsyncRead + Unpin, { let chunks = ReaderStream::with_capacity(reader, self.chunk_size); diff --git a/storage/mater/src/stores/filestore.rs b/storage/mater/src/stores/filestore.rs index 1a97640f1..aaf20b955 100644 --- a/storage/mater/src/stores/filestore.rs +++ b/storage/mater/src/stores/filestore.rs @@ -1,11 +1,7 @@ -use std::path::Path; - +use futures::stream::StreamExt; +use ipld_core::cid::Cid; use sha2::{Digest, Sha256}; -use tokio::{ - fs::File, - io::{AsyncRead, AsyncSeek, AsyncSeekExt, AsyncWrite}, -}; -use tokio_stream::StreamExt; +use tokio::io::{AsyncRead, AsyncSeek, AsyncSeekExt, AsyncWrite}; use tokio_util::io::ReaderStream; use super::Config; @@ -15,19 +11,18 @@ use crate::{ }; async fn balanced_import( - mut source: Src, + source: Src, mut output: Out, chunk_size: usize, tree_width: usize, -) -> Result<(), Error> +) -> Result where - Src: AsyncRead + Unpin + Send, + Src: AsyncRead + Unpin, Out: AsyncWrite + AsyncSeek + Unpin, { - let chunker = ReaderStream::with_capacity(&mut source, chunk_size); - let nodes = stream_balanced_tree(chunker, tree_width); + let chunker = ReaderStream::with_capacity(source, chunk_size); + let nodes = stream_balanced_tree(chunker, tree_width).peekable(); tokio::pin!(nodes); - let mut nodes = nodes.peekable(); let mut writer = CarV2Writer::new(&mut output); let mut position = 0; @@ -48,7 +43,7 @@ where entries.push(entry); position += writer.write_block(&node_cid, &node_bytes).await?; - if nodes.peek().await.is_none() { + if nodes.as_mut().peek().await.is_none() { root = Some(node_cid); } } @@ -80,28 +75,24 @@ where let header_v1 = CarV1Header::new(vec![root]); writer.write_v1_header(&header_v1).await?; - Ok(()) + Ok(root) } -/// Convert a `source` file into a CARv2 file and write it to `output`. +/// Convert a `source` stream into a CARv2 file and write it to an `output` stream. pub async fn create_filestore( source: Src, output: Out, config: Config, -) -> Result<(), Error> +) -> Result where - Src: AsRef, - Out: AsRef, + Src: AsyncRead + Unpin, + Out: AsyncWrite + AsyncSeek + Unpin, { match config { Config::Balanced { chunk_size, tree_width, - } => { - let source_file = File::open(source).await?; - let output_file = File::create(output).await?; - balanced_import(source_file, output_file, chunk_size, tree_width).await - } + } => balanced_import(source, output, chunk_size, tree_width).await, } } @@ -110,6 +101,7 @@ mod test { use std::path::Path; use tempfile::tempdir; + use tokio::fs::File; use crate::{ stores::{filestore::create_filestore, Config}, @@ -124,7 +116,9 @@ mod test { let temp_dir = tempdir().unwrap(); let temp_path = temp_dir.path().join("lorem.car"); - create_filestore(original, &temp_path, Config::default()) + let source_file = File::open(original).await.unwrap(); + let output_file = File::create(&temp_path).await.unwrap(); + create_filestore(source_file, output_file, Config::default()) .await .unwrap(); diff --git a/storage/mater/src/unixfs/mod.rs b/storage/mater/src/unixfs/mod.rs index 963701c02..1a84cfa66 100644 --- a/storage/mater/src/unixfs/mod.rs +++ b/storage/mater/src/unixfs/mod.rs @@ -253,7 +253,7 @@ pub(crate) fn stream_balanced_tree( width: usize, ) -> impl Stream> where - I: Stream> + Send, + I: Stream>, { try_stream! { let mut tree: VecDeque> = VecDeque::new(); diff --git a/storage/mater/src/v1/reader.rs b/storage/mater/src/v1/reader.rs index cffa3ff96..0e201cd00 100644 --- a/storage/mater/src/v1/reader.rs +++ b/storage/mater/src/v1/reader.rs @@ -1,17 +1,16 @@ use std::io::Cursor; -use integer_encoding::VarIntAsyncReader; use ipld_core::{cid::Cid, codec::Codec}; use serde_ipld_dagcbor::codec::DagCborCodec; use tokio::io::{AsyncRead, AsyncReadExt}; -use crate::{v1::Header, v2::PRAGMA, Error}; +use crate::{async_varint::read_varint, v1::Header, v2::PRAGMA, Error}; pub(crate) async fn read_header(mut reader: R) -> Result where R: AsyncRead + Unpin, { - let header_length: usize = reader.read_varint_async().await?; + let header_length: usize = read_varint(&mut reader).await?; let mut header_buffer = vec![0; header_length]; reader.read_exact(&mut header_buffer).await?; @@ -46,7 +45,7 @@ pub(crate) async fn read_block(mut reader: R) -> Result<(Cid, Vec), Error where R: AsyncRead + Unpin, { - let full_block_length: usize = reader.read_varint_async().await?; + let full_block_length: usize = read_varint(&mut reader).await?; let mut full_block_buffer = vec![0; full_block_length]; reader.read_exact(&mut full_block_buffer).await?; diff --git a/storage/mater/src/v1/writer.rs b/storage/mater/src/v1/writer.rs index 0a06429f2..e78597f3a 100644 --- a/storage/mater/src/v1/writer.rs +++ b/storage/mater/src/v1/writer.rs @@ -1,10 +1,9 @@ -use integer_encoding::VarIntAsyncWriter; use ipld_core::{cid::Cid, codec::Codec}; use serde_ipld_dagcbor::codec::DagCborCodec; use tokio::io::{AsyncWrite, AsyncWriteExt}; pub use crate::v1::Header; -use crate::Error; +use crate::{async_varint::write_varint, Error}; /// Write [`crate::v1::Header`] to the provider writer. pub(crate) async fn write_header(writer: &mut W, header: &Header) -> Result @@ -12,7 +11,7 @@ where W: AsyncWrite + Unpin, { let encoded_header = DagCborCodec::encode_to_vec(header)?; - let varint_len = writer.write_varint_async(encoded_header.len()).await?; + let varint_len = write_varint(writer, encoded_header.len()).await?; writer.write_all(&encoded_header).await?; Ok(varint_len + encoded_header.len()) } @@ -32,7 +31,7 @@ where let data = block.as_ref(); let len = cid.encoded_len() + data.len(); - let varint_len = writer.write_varint_async(len).await?; + let varint_len = write_varint(writer, len).await?; // This allocation can probably be spared writer.write_all(&cid.to_bytes()).await?; writer.write_all(block.as_ref()).await?; diff --git a/storage/mater/src/v2/index.rs b/storage/mater/src/v2/index.rs index 000d82458..92822984b 100644 --- a/storage/mater/src/v2/index.rs +++ b/storage/mater/src/v2/index.rs @@ -1,9 +1,11 @@ use std::{collections::BTreeMap, mem::size_of}; -use integer_encoding::{VarIntAsyncReader, VarIntAsyncWriter}; use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt}; -use crate::Error; +use crate::{ + async_varint::{read_varint, write_varint}, + Error, +}; /// `IndexSorted` code format value, as defined in the /// [specification](https://ipld.io/specs/transport/car/carv2/#format-0x0400-indexsorted). @@ -175,21 +177,19 @@ impl Index { } } -pub(crate) async fn write_index(mut writer: W, index: &Index) -> Result +pub async fn write_index(writer: &mut W, index: &Index) -> Result where W: AsyncWrite + Unpin, { let mut written_bytes = 0; match index { Index::IndexSorted(index) => { - written_bytes += writer.write_varint_async(INDEX_SORTED_CODE).await?; - written_bytes += write_index_sorted(&mut writer, index).await?; + written_bytes += write_varint(writer, INDEX_SORTED_CODE).await?; + written_bytes += write_index_sorted(writer, index).await?; } Index::MultihashIndexSorted(index) => { - written_bytes += writer - .write_varint_async(MULTIHASH_INDEX_SORTED_CODE) - .await?; - written_bytes += write_multihash_index_sorted(&mut writer, index).await?; + written_bytes += write_varint(writer, MULTIHASH_INDEX_SORTED_CODE).await?; + written_bytes += write_multihash_index_sorted(writer, index).await?; } } Ok(written_bytes) @@ -262,7 +262,7 @@ pub(crate) async fn read_index(mut reader: R) -> Result where R: AsyncRead + Unpin, { - let index_type: u64 = reader.read_varint_async().await?; + let index_type: u64 = read_varint(&mut reader).await?; return match index_type { INDEX_SORTED_CODE => Ok(Index::IndexSorted(read_index_sorted(&mut reader).await?)), MULTIHASH_INDEX_SORTED_CODE => Ok(Index::MultihashIndexSorted( diff --git a/storage/polka-index/src/local_index_directory/rdb.rs b/storage/polka-index/src/local_index_directory/rdb.rs index f1fa9a411..5d2046ee3 100644 --- a/storage/polka-index/src/local_index_directory/rdb.rs +++ b/storage/polka-index/src/local_index_directory/rdb.rs @@ -1361,7 +1361,7 @@ mod test { db.cf_handle(MULTIHASH_TO_PIECE_CID_CF), rocksdb::IteratorMode::Start, ) - .flat_map(std::convert::identity) + .flatten() .collect(); assert_eq!(indexes.len(), 2); @@ -1377,7 +1377,7 @@ mod test { db.cf_handle(MULTIHASH_TO_PIECE_CID_CF), rocksdb::IteratorMode::Start, ) - .flat_map(std::convert::identity) + .flatten() .collect(); assert!(indexes.is_empty());