Skip to content

Commit

Permalink
integrate retrieval server
Browse files Browse the repository at this point in the history
  • Loading branch information
cernicc committed Jan 23, 2025
1 parent b098815 commit 800b896
Show file tree
Hide file tree
Showing 8 changed files with 185 additions and 10 deletions.
3 changes: 3 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions storage-provider/server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,16 @@ delia = []
[dependencies]
# "Homegrown" crates
mater = { workspace = true }
polka-index = { workspace = true }
polka-storage-proofs = { workspace = true, features = ["std", "substrate"] }
polka-storage-provider-common = { workspace = true }
polka-storage-retrieval = { workspace = true }
primitives = { workspace = true, features = ["clap", "serde", "std"] }
storagext = { workspace = true, features = ["clap"] }

async-trait = { workspace = true }
axum = { workspace = true, features = ["macros", "multipart"] }
blockstore = { workspace = true }
cid = { workspace = true, features = ["serde", "std"] }
clap = { workspace = true, features = ["derive"] }
codec = { workspace = true }
Expand Down
10 changes: 10 additions & 0 deletions storage-provider/server/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,11 @@ fn default_node_address() -> Url {
Url::parse(DEFAULT_NODE_ADDRESS).expect("DEFAULT_NODE_ADDRESS must be a valid Url")
}

fn default_retrieval_address() -> Multiaddr {
format!("/ip4/127.0.0.1/tcp/8002")
.parse()
.expect("multiaddres is correct")
}
#[derive(Debug, Clone, Deserialize, Args)]
#[group(multiple = true, conflicts_with = "config")]
#[serde(deny_unknown_fields)]
Expand All @@ -54,6 +59,11 @@ pub struct ConfigurationArgs {
#[arg(long, default_value_t = default_node_address())]
pub(crate) node_url: Url,

/// Storage provider listen address.
#[serde(default = "default_retrieval_address")]
#[arg(long, default_value_t = default_retrieval_address())]
pub(crate) retrieval_listen_address: Multiaddr,

/// RocksDB storage directory.
/// Defaults to a temporary random directory, like `/tmp/<random>/deals_database`.
#[arg(long)]
Expand Down
52 changes: 48 additions & 4 deletions storage-provider/server/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,11 @@ mod config;
mod db;
mod p2p;
mod pipeline;
mod retrieval;
mod rpc;
mod storage;

use std::{env::temp_dir, net::SocketAddr, path::PathBuf, sync::Arc, time::Duration};
use std::{env::temp_dir, net::SocketAddr, ops::Deref, path::PathBuf, sync::Arc, time::Duration};

use clap::Parser;
use libp2p::{identity::Keypair, Multiaddr, PeerId};
Expand All @@ -18,13 +19,15 @@ use p2p::{
RegisterConfig,
};
use pipeline::types::PipelineMessage;
use polka_index::local_index_directory::rdb::{RocksDBLid, RocksDBStateStoreConfig};
use polka_storage_proofs::{
porep::{self, PoRepParameters},
post::{self, PoStParameters},
};
use polka_storage_provider_common::rpc::ServerInfo;
use primitives::proofs::{RegisteredPoStProof, RegisteredSealProof};
use rand::Rng;
use retrieval::{start_retrieval, RetrievalServerConfig};
use storagext::{
multipair::{MultiPairArgs, MultiPairSigner},
runtime::runtime_types::{
Expand Down Expand Up @@ -72,6 +75,9 @@ pub(crate) const SEALED_SECTOR_DIRECTORY_NAME: &str = "sealed";
/// Name for the directory where the sealing cache is kept.
pub(crate) const SEALING_CACHE_DIRECTORY_NANE: &str = "cache";

/// Name of the directory where the index is kept.
pub(crate) const INDEXER_DIRECTORY_NAME: &str = "index";

fn get_random_temporary_folder() -> PathBuf {
temp_dir().join(
rand::thread_rng()
Expand All @@ -88,6 +94,7 @@ struct SetupOutput {
pipeline_state: PipelineState,
pipeline_rx: UnboundedReceiver<PipelineMessage>,
p2p_state: P2PState,
retrieval_config: RetrievalServerConfig<RocksDBLid>,
}

fn main() -> Result<(), ServerError> {
Expand Down Expand Up @@ -176,6 +183,12 @@ pub enum ServerError {

#[error(transparent)]
P2P(#[from] P2PError),

#[error(transparent)]
RetrievalServer(#[from] polka_storage_retrieval::server::ServerError),

#[error(transparent)]
Lid(#[from] polka_index::local_index_directory::LidError),
}

/// Takes an expression that returns a `Result<Result<T, E2>, E1>`.
Expand Down Expand Up @@ -232,6 +245,9 @@ pub struct Server {
/// Parachain node RPC url.
node_url: Url,

/// Storage provider listen address.
retrieval_listen_address: Multiaddr,

/// Storage provider key pair.
multi_pair_signer: MultiPairSigner,

Expand Down Expand Up @@ -353,6 +369,7 @@ impl TryFrom<ServerCli> for Server {
p2p_key: args.p2p_key,
rendezvous_point_address: args.rendezvous_point_address,
rendezvous_point: args.rendezvous_point,
retrieval_listen_address: args.retrieval_listen_address,
})
}
}
Expand All @@ -365,6 +382,7 @@ impl Server {
pipeline_state,
pipeline_rx,
p2p_state,
retrieval_config,
} = self.setup().await?;

let cancellation_token = CancellationToken::new();
Expand All @@ -384,6 +402,11 @@ impl Server {
cancellation_token.child_token(),
));

let retrieval_task = tokio::spawn(start_retrieval(
retrieval_config,
cancellation_token.child_token(),
));

// Wait for SIGTERM on the main thread and once received "unblock"
tokio::signal::ctrl_c()
.await
Expand All @@ -394,8 +417,13 @@ impl Server {
tracing::info!("sent shutdown signal");

// Wait for the tasks to finish
let (upload_result, rpc_task, pipeline_task, p2p_task) =
tokio::join!(storage_task, rpc_task, pipeline_task, p2p_task);
let (upload_result, rpc_task, pipeline_task, p2p_task, retrieval_task) = tokio::join!(
storage_task,
rpc_task,
pipeline_task,
p2p_task,
retrieval_task
);

// Inspect and log errors
let (upload_result, rpc_task, pipeline_task, p2p_task) =
Expand All @@ -406,6 +434,7 @@ impl Server {
rpc_task??;
pipeline_task??;
p2p_task??;
retrieval_task??;

Ok(())
}
Expand All @@ -427,12 +456,19 @@ impl Server {
let sealed_sector_storage_dir =
Arc::new(self.storage_directory.join(SEALED_SECTOR_DIRECTORY_NAME));
let sealing_cache_dir = Arc::new(self.storage_directory.join(SEALING_CACHE_DIRECTORY_NANE));
let index_dir = Arc::new(self.storage_directory.join(INDEXER_DIRECTORY_NAME));

// Create the storage directories
tokio::fs::create_dir_all(car_piece_storage_dir.as_ref()).await?;
tokio::fs::create_dir_all(unsealed_sector_storage_dir.as_ref()).await?;
tokio::fs::create_dir_all(sealed_sector_storage_dir.as_ref()).await?;
tokio::fs::create_dir_all(sealing_cache_dir.as_ref()).await?;
tokio::fs::create_dir_all(index_dir.as_ref()).await?;

// Indexer used by the system
let indexer = Arc::new(RocksDBLid::new(RocksDBStateStoreConfig {
path: index_dir.deref().clone(),
})?);

let (pipeline_tx, pipeline_rx) = tokio::sync::mpsc::unbounded_channel::<PipelineMessage>();

Expand Down Expand Up @@ -465,7 +501,7 @@ impl Server {
let pipeline_state = PipelineState {
db: deal_database.clone(),
server_info: rpc_state.server_info.clone(),
unsealed_sectors_dir: unsealed_sector_storage_dir,
unsealed_sectors_dir: unsealed_sector_storage_dir.clone(),
sealed_sectors_dir: sealed_sector_storage_dir,
sealing_cache_dir,
porep_parameters: Arc::new(self.porep_parameters),
Expand All @@ -483,12 +519,20 @@ impl Server {
rendezvous_point: self.rendezvous_point,
};

let unsealed_sectors_dir = unsealed_sector_storage_dir.deref().clone();
let retrieval_config = RetrievalServerConfig {
listen_address: self.retrieval_listen_address,
unsealed_sectors_dir,
indexer,
};

Ok(SetupOutput {
storage_state,
rpc_state,
pipeline_state,
pipeline_rx,
p2p_state,
retrieval_config,
})
}

Expand Down
54 changes: 54 additions & 0 deletions storage-provider/server/src/retrieval/blockstore.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
use std::{path::Path, sync::Arc};

use blockstore::Error;
use polka_index::local_index_directory::Service;

/// The blockstore that reads blocks directly from unsealed sectors
pub struct ProviderBlockstore<I> {
indexer: Arc<I>,
}

impl<I> ProviderBlockstore<I> {
pub fn new<P>(unsealed_sectors_path: P, indexer: Arc<I>) -> Self
where
I: Service,
P: AsRef<Path>,
{
Self { indexer }
}
}

impl<I> blockstore::Blockstore for ProviderBlockstore<I>
where
I: Send + Sync + 'static,
{
async fn get<const S: usize>(
&self,
_cid: &cid::CidGeneric<S>,
) -> blockstore::Result<Option<Vec<u8>>> {
// TODO: Find the sector that holds the requested cid
// TODO: open a file handler to the sector file.
// TODO: Read the blocks from the sector
todo!()
}

async fn put_keyed<const S: usize>(
&self,
_cid: &cid::CidGeneric<S>,
_data: &[u8],
) -> blockstore::Result<()> {
Err(Error::FatalDatabaseError(
"put operation not supported".to_string(),
))
}

async fn remove<const S: usize>(&self, _cid: &cid::CidGeneric<S>) -> blockstore::Result<()> {
Err(Error::FatalDatabaseError(
"remove operation not supported".to_string(),
))
}

async fn close(self) -> blockstore::Result<()> {
Ok(())
}
}
43 changes: 43 additions & 0 deletions storage-provider/server/src/retrieval/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
use std::{path::PathBuf, sync::Arc};

use blockstore::ProviderBlockstore;
use libp2p::Multiaddr;
use polka_index::local_index_directory::Service;
use polka_storage_retrieval::Server;
use tokio_util::sync::CancellationToken;

use crate::ServerError;

mod blockstore;

pub struct RetrievalServerConfig<I> {
pub listen_address: Multiaddr,
pub unsealed_sectors_dir: PathBuf,
pub indexer: Arc<I>,
}

#[tracing::instrument(skip_all)]
pub async fn start_retrieval<I>(
config: RetrievalServerConfig<I>,
token: CancellationToken,
) -> Result<(), ServerError>
where
I: Service + Send + Sync + 'static,
{
// Blockstore used by the retrieval server provider
let blockstore = Arc::new(ProviderBlockstore::new(
config.unsealed_sectors_dir,
config.indexer,
));

// Setup & run the retrieval server
let server = Server::new(blockstore)?;
server
.run(vec![config.listen_address], async move {
token.cancelled_owned().await;
tracing::trace!("shutting down the retrieval server");
})
.await?;

Ok(())
}
2 changes: 1 addition & 1 deletion storage-retrieval/lib/examples/simple_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ async fn main() -> Result<()> {
// Setup & run the server
let server = Server::new(blockstore)?;
let listener: Multiaddr = format!("/ip4/127.0.0.1/tcp/8989").parse()?;
server.run(vec![listener]).await?;
server.run(vec![listener], std::future::pending()).await?;

Ok(())
}
Expand Down
28 changes: 23 additions & 5 deletions storage-retrieval/lib/src/server.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use std::{io, sync::Arc};

use blockstore::Blockstore;
use futures::StreamExt;
use futures::{pin_mut, Future, StreamExt};
use libp2p::{Multiaddr, PeerId, Swarm, TransportError};
use libp2p_core::ConnectedPoint;
use libp2p_swarm::{ConnectionId, SwarmEvent};
Expand Down Expand Up @@ -43,17 +43,35 @@ where

// Start the server. The server will stop if it received a cancellation
// event or some error occurred.
pub async fn run(mut self, listeners: Vec<Multiaddr>) -> Result<(), ServerError> {
pub async fn run<F>(
mut self,
listeners: Vec<Multiaddr>,
shutdown_signal: F,
) -> Result<(), ServerError>
where
F: Future<Output = ()> + Send + 'static,
{
// Listen on
for listener in listeners {
self.swarm.listen_on(listener)?;
}

// Keep server running
pin_mut!(shutdown_signal);

loop {
let event = self.swarm.select_next_some().await;
self.on_swarm_event(event)?;
tokio::select! {
event = self.swarm.select_next_some() => {
self.on_swarm_event(event)?;

}
_ = &mut shutdown_signal => {
trace!("received shutdown signal");
break;
}
}
}

Ok(())
}

fn on_swarm_event(&mut self, event: SwarmEvent<BehaviourEvent<B>>) -> Result<(), ServerError> {
Expand Down

0 comments on commit 800b896

Please sign in to comment.