Skip to content
This repository has been archived by the owner on Dec 17, 2024. It is now read-only.

Tx file propagation #58

Merged
merged 16 commits into from
Jan 25, 2024
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
*.txt
.DS_Store
.idea
*.pki
crates/tests/e2e-tests/files
crates/tests/e2e-tests/prover
crates/tests/e2e-tests/verifier
3 changes: 3 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 5 additions & 3 deletions crates/cli/src/keyfile.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,16 @@
use libsecp256k1::SecretKey;
use libsecp256k1::{PublicKey, SecretKey};
use rand::rngs::StdRng;
use rand::SeedableRng;
use std::fs;
use std::path::PathBuf;

pub fn create_key_file(file_path: &PathBuf) -> crate::BoxResult<()> {
pub fn create_key_file(file_path: &PathBuf) -> crate::BoxResult<PublicKey> {
let key = SecretKey::random(&mut StdRng::from_entropy());
let key_array = key.serialize();
if !file_path.as_path().exists() {
Ok(fs::write(file_path, &key_array[..])?)
fs::write(file_path, &key_array[..])?;
let pubkey = PublicKey::from_secret_key(&key);
Ok(pubkey)
} else {
Err(Box::new(std::io::Error::new(
std::io::ErrorKind::Other,
Expand Down
5 changes: 3 additions & 2 deletions crates/cli/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,8 +103,9 @@ async fn main() {

match args.command {
ConfCommands::GenerateKey => match gevulot_cli::keyfile::create_key_file(&args.keyfile) {
Ok(()) => println!(
"Key generated and saved in file:{}",
Ok(pubkey) => println!(
"Key generated pubkey:{} and saved in file:{}",
hex::encode(pubkey.serialize()),
args.keyfile.to_str().unwrap_or("")
),
Err(err) => println!("Error during key file creation:{err}"),
Expand Down
3 changes: 3 additions & 0 deletions crates/node/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@ eyre = "0.6.8"
futures-util = "0.3"
hex = "0.4"
home = "0.5"
http-body-util = "0.1"
hyper = { version = "1", features = ["full"] }
hyper-util = { version = "0.1", features = ["full"] }
jsonrpsee = { version = "0.20", features = [ "client", "server" ] }
libsecp256k1 = "0.7"
num-bigint = { version = "0.4", features = [ "serde" ] }
Expand Down
88 changes: 78 additions & 10 deletions crates/node/src/asset_manager/mod.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
use std::{path::PathBuf, sync::Arc, time::Duration};

use eyre::Result;
use eyre::{eyre, Result};
use gevulot_node::types::{
self,
transaction::{Payload, ProgramData},
};
use std::collections::HashMap;
use std::net::SocketAddr;
use std::{path::PathBuf, sync::Arc, time::Duration};
use thiserror::Error;
use tokio::{io::AsyncWriteExt, time::sleep};

Expand Down Expand Up @@ -36,14 +37,20 @@ pub struct AssetManager {
config: Arc<Config>,
database: Arc<Database>,
http_client: reqwest::Client,
http_peer_list: Arc<tokio::sync::RwLock<HashMap<SocketAddr, Option<u16>>>>,
}

impl AssetManager {
pub fn new(config: Arc<Config>, database: Arc<Database>) -> Self {
pub fn new(
config: Arc<Config>,
database: Arc<Database>,
http_peer_list: Arc<tokio::sync::RwLock<HashMap<SocketAddr, Option<u16>>>>,
) -> Self {
AssetManager {
config,
database,
http_client: reqwest::Client::new(),
http_peer_list,
}
}

Expand Down Expand Up @@ -160,21 +167,82 @@ impl AssetManager {
// TODO: Blocking operation.
std::fs::create_dir_all(file_path.parent().unwrap())?;

let mut resp = self.http_client.get(url.clone()).send().await?;
let mut resp = match self.http_client.get(url.clone()).send().await {
Ok(resp) => resp,
Err(err) => {
let uri = file_path
.as_path()
.components()
.rev()
.take(2)
.map(|c| c.as_os_str().to_os_string())
.reduce(|acc, mut el| {
el.push("/");
el.push(&acc);
el
})
.ok_or_else(|| eyre!("Download bad file path: {:?}", file_path))
.and_then(|s| {
s.into_string()
.map_err(|err| eyre!("Download bad file path: {:?}", file_path))
})?;
let peer_urls: Vec<_> = {
let list = self.http_peer_list.read().await;
list.iter()
.filter_map(|(peer, port)| {
port.map(|port| {
//use parse to create an URL, no new method.
let mut url = reqwest::Url::parse("http://localhost").unwrap(); //unwrap always succeed
url.set_ip_host(peer.ip()).unwrap(); //unwrap always succeed
url.set_port(Some(port)).unwrap(); //unwrap always succeed
url.set_path(&uri); //unwrap always succeed
url
})
})
.collect()
};
tracing::debug!(
"asset manager download file from uri {uri} to {}, use peer list:{:?}",
file_path.as_path().to_str().unwrap().to_string(),
peer_urls
);

let mut resp = None;
for url in peer_urls {
if let Ok(val) = self.http_client.get(url).send().await {
resp = Some(val);
break;
}
}
match resp {
Some(resp) => resp,
_ => {
return Err(eyre!(
"Download no host found to download the file: {:?}",
file_path
));
}
}
}
};

if resp.status() == reqwest::StatusCode::OK {
let fd = tokio::fs::File::create(&file_path).await?;
//create a tmp file during download.
//this way the file won't be available for download from the other nodes
//until it is completely written.
let mut tmp_file_path = file_path.clone();
tmp_file_path.set_extension(".tmp");

let fd = tokio::fs::File::create(&tmp_file_path).await?;
let mut fd = tokio::io::BufWriter::new(fd);

while let Some(chunk) = resp.chunk().await? {
fd.write_all(&chunk).await?;
}

fd.flush().await?;
tracing::info!(
"downloaded file to {}",
file_path.as_path().to_str().unwrap().to_string()
);
//rename to original name
std::fs::rename(tmp_file_path, file_path)?;
} else {
tracing::error!(
"failed to download file from {}: response status: {}",
Expand Down
15 changes: 9 additions & 6 deletions crates/node/src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,12 +44,7 @@ pub struct Config {
)]
pub node_key_file: PathBuf,

#[arg(
long,
long_help = "",
env = "GEVULOT_P2P_DISCOVERY_ADDR",
default_value = "34.88.251.176:9999"
)]
#[arg(long, long_help = "", env = "GEVULOT_P2P_DISCOVERY_ADDR")]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why drop the default value?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To manage the case the node start without connecting, sort of primary bootstraper. In this case, you specify nothing and the node didn't try to connect.

pub p2p_discovery_addrs: Vec<String>,

#[arg(
Expand All @@ -60,6 +55,14 @@ pub struct Config {
)]
pub p2p_listen_addr: SocketAddr,

#[arg(
long,
long_help = "Port open to download file between nodes. Use P2P interface to bind.",
env = "GEVULOT_HTTP_PORT",
default_value = "9995"
)]
pub http_download_port: u16,

#[arg(
long,
long_help = "P2P PSK passphrase",
Expand Down
25 changes: 20 additions & 5 deletions crates/node/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -173,11 +173,17 @@ impl networking::p2p::TxHandler for P2PTxHandler {
async fn recv_tx(&self, tx: Transaction) -> Result<()> {
// The transaction was received from P2P network so we can consider it
// propagated at this point.
let tx_hash = tx.hash;
let mut tx = tx;
tx.propagated = true;

// Submit the tx to mempool.
self.mempool.write().await.add(tx).await
self.mempool.write().await.add(tx).await?;

//TODO copy paste of the asset manager handle_transaction method.
//added because when a tx arrive from the p2p asset are not added.
//should be done in a better way.
self.database.add_asset(&tx_hash).await
}
}

Expand All @@ -198,6 +204,7 @@ async fn run(config: Arc<Config>) -> Result<()> {
"mempool-pubsub",
config.p2p_listen_addr,
&config.p2p_psk_passphrase,
Some(config.http_download_port),
)
.await,
);
Expand All @@ -212,6 +219,9 @@ async fn run(config: Arc<Config>) -> Result<()> {
)))
.await;

//start http download manager
let download_jh = networking::download_manager::serve_files(&config).await?;

// TODO(tuommaki): read total available resources from config / acquire system stats.
let num_gpus = if config.gpu_devices.is_some() { 1 } else { 0 };
let resource_manager = Arc::new(Mutex::new(scheduler::ResourceManager::new(
Expand All @@ -231,7 +241,11 @@ async fn run(config: Arc<Config>) -> Result<()> {
resource_manager.clone(),
);

let asset_mgr = Arc::new(AssetManager::new(config.clone(), database.clone()));
let asset_mgr = Arc::new(AssetManager::new(
config.clone(),
database.clone(),
p2p.as_ref().peer_http_port_list.clone(),
));

let node_key = read_node_key(&config.node_key_file)?;

Expand Down Expand Up @@ -295,10 +309,10 @@ async fn run(config: Arc<Config>) -> Result<()> {
)
.await?;

tracing::info!("gevulot node started");
loop {
sleep(Duration::from_secs(1));
if let Err(err) = download_jh.await {
tracing::info!("download_manager error:{err}");
}
Ok(())
}

/// p2p_beacon brings up P2P networking but nothing else. This function can be
Expand All @@ -311,6 +325,7 @@ async fn p2p_beacon(config: P2PBeaconConfig) -> Result<()> {
"gevulot-network",
config.p2p_listen_addr,
&config.p2p_psk_passphrase,
None,
)
.await,
);
Expand Down
102 changes: 87 additions & 15 deletions crates/node/src/networking/download_manager.rs
Original file line number Diff line number Diff line change
@@ -1,21 +1,93 @@
use std::sync::Arc;
use crate::cli::Config;
use eyre::Result;
use futures_util::TryStreamExt;
use http_body_util::combinators::BoxBody;
use http_body_util::{BodyExt, Full, StreamBody};
use hyper::body::{self, Bytes, Frame};
use hyper::server::conn::http1;
use hyper::service::service_fn;
use hyper::{Request, Response, StatusCode};
use hyper_util::rt::TokioIo;
use std::path::Path;
use tokio::fs::File;
use tokio::net::TcpListener;
use tokio::task::JoinHandle;
use tokio_util::io::ReaderStream;

use crate::{cli::Config, storage};
//start the local server and serve the specified file path.
//Return the server task join handle.
pub async fn serve_files(config: &Config) -> Result<JoinHandle<()>> {
let mut bind_addr = config.p2p_listen_addr;
bind_addr.set_port(config.http_download_port);
let listener = TcpListener::bind(bind_addr).await?;

pub struct DownloadManager {
database: Arc<storage::Database>,
file_storage: Arc<storage::File>,
let jh = tokio::spawn({
let data_directory = config.data_directory.clone();
async move {
loop {
match listener.accept().await {
Ok((stream, _from)) => {
let io = TokioIo::new(stream);
tokio::task::spawn({
let data_directory = data_directory.clone();
async move {
if let Err(err) = http1::Builder::new()
.serve_connection(
io,
service_fn(|req| server_process_file(req, &data_directory)),
)
.await
{
tracing::error!("Error serving node connection: {err}. Wait for a new node connection.");
}
}
});
}
Err(err) => {
tracing::error!("Error during node connection to file http server:{err}");
}
}
}
}
});

Ok(jh)
}

impl DownloadManager {
pub fn new(
_config: Arc<Config>,
database: Arc<storage::Database>,
file_storage: Arc<storage::File>,
) -> Self {
DownloadManager {
database,
file_storage,
async fn server_process_file(
req: Request<body::Incoming>,
data_directory: &Path,
) -> std::result::Result<Response<BoxBody<Bytes, std::io::Error>>, hyper::Error> {
let file_digest = &req.uri().path()[1..];

let mut file_path = data_directory.join("images").join(file_digest);

let file = match File::open(&file_path).await {
Ok(file) => file,
Err(_) => {
//try to see if the file is currently being updated.
file_path.set_extension(".tmp");
let (status_code, message) = if file_path.as_path().exists() {
(
StatusCode::PARTIAL_CONTENT,
"Update in progess, retry later",
)
} else {
(StatusCode::NOT_FOUND, "File not found")
};
return Ok(Response::builder()
.status(status_code)
.body(Full::new(message.into()).map_err(|e| match e {}).boxed())
.unwrap());
}
}
};

let file_hash = file_digest.to_string();
let reader = ReaderStream::new(file);
let stream_body = StreamBody::new(reader.map_ok(Frame::data));

Ok(Response::builder()
.status(StatusCode::OK)
.body(BodyExt::boxed(stream_body))
.unwrap())
}
Loading
Loading