diff --git a/.gitignore b/.gitignore index 00b3872a..03a153a5 100644 --- a/.gitignore +++ b/.gitignore @@ -2,6 +2,7 @@ *.txt .DS_Store .idea +*.pki crates/tests/e2e-tests/files crates/tests/e2e-tests/prover crates/tests/e2e-tests/verifier diff --git a/Cargo.lock b/Cargo.lock index 02da8e80..7eb1eafb 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1161,6 +1161,9 @@ dependencies = [ "futures-util", "hex", "home", + "http-body-util", + "hyper 1.1.0", + "hyper-util", "jsonrpsee", "libsecp256k1", "num-bigint", diff --git a/INSTALL.md b/INSTALL.md index 80b09468..3af8a9a1 100644 --- a/INSTALL.md +++ b/INSTALL.md @@ -165,6 +165,7 @@ AutoUpdate=registry Environment=RUST_LOG=warn,gevulot=debug,sqlx=error Environment=GEVULOT_DB_URL=postgres://:@/gevulot Environment=GEVULOT_GPU_DEVICES=0000:01:00.0 +Environment=GEVULOT_P2P_DISCOVERY_ADDR=34.88.251.176:9999 Environment=GEVULOT_PSK_PASSPHRASE="" Network=host diff --git a/crates/cli/src/keyfile.rs b/crates/cli/src/keyfile.rs index 7d7fa494..23457f40 100644 --- a/crates/cli/src/keyfile.rs +++ b/crates/cli/src/keyfile.rs @@ -1,14 +1,16 @@ -use libsecp256k1::SecretKey; +use libsecp256k1::{PublicKey, SecretKey}; use rand::rngs::StdRng; use rand::SeedableRng; use std::fs; use std::path::PathBuf; -pub fn create_key_file(file_path: &PathBuf) -> crate::BoxResult<()> { +pub fn create_key_file(file_path: &PathBuf) -> crate::BoxResult { let key = SecretKey::random(&mut StdRng::from_entropy()); let key_array = key.serialize(); if !file_path.as_path().exists() { - Ok(fs::write(file_path, &key_array[..])?) + fs::write(file_path, &key_array[..])?; + let pubkey = PublicKey::from_secret_key(&key); + Ok(pubkey) } else { Err(Box::new(std::io::Error::new( std::io::ErrorKind::Other, diff --git a/crates/cli/src/main.rs b/crates/cli/src/main.rs index 1e4449cf..25d0e922 100644 --- a/crates/cli/src/main.rs +++ b/crates/cli/src/main.rs @@ -103,8 +103,9 @@ async fn main() { match args.command { ConfCommands::GenerateKey => match gevulot_cli::keyfile::create_key_file(&args.keyfile) { - Ok(()) => println!( - "Key generated and saved in file:{}", + Ok(pubkey) => println!( + "Key generated pubkey:{} and saved in file:{}", + hex::encode(pubkey.serialize()), args.keyfile.to_str().unwrap_or("") ), Err(err) => println!("Error during key file creation:{err}"), diff --git a/crates/node/Cargo.toml b/crates/node/Cargo.toml index f69a635e..c6565dcc 100644 --- a/crates/node/Cargo.toml +++ b/crates/node/Cargo.toml @@ -20,6 +20,9 @@ eyre = "0.6.8" futures-util = "0.3" hex = "0.4" home = "0.5" +http-body-util = "0.1" +hyper = { version = "1", features = ["full"] } +hyper-util = { version = "0.1", features = ["full"] } jsonrpsee = { version = "0.20", features = [ "client", "server" ] } libsecp256k1 = "0.7" num-bigint = { version = "0.4", features = [ "serde" ] } diff --git a/crates/node/src/asset_manager/mod.rs b/crates/node/src/asset_manager/mod.rs index 7c001dd4..63eacb3a 100644 --- a/crates/node/src/asset_manager/mod.rs +++ b/crates/node/src/asset_manager/mod.rs @@ -1,10 +1,11 @@ -use std::{path::PathBuf, sync::Arc, time::Duration}; - -use eyre::Result; +use eyre::{eyre, Result}; use gevulot_node::types::{ self, transaction::{Payload, ProgramData}, }; +use std::collections::HashMap; +use std::net::SocketAddr; +use std::{path::PathBuf, sync::Arc, time::Duration}; use thiserror::Error; use tokio::{io::AsyncWriteExt, time::sleep}; @@ -36,14 +37,20 @@ pub struct AssetManager { config: Arc, database: Arc, http_client: reqwest::Client, + http_peer_list: Arc>>>, } impl AssetManager { - pub fn new(config: Arc, database: Arc) -> Self { + pub fn new( + config: Arc, + database: Arc, + http_peer_list: Arc>>>, + ) -> Self { AssetManager { config, database, http_client: reqwest::Client::new(), + http_peer_list, } } @@ -160,10 +167,73 @@ impl AssetManager { // TODO: Blocking operation. std::fs::create_dir_all(file_path.parent().unwrap())?; - let mut resp = self.http_client.get(url.clone()).send().await?; + let mut resp = match self.http_client.get(url.clone()).send().await { + Ok(resp) => resp, + Err(err) => { + let uri = file_path + .as_path() + .components() + .rev() + .take(2) + .map(|c| c.as_os_str().to_os_string()) + .reduce(|acc, mut el| { + el.push("/"); + el.push(&acc); + el + }) + .ok_or_else(|| eyre!("Download bad file path: {:?}", file_path)) + .and_then(|s| { + s.into_string() + .map_err(|err| eyre!("Download bad file path: {:?}", file_path)) + })?; + let peer_urls: Vec<_> = { + let list = self.http_peer_list.read().await; + list.iter() + .filter_map(|(peer, port)| { + port.map(|port| { + //use parse to create an URL, no new method. + let mut url = reqwest::Url::parse("http://localhost").unwrap(); //unwrap always succeed + url.set_ip_host(peer.ip()).unwrap(); //unwrap always succeed + url.set_port(Some(port)).unwrap(); //unwrap always succeed + url.set_path(&uri); //unwrap always succeed + url + }) + }) + .collect() + }; + tracing::debug!( + "asset manager download file from uri {uri} to {}, use peer list:{:?}", + file_path.as_path().to_str().unwrap().to_string(), + peer_urls + ); + + let mut resp = None; + for url in peer_urls { + if let Ok(val) = self.http_client.get(url).send().await { + resp = Some(val); + break; + } + } + match resp { + Some(resp) => resp, + _ => { + return Err(eyre!( + "Download no host found to download the file: {:?}", + file_path + )); + } + } + } + }; if resp.status() == reqwest::StatusCode::OK { - let fd = tokio::fs::File::create(&file_path).await?; + //create a tmp file during download. + //this way the file won't be available for download from the other nodes + //until it is completely written. + let mut tmp_file_path = file_path.clone(); + tmp_file_path.set_extension(".tmp"); + + let fd = tokio::fs::File::create(&tmp_file_path).await?; let mut fd = tokio::io::BufWriter::new(fd); while let Some(chunk) = resp.chunk().await? { @@ -171,10 +241,8 @@ impl AssetManager { } fd.flush().await?; - tracing::info!( - "downloaded file to {}", - file_path.as_path().to_str().unwrap().to_string() - ); + //rename to original name + std::fs::rename(tmp_file_path, file_path)?; } else { tracing::error!( "failed to download file from {}: response status: {}", diff --git a/crates/node/src/cli.rs b/crates/node/src/cli.rs index be038762..a474d9ae 100644 --- a/crates/node/src/cli.rs +++ b/crates/node/src/cli.rs @@ -44,12 +44,7 @@ pub struct Config { )] pub node_key_file: PathBuf, - #[arg( - long, - long_help = "", - env = "GEVULOT_P2P_DISCOVERY_ADDR", - default_value = "34.88.251.176:9999" - )] + #[arg(long, long_help = "", env = "GEVULOT_P2P_DISCOVERY_ADDR")] pub p2p_discovery_addrs: Vec, #[arg( @@ -60,6 +55,14 @@ pub struct Config { )] pub p2p_listen_addr: SocketAddr, + #[arg( + long, + long_help = "Port open to download file between nodes. Use P2P interface to bind.", + env = "GEVULOT_HTTP_PORT", + default_value = "9995" + )] + pub http_download_port: u16, + #[arg( long, long_help = "P2P PSK passphrase", diff --git a/crates/node/src/main.rs b/crates/node/src/main.rs index b01adfa2..1ac61270 100644 --- a/crates/node/src/main.rs +++ b/crates/node/src/main.rs @@ -173,11 +173,17 @@ impl networking::p2p::TxHandler for P2PTxHandler { async fn recv_tx(&self, tx: Transaction) -> Result<()> { // The transaction was received from P2P network so we can consider it // propagated at this point. + let tx_hash = tx.hash; let mut tx = tx; tx.propagated = true; // Submit the tx to mempool. - self.mempool.write().await.add(tx).await + self.mempool.write().await.add(tx).await?; + + //TODO copy paste of the asset manager handle_transaction method. + //added because when a tx arrive from the p2p asset are not added. + //should be done in a better way. + self.database.add_asset(&tx_hash).await } } @@ -198,6 +204,7 @@ async fn run(config: Arc) -> Result<()> { "mempool-pubsub", config.p2p_listen_addr, &config.p2p_psk_passphrase, + Some(config.http_download_port), ) .await, ); @@ -212,6 +219,9 @@ async fn run(config: Arc) -> Result<()> { ))) .await; + //start http download manager + let download_jh = networking::download_manager::serve_files(&config).await?; + // TODO(tuommaki): read total available resources from config / acquire system stats. let num_gpus = if config.gpu_devices.is_some() { 1 } else { 0 }; let resource_manager = Arc::new(Mutex::new(scheduler::ResourceManager::new( @@ -231,7 +241,11 @@ async fn run(config: Arc) -> Result<()> { resource_manager.clone(), ); - let asset_mgr = Arc::new(AssetManager::new(config.clone(), database.clone())); + let asset_mgr = Arc::new(AssetManager::new( + config.clone(), + database.clone(), + p2p.as_ref().peer_http_port_list.clone(), + )); let node_key = read_node_key(&config.node_key_file)?; @@ -295,10 +309,10 @@ async fn run(config: Arc) -> Result<()> { ) .await?; - tracing::info!("gevulot node started"); - loop { - sleep(Duration::from_secs(1)); + if let Err(err) = download_jh.await { + tracing::info!("download_manager error:{err}"); } + Ok(()) } /// p2p_beacon brings up P2P networking but nothing else. This function can be @@ -311,6 +325,7 @@ async fn p2p_beacon(config: P2PBeaconConfig) -> Result<()> { "gevulot-network", config.p2p_listen_addr, &config.p2p_psk_passphrase, + None, ) .await, ); diff --git a/crates/node/src/networking/download_manager.rs b/crates/node/src/networking/download_manager.rs index 581d7b00..1124245e 100644 --- a/crates/node/src/networking/download_manager.rs +++ b/crates/node/src/networking/download_manager.rs @@ -1,21 +1,93 @@ -use std::sync::Arc; +use crate::cli::Config; +use eyre::Result; +use futures_util::TryStreamExt; +use http_body_util::combinators::BoxBody; +use http_body_util::{BodyExt, Full, StreamBody}; +use hyper::body::{self, Bytes, Frame}; +use hyper::server::conn::http1; +use hyper::service::service_fn; +use hyper::{Request, Response, StatusCode}; +use hyper_util::rt::TokioIo; +use std::path::Path; +use tokio::fs::File; +use tokio::net::TcpListener; +use tokio::task::JoinHandle; +use tokio_util::io::ReaderStream; -use crate::{cli::Config, storage}; +//start the local server and serve the specified file path. +//Return the server task join handle. +pub async fn serve_files(config: &Config) -> Result> { + let mut bind_addr = config.p2p_listen_addr; + bind_addr.set_port(config.http_download_port); + let listener = TcpListener::bind(bind_addr).await?; -pub struct DownloadManager { - database: Arc, - file_storage: Arc, + let jh = tokio::spawn({ + let data_directory = config.data_directory.clone(); + async move { + loop { + match listener.accept().await { + Ok((stream, _from)) => { + let io = TokioIo::new(stream); + tokio::task::spawn({ + let data_directory = data_directory.clone(); + async move { + if let Err(err) = http1::Builder::new() + .serve_connection( + io, + service_fn(|req| server_process_file(req, &data_directory)), + ) + .await + { + tracing::error!("Error serving node connection: {err}. Wait for a new node connection."); + } + } + }); + } + Err(err) => { + tracing::error!("Error during node connection to file http server:{err}"); + } + } + } + } + }); + + Ok(jh) } -impl DownloadManager { - pub fn new( - _config: Arc, - database: Arc, - file_storage: Arc, - ) -> Self { - DownloadManager { - database, - file_storage, +async fn server_process_file( + req: Request, + data_directory: &Path, +) -> std::result::Result>, hyper::Error> { + let file_digest = &req.uri().path()[1..]; + + let mut file_path = data_directory.join("images").join(file_digest); + + let file = match File::open(&file_path).await { + Ok(file) => file, + Err(_) => { + //try to see if the file is currently being updated. + file_path.set_extension(".tmp"); + let (status_code, message) = if file_path.as_path().exists() { + ( + StatusCode::PARTIAL_CONTENT, + "Update in progess, retry later", + ) + } else { + (StatusCode::NOT_FOUND, "File not found") + }; + return Ok(Response::builder() + .status(status_code) + .body(Full::new(message.into()).map_err(|e| match e {}).boxed()) + .unwrap()); } - } + }; + + let file_hash = file_digest.to_string(); + let reader = ReaderStream::new(file); + let stream_body = StreamBody::new(reader.map_ok(Frame::data)); + + Ok(Response::builder() + .status(StatusCode::OK) + .body(BodyExt::boxed(stream_body)) + .unwrap()) } diff --git a/crates/node/src/networking/mod.rs b/crates/node/src/networking/mod.rs index e34f90c2..325dbc5b 100644 --- a/crates/node/src/networking/mod.rs +++ b/crates/node/src/networking/mod.rs @@ -1,8 +1,4 @@ -mod download_manager; +pub mod download_manager; mod noise; pub mod p2p; - -#[allow(unused_imports)] -pub use download_manager::DownloadManager; - pub use p2p::P2P; diff --git a/crates/node/src/networking/p2p.rs b/crates/node/src/networking/p2p.rs index cc41c95f..9f4da702 100644 --- a/crates/node/src/networking/p2p.rs +++ b/crates/node/src/networking/p2p.rs @@ -45,9 +45,11 @@ pub struct P2P { noise_states: Arc>>, tx_handler: Arc>>, psk: Vec, - peer_list: Arc>>, + peer_list: Arc>>, //Map to connection local addr notified on_disconnect and the peer connection addr (peer_list). - peer_addr_mapping: Arc>>, + peer_addr_mapping: Arc>>, + pub peer_http_port_list: Arc>>>, + http_port: Option, } impl Pea2Pea for P2P { @@ -57,7 +59,12 @@ impl Pea2Pea for P2P { } impl P2P { - pub async fn new(name: &str, listen_addr: SocketAddr, psk_passphrase: &str) -> Self { + pub async fn new( + name: &str, + listen_addr: SocketAddr, + psk_passphrase: &str, + http_port: Option, + ) -> Self { let config = Config { name: Some(name.into()), listener_ip: Some(listen_addr.ip()), @@ -79,6 +86,8 @@ impl P2P { psk: psk.to_vec(), peer_list: Default::default(), peer_addr_mapping: Default::default(), + peer_http_port_list: Default::default(), + http_port, }; // Enable node functionalities. @@ -125,24 +134,24 @@ impl Handshake for P2P { self.noise_states.write().insert(conn.addr(), noise_state); //exchange peer list - let node_conn_side = !conn.side(); let stream = self.borrow_stream(&mut conn); let local_bind_addr = self.node.listening_addr().unwrap(); let peer_list_bytes: Vec = { - let peer_list: &mut BTreeSet = &mut self.peer_list.write(); - peer_list.insert(local_bind_addr); //add it if not present. - bincode::serialize(peer_list).map_err(|err| { + let mut peer_list = self.peer_list.write().await; + (*peer_list).insert(local_bind_addr); //add it if not present. + bincode::serialize(&*peer_list).map_err(|err| { std::io::Error::new(std::io::ErrorKind::Other, format!("serialize error:{err}")) })? }; - let (distant_peer_list, distant_listening_addr) = match node_conn_side { + let (distant_peer_list, distant_listening_addr, distant_http_port) = match node_conn_side { ConnectionSide::Initiator => { - //on_disconnect doesn't notify with the bind port but the connect port. - //can't be use to open a connection. - //So the connecting node notify it's bind address. + stream.write_u16(self.http_port.unwrap_or(0)).await?; //0 mean none. + //on_disconnect doesn't notify with the bind port but the connect port. + //can't be use to open a connection. + //So the connecting node notify it's bind address. let bind_addr_bytes = bincode::serialize(&self.node.listening_addr().unwrap()) .map_err(|err| { std::io::Error::new( @@ -157,6 +166,7 @@ impl Handshake for P2P { stream.write_u32(peer_list_bytes.len() as u32).await?; stream.write_all(&peer_list_bytes).await?; + let distant_http_port = stream.read_u16().await?; // receive the peer list let buffer_len = stream.read_u32().await? as usize; //TODO validate buffer lengh @@ -172,11 +182,17 @@ impl Handshake for P2P { self.peer_addr_mapping .write() + .await .insert(stream.peer_addr().unwrap(), stream.peer_addr().unwrap()); - (distant_peer_list, stream.peer_addr().unwrap()) + ( + distant_peer_list, + stream.peer_addr().unwrap(), + distant_http_port, + ) } ConnectionSide::Responder => { + let distant_http_port = stream.read_u16().await?; //receive the connecting node addr let buffer_len = stream.read_u32().await? as usize; let mut buffer = vec![0; buffer_len]; @@ -189,9 +205,10 @@ impl Handshake for P2P { ) })?; { - self.peer_list.write().insert(distant_listening_addr); + self.peer_list.write().await.insert(distant_listening_addr); self.peer_addr_mapping .write() + .await .insert(stream.peer_addr().unwrap(), distant_listening_addr); } @@ -207,23 +224,26 @@ impl Handshake for P2P { ) })?; + stream.write_u16(self.http_port.unwrap_or(0)).await?; //send peer list stream.write_u32(peer_list_bytes.len() as u32).await?; stream.write_all(&peer_list_bytes).await?; - (distant_peer_list, distant_listening_addr) + (distant_peer_list, distant_listening_addr, distant_http_port) } }; + tracing::trace!("New connection: local:{local_bind_addr} distant:{distant_listening_addr}"); + //do peer comparition let mut local_diff = { - let local_peer_list: &mut BTreeSet = &mut self.peer_list.write(); + let mut local_peer_list = self.peer_list.write().await; let local_diff: BTreeSet = distant_peer_list - .difference(local_peer_list) + .difference(&*local_peer_list) .cloned() .collect(); - local_peer_list.append(&mut local_diff.iter().cloned().collect()); + (*local_peer_list).append(&mut local_diff.iter().cloned().collect()); local_diff }; local_diff.remove(&local_bind_addr); @@ -236,6 +256,11 @@ impl Handshake for P2P { let _ = node.connect(addr).await; } + self.peer_http_port_list.write().await.insert( + distant_listening_addr, + (distant_http_port != 0).then_some(distant_http_port), + ); + Ok(conn) } } @@ -275,8 +300,12 @@ impl Writing for P2P { #[async_trait::async_trait] impl OnDisconnect for P2P { async fn on_disconnect(&self, addr: SocketAddr) { - if let Some(peer_conn_addr) = self.peer_addr_mapping.write().remove(&addr) { - self.peer_list.write().remove(&peer_conn_addr); + if let Some(peer_conn_addr) = self.peer_addr_mapping.write().await.remove(&addr) { + let distant_listening_addr = self.peer_list.write().await.remove(&peer_conn_addr); + self.peer_http_port_list + .write() + .await + .remove(&peer_conn_addr); } } } @@ -285,9 +314,9 @@ impl OnDisconnect for P2P { mod tests { use super::*; use eyre::Result; - use gevulot_node::types::{transaction::Payload, Transaction}; + use gevulot_node::types::{transaction::Payload, Hash, Transaction}; use libsecp256k1::SecretKey; - use rand::{rngs::StdRng, SeedableRng}; + use rand::{rngs::StdRng, RngCore, SeedableRng}; use tokio::sync::mpsc::{self, Sender}; use tracing::level_filters::LevelFilter; use tracing_subscriber::EnvFilter; @@ -322,9 +351,27 @@ mod tests { Arc::new(Sink::new(Arc::new(tx3))), ); let (peer1, peer2, peer3) = ( - P2P::new("peer1", "127.0.0.1:0".parse().unwrap(), "secret passphrase").await, - P2P::new("peer2", "127.0.0.1:0".parse().unwrap(), "secret passphrase").await, - P2P::new("peer3", "127.0.0.1:0".parse().unwrap(), "secret passphrase").await, + P2P::new( + "peer1", + "127.0.0.1:0".parse().unwrap(), + "secret passphrase", + None, + ) + .await, + P2P::new( + "peer2", + "127.0.0.1:0".parse().unwrap(), + "secret passphrase", + Some(9995), + ) + .await, + P2P::new( + "peer3", + "127.0.0.1:0".parse().unwrap(), + "secret passphrase", + Some(9995), + ) + .await, ); tracing::debug!("start listening"); @@ -344,6 +391,9 @@ mod tests { .await .unwrap(); + assert_eq!(peer1.peer_http_port_list.read().await.len(), 1); + assert_eq!(peer2.peer_http_port_list.read().await.len(), 1); + tracing::debug!("connect peer3 to peer1"); peer3 .node() @@ -353,6 +403,10 @@ mod tests { tokio::time::sleep(tokio::time::Duration::from_millis(500)).await; + assert_eq!(peer1.peer_http_port_list.read().await.len(), 2); + assert_eq!(peer2.peer_http_port_list.read().await.len(), 2); + assert_eq!(peer3.peer_http_port_list.read().await.len(), 2); + tracing::debug!("send tx from peer2 to peer1 and peer3"); let tx = new_tx(); peer2.send_tx(&tx).await.unwrap(); @@ -385,13 +439,24 @@ mod tests { Arc::new(Sink::new(Arc::new(tx2))), ); - let peer1 = P2P::new("peer1", "127.0.0.1:0".parse().unwrap(), "secret passphrase").await; + let peer1 = P2P::new( + "peer1", + "127.0.0.1:0".parse().unwrap(), + "secret passphrase", + None, + ) + .await; peer1.node().start_listening().await.expect("peer1 listen"); peer1.register_tx_handler(sink1.clone()).await; { - let peer2 = - P2P::new("peer2", "127.0.0.1:0".parse().unwrap(), "secret passphrase").await; + let peer2 = P2P::new( + "peer2", + "127.0.0.1:0".parse().unwrap(), + "secret passphrase", + Some(8776), + ) + .await; peer2.node().start_listening().await.expect("peer2 listen"); peer2.register_tx_handler(sink2.clone()).await; @@ -402,6 +467,8 @@ mod tests { .connect(peer2.node().listening_addr().unwrap()) .await .unwrap(); + assert_eq!(peer1.peer_http_port_list.read().await.len(), 1); + assert_eq!(peer2.peer_http_port_list.read().await.len(), 1); tracing::debug!("Nodes Connected"); tracing::debug!("send tx from peer1 to peer2"); @@ -433,8 +500,10 @@ mod tests { tokio::time::sleep(tokio::time::Duration::from_millis(500)).await; - assert_eq!(peer1.peer_list.read().len(), 1); - assert!(peer1.peer_addr_mapping.read().is_empty()); + assert_eq!(peer1.peer_list.read().await.len(), 1); + assert!(peer1.peer_addr_mapping.read().await.is_empty()); + assert_eq!(peer1.peer_http_port_list.read().await.len(), 0); + assert_eq!(peer1.peer_http_port_list.read().await.len(), 0); } #[tokio::test] @@ -448,8 +517,20 @@ mod tests { Arc::new(Sink::new(Arc::new(tx2))), ); let (peer1, peer2) = ( - P2P::new("peer1", "127.0.0.1:0".parse().unwrap(), "secret passphrase").await, - P2P::new("peer2", "127.0.0.1:0".parse().unwrap(), "secret passphrase").await, + P2P::new( + "peer1", + "127.0.0.1:0".parse().unwrap(), + "secret passphrase", + None, + ) + .await, + P2P::new( + "peer2", + "127.0.0.1:0".parse().unwrap(), + "secret passphrase", + None, + ) + .await, ); tracing::debug!("start listening"); @@ -484,8 +565,16 @@ mod tests { fn new_tx() -> Transaction { let rng = &mut StdRng::from_entropy(); + let mut tx = Transaction { + hash: Hash::random(rng), + payload: Payload::Empty, + nonce: rng.next_u64(), + ..Default::default() + }; + let key = SecretKey::random(rng); - Transaction::new(Payload::Empty, &key) + tx.sign(&key); + tx } fn start_logger(default_level: LevelFilter) { diff --git a/crates/node/src/rpc_server/mod.rs b/crates/node/src/rpc_server/mod.rs index 6cbc8c53..f4e1315b 100644 --- a/crates/node/src/rpc_server/mod.rs +++ b/crates/node/src/rpc_server/mod.rs @@ -205,6 +205,7 @@ mod tests { num_cpus: 8, mem_gb: 8, gpu_devices: None, + http_download_port: 0, }); let db = Arc::new(Database::new(&cfg.db_url).await.unwrap()); @@ -213,7 +214,11 @@ mod tests { .await .unwrap(), )); - let asset_manager = Arc::new(AssetManager::new(cfg.clone(), db.clone())); + let asset_manager = Arc::new(AssetManager::new( + cfg.clone(), + db.clone(), + Arc::new(RwLock::new(std::collections::HashMap::new())), + )); RpcServer::run(cfg.clone(), db.clone(), mempool, asset_manager) .await