From d895a5b5b4bae91921c4b11e1731ade2fd5551fa Mon Sep 17 00:00:00 2001 From: Richard Holzeis Date: Mon, 12 Dec 2022 16:03:41 +0100 Subject: [PATCH 1/2] chore: Introduce dedicated runtime Annotating the function with `#[tokio::main]` kills the runtime once the function completes. Thus we had to block the `run` function indefinitely to keep the long-living threads going. This change introduces a runtime managed outside of the function scope and thus allows the `run` function to return, bringing the following advantages: - We don't block a whole frb worker thread just to run the lightning node, sync tasks, background processor, etc. - We are using a multi-threaded runtime instead of the current-thread runtime - allowing us to actually join the background processor without blocking all other tasks. - making better use of multiple CPU cores. - We are not creating a new runtime on every async bridge call. --- rust/src/api.rs | 244 ++++++++++++++++++++++------------------- rust/src/connection.rs | 56 ++++------ rust/src/lightning.rs | 6 +- rust/src/offer.rs | 13 --- rust/src/wallet.rs | 12 +- 5 files changed, 162 insertions(+), 169 deletions(-) diff --git a/rust/src/api.rs b/rust/src/api.rs index 4e656de5..2463ecad 100644 --- a/rust/src/api.rs +++ b/rust/src/api.rs @@ -13,7 +13,6 @@ use crate::offer::Offer; use crate::wallet; use crate::wallet::Balance; use crate::wallet::LightningTransaction; -use anyhow::anyhow; use anyhow::Context; use anyhow::Result; use flutter_rust_bridge::StreamSink; @@ -22,13 +21,14 @@ use lightning_invoice::Invoice; use lightning_invoice::InvoiceDescription; use rust_decimal::prelude::ToPrimitive; use rust_decimal::Decimal; +use state::Storage; use std::ops::Add; use std::path::Path; use std::str::FromStr; use std::time::SystemTime; use time::Duration; pub use time::OffsetDateTime; -use tokio::try_join; +use tokio::runtime::Runtime; pub struct Address { pub address: String, @@ -168,91 +168,114 @@ impl WalletInfo { Ok(tx_history) } } +/// Lazily creates a
multi threaded runtime with the the number of worker threads corresponding to +/// the number of available cores. +fn runtime() -> Result<&'static Runtime> { + static RUNTIME: Storage = Storage::new(); -#[tokio::main(flavor = "current_thread")] -pub async fn refresh_wallet_info() -> Result { - wallet::sync()?; - WalletInfo::build_wallet_info().await + if RUNTIME.try_get().is_none() { + let runtime = Runtime::new()?; + RUNTIME.set(runtime); + } + + Ok(RUNTIME.get()) +} + +pub fn refresh_wallet_info() -> Result { + runtime()?.block_on(async { + wallet::sync()?; + WalletInfo::build_wallet_info().await + }) } -#[tokio::main(flavor = "current_thread")] -pub async fn run(stream: StreamSink, app_dir: String) -> Result<()> { +pub fn run(stream: StreamSink, app_dir: String) -> Result<()> { let network = config::network(); anyhow::ensure!(!app_dir.is_empty(), "app_dir must not be empty"); - stream.add(Event::Init(format!("Initialising {network} wallet"))); - wallet::init_wallet(Path::new(app_dir.as_str()))?; - - stream.add(Event::Init("Initialising database".to_string())); - db::init_db( - &Path::new(app_dir.as_str()) - .join(network.to_string()) - .join("taker.sqlite"), - ) - .await?; - - stream.add(Event::Init("Starting full ldk node".to_string())); - let background_processor = wallet::run_ldk().await?; - - stream.add(Event::Init("Fetching an offer".to_string())); - stream.add(Event::Offer(offer::get_offer().await.ok())); - - stream.add(Event::Init("Fetching your balance".to_string())); - stream.add(Event::WalletInfo( - WalletInfo::build_wallet_info().await.ok(), - )); - stream.add(Event::Init("Checking channel state".to_string())); - stream.add(Event::ChannelState(get_channel_state())); - - stream.add(Event::Init("Ready".to_string())); - stream.add(Event::Ready); - - // spawn a connection task keeping the connection with the maker alive. 
- let peer_manager = wallet::get_peer_manager()?; - let connection_handle = connection::spawn(peer_manager); - - // sync offers every 5 seconds - let offer_handle = offer::spawn(stream.clone()); - - // sync wallet every 60 seconds - let wallet_sync_handle = tokio::spawn(async { - loop { - wallet::sync().unwrap_or_else(|e| tracing::error!(?e, "Failed to sync wallet")); - tokio::time::sleep(std::time::Duration::from_secs(60)).await; - } - }); - - // sync wallet info every 10 seconds - let wallet_info_stream = stream.clone(); - let wallet_info_sync_handle = tokio::spawn(async move { - loop { - match WalletInfo::build_wallet_info().await { - Ok(wallet_info) => { - let _ = wallet_info_stream.add(Event::WalletInfo(Some(wallet_info))); + let runtime = runtime()?; + runtime.block_on(async move { + stream.add(Event::Init(format!("Initialising {network} wallet"))); + wallet::init_wallet(Path::new(app_dir.as_str()))?; + + stream.add(Event::Init("Initialising database".to_string())); + db::init_db( + &Path::new(app_dir.as_str()) + .join(network.to_string()) + .join("taker.sqlite"), + ) + .await?; + + stream.add(Event::Init("Starting full ldk node".to_string())); + let background_processor = wallet::run_ldk()?; + + stream.add(Event::Init("Fetching an offer".to_string())); + stream.add(Event::Offer(offer::get_offer().await.ok())); + + stream.add(Event::Init("Fetching your balance".to_string())); + stream.add(Event::WalletInfo( + WalletInfo::build_wallet_info().await.ok(), + )); + stream.add(Event::Init("Checking channel state".to_string())); + stream.add(Event::ChannelState(get_channel_state())); + + stream.add(Event::Init("Ready".to_string())); + stream.add(Event::Ready); + + // spawn a connection task keeping the connection with the maker alive. 
+ runtime.spawn(async move { + let peer_info = config::maker_peer_info(); + loop { + let peer_manager = wallet::get_peer_manager(); + connection::connect(peer_manager, peer_info).await; + // add a delay before retrying to connect + tokio::time::sleep(std::time::Duration::from_secs(5)).await; + } + }); + + let offer_stream = stream.clone(); + runtime.spawn(async move { + loop { + offer_stream.add(Event::Offer(offer::get_offer().await.ok())); + tokio::time::sleep(std::time::Duration::from_secs(5)).await; + } + }); + + runtime.spawn(async { + loop { + wallet::sync().unwrap_or_else(|e| tracing::error!(?e, "Failed to sync wallet")); + tokio::time::sleep(std::time::Duration::from_secs(60)).await; + } + }); + + let wallet_info_stream = stream.clone(); + runtime.spawn(async move { + loop { + match WalletInfo::build_wallet_info().await { + Ok(wallet_info) => { + let _ = wallet_info_stream.add(Event::WalletInfo(Some(wallet_info))); + } + Err(e) => tracing::error!(?e, "Failed to build wallet info"), } - Err(e) => tracing::error!(?e, "Failed to build wallet info"), + tokio::time::sleep(std::time::Duration::from_secs(10)).await; } - tokio::time::sleep(std::time::Duration::from_secs(10)).await; - } - }); - - // sync channel state every 5 seconds - let channel_state_stream = stream.clone(); - let channel_state_handle = tokio::spawn(async move { - loop { - channel_state_stream.add(Event::ChannelState(get_channel_state())); - tokio::time::sleep(std::time::Duration::from_secs(5)).await; - } - }); + }); - try_join!( - connection_handle, - offer_handle, - wallet_sync_handle, - wallet_info_sync_handle, - channel_state_handle, - )?; + let channel_state_stream = stream.clone(); + runtime.spawn(async move { + loop { + channel_state_stream.add(Event::ChannelState(get_channel_state())); + tokio::time::sleep(std::time::Duration::from_secs(5)).await; + } + }); - background_processor.join().map_err(|e| anyhow!(e)) + runtime.spawn_blocking(move || { + // background processor joins on a sync 
thread, meaning that join here will block a + // full thread, which is dis-encouraged to do in async code. + if let Err(err) = background_processor.join() { + tracing::error!(?err, "Background processor stopped unexpected"); + } + }); + Ok(()) + }) } pub fn get_balance() -> Result { @@ -279,16 +302,18 @@ pub fn network() -> SyncReturn { SyncReturn(config::network().to_string()) } -#[tokio::main(flavor = "current_thread")] -pub async fn open_channel(taker_amount: u64) -> Result<()> { - let peer_info = config::maker_peer_info(); - wallet::open_channel(peer_info, taker_amount).await +pub fn open_channel(taker_amount: u64) -> Result<()> { + runtime()?.block_on(async { + let peer_info = config::maker_peer_info(); + wallet::open_channel(peer_info, taker_amount).await + }) } -#[tokio::main(flavor = "current_thread")] -pub async fn close_channel() -> Result<()> { - let peer_info = config::maker_peer_info(); - wallet::close_channel(peer_info.pubkey, false).await +pub fn close_channel() -> Result<()> { + runtime()?.block_on(async { + let peer_info = config::maker_peer_info(); + wallet::close_channel(peer_info.pubkey, false).await + }) } pub fn send_to_address(address: String, amount: u64) -> Result { @@ -301,42 +326,32 @@ pub fn send_to_address(address: String, amount: u64) -> Result { Ok(txid) } -#[tokio::main(flavor = "current_thread")] -pub async fn list_cfds() -> Result> { - let mut conn = db::acquire().await?; - cfd::load_cfds(&mut conn).await +pub fn list_cfds() -> Result> { + runtime()?.block_on(async { + let mut conn = db::acquire().await?; + cfd::load_cfds(&mut conn).await + }) } -#[tokio::main(flavor = "current_thread")] -pub async fn open_cfd(order: Order) -> Result<()> { - cfd::open(&order).await +pub fn open_cfd(order: Order) -> Result<()> { + runtime()?.block_on(async { cfd::open(&order).await }) } -#[tokio::main(flavor = "current_thread")] -pub async fn call_faucet(address: String) -> Result { +pub fn call_faucet(address: String) -> Result { anyhow::ensure!( 
!address.is_empty(), "Cannot call faucet because of empty address" ); - faucet::call_faucet(address).await + runtime()?.block_on(async { faucet::call_faucet(address).await }) } -#[tokio::main(flavor = "current_thread")] -pub async fn get_fee_recommendation() -> Result { - let fee_recommendation = wallet::get_fee_recommendation()?; - - Ok(fee_recommendation) +pub fn get_fee_recommendation() -> Result { + wallet::get_fee_recommendation() } /// Settles a CFD with the given taker and maker amounts in sats -#[tokio::main(flavor = "current_thread")] -pub async fn settle_cfd(cfd: Cfd, offer: Offer) -> Result<()> { - cfd::settle(&cfd, &offer).await -} - -#[tokio::main(flavor = "current_thread")] -pub async fn get_lightning_tx_history() -> Result> { - wallet::get_lightning_history().await +pub fn settle_cfd(cfd: Cfd, offer: Offer) -> Result<()> { + runtime()?.block_on(async { cfd::settle(&cfd, &offer).await }) } /// Initialise logging infrastructure for Rust @@ -350,19 +365,18 @@ pub fn get_seed_phrase() -> Vec { wallet::get_seed_phrase() } -#[tokio::main(flavor = "current_thread")] -pub async fn send_lightning_payment(invoice: String) -> Result<()> { +pub fn send_lightning_payment(invoice: String) -> Result<()> { anyhow::ensure!(!invoice.is_empty(), "Cannot pay empty invoice"); - wallet::send_lightning_payment(&invoice).await + runtime()?.block_on(async { wallet::send_lightning_payment(&invoice).await }) } -#[tokio::main(flavor = "current_thread")] -pub async fn create_lightning_invoice( +pub fn create_lightning_invoice( amount_sats: u64, expiry_secs: u32, description: String, ) -> Result { - wallet::create_invoice(amount_sats, expiry_secs, description).await + runtime()? 
+ .block_on(async { wallet::create_invoice(amount_sats, expiry_secs, description).await }) } // Note, this implementation has to be on the api level as otherwise it wouldn't be generated diff --git a/rust/src/connection.rs b/rust/src/connection.rs index 38341d55..d523cf24 100644 --- a/rust/src/connection.rs +++ b/rust/src/connection.rs @@ -1,44 +1,36 @@ -use crate::config; +use crate::lightning::PeerInfo; use crate::lightning::PeerManager; use bdk::bitcoin::secp256k1::PublicKey; use std::sync::Arc; use std::time::Duration; -use tokio::task::JoinHandle; -pub fn spawn(peer_manager: Arc) -> JoinHandle<()> { - // keep connection with maker alive! - tokio::spawn(async move { - let peer_info = config::maker_peer_info(); - loop { - tracing::info!("Connecting to {peer_info}"); - match lightning_net_tokio::connect_outbound( - Arc::clone(&peer_manager), - peer_info.pubkey, - peer_info.peer_addr, - ) - .await - { - Some(connection_closed_future) => { - let mut connection_closed_future = Box::pin(connection_closed_future); - while !is_connected(&peer_manager, peer_info.pubkey) { - if futures::poll!(&mut connection_closed_future).is_ready() { - tracing::warn!("Peer disconnected before we finished the handshake! Retrying in 5 seconds."); - tokio::time::sleep(Duration::from_secs(5)).await; - return; - } - tokio::time::sleep(Duration::from_secs(5)).await; - } - tracing::info!("Successfully connected to {peer_info}"); - connection_closed_future.await; - tracing::warn!("Lost connection to maker, retrying immediately.") - } - None => { - tracing::warn!("Failed to connect to maker! 
Retrying in 5 seconds."); +pub async fn connect(peer_manager: Arc, peer_info: PeerInfo) { + tracing::info!("Connecting to {peer_info}"); + match lightning_net_tokio::connect_outbound( + Arc::clone(&peer_manager), + peer_info.pubkey, + peer_info.peer_addr, + ) + .await + { + Some(connection_closed_future) => { + let mut connection_closed_future = Box::pin(connection_closed_future); + while !is_connected(&peer_manager, peer_info.pubkey) { + if futures::poll!(&mut connection_closed_future).is_ready() { + tracing::warn!("Peer disconnected before we finished the handshake! Retrying in 5 seconds."); tokio::time::sleep(Duration::from_secs(5)).await; + return; } + tokio::time::sleep(Duration::from_secs(5)).await; } + tracing::info!("Successfully connected to {peer_info}"); + connection_closed_future.await; + tracing::warn!("Lost connection to maker, retrying immediately.") + } + None => { + tracing::warn!("Failed to connect to maker! Retrying."); } - }) + } } fn is_connected(peer_manager: &Arc, pubkey: PublicKey) -> bool { diff --git a/rust/src/lightning.rs b/rust/src/lightning.rs index f482ad75..cc15c487 100644 --- a/rust/src/lightning.rs +++ b/rust/src/lightning.rs @@ -115,7 +115,7 @@ pub struct LightningSystem { pub network: Network, } -#[derive(Serialize)] +#[derive(Serialize, Clone, Copy)] pub struct PeerInfo { pub pubkey: PublicKey, pub peer_addr: SocketAddr, @@ -545,7 +545,7 @@ fn default_user_config() -> UserConfig { } } -pub async fn run_ldk(system: &LightningSystem) -> Result { +pub fn run_ldk(system: &LightningSystem) -> Result { let ldk_data_dir = system.data_dir.to_string_lossy().to_string(); let runtime_handle = tokio::runtime::Handle::current(); @@ -630,7 +630,7 @@ pub async fn run_ldk_server( tracing::info!("Listening on {address}"); - let background_processor = run_ldk(system).await?; + let background_processor = run_ldk(system)?; Ok((tcp_handle, background_processor)) } diff --git a/rust/src/offer.rs b/rust/src/offer.rs index dfd3606f..ce8109b1 100644 --- 
a/rust/src/offer.rs +++ b/rust/src/offer.rs @@ -1,13 +1,10 @@ -use crate::api::Event; use crate::config::maker_endpoint; use anyhow::anyhow; use anyhow::bail; use anyhow::Result; -use flutter_rust_bridge::StreamSink; use reqwest::StatusCode; use serde::Deserialize; use serde::Serialize; -use tokio::task::JoinHandle; #[derive(Serialize, Deserialize, Debug, Clone)] pub struct Offer { @@ -16,16 +13,6 @@ pub struct Offer { pub index: f64, } -pub fn spawn(stream: StreamSink) -> JoinHandle<()> { - tokio::spawn(async move { - loop { - let offer = get_offer().await.ok(); - stream.add(Event::Offer(offer)); - tokio::time::sleep(std::time::Duration::from_secs(5)).await; - } - }) -} - pub async fn get_offer() -> Result { let client = reqwest::Client::builder() .timeout(crate::config::TCP_TIMEOUT) diff --git a/rust/src/wallet.rs b/rust/src/wallet.rs index 8cff8ba6..f4da06e7 100644 --- a/rust/src/wallet.rs +++ b/rust/src/wallet.rs @@ -238,8 +238,8 @@ impl Wallet { } /// Run the lightning node - pub async fn run_ldk(&self) -> Result { - lightning::run_ldk(&self.lightning).await + pub fn run_ldk(&self) -> Result { + lightning::run_ldk(&self.lightning) } /// Run the lightning node @@ -418,9 +418,9 @@ pub fn init_wallet(data_dir: &Path) -> Result<()> { Ok(()) } -pub async fn run_ldk() -> Result { +pub fn run_ldk() -> Result { let wallet = get_wallet(); - wallet.run_ldk().await + wallet.run_ldk() } pub async fn run_ldk_server(address: SocketAddr) -> Result<(JoinHandle<()>, BackgroundProcessor)> { @@ -485,8 +485,8 @@ pub fn get_seed_phrase() -> Vec { get_wallet().seed.get_seed_phrase() } -pub fn get_peer_manager() -> Result> { - Ok(get_wallet().lightning.peer_manager.clone()) +pub fn get_peer_manager() -> Arc { + get_wallet().lightning.peer_manager.clone() } pub async fn send_lightning_payment(invoice: &str) -> Result<()> { From 0d50521238d82495a00051c406ad9181d19fe51c Mon Sep 17 00:00:00 2001 From: Richard Holzeis Date: Mon, 12 Dec 2022 16:41:09 +0100 Subject: [PATCH 2/2] feat: 
Restart failed background processor Adds handling for the case where the background processor stops, by simply trying to start the background processor again. Some more refactoring could probably be done there, but I tried to keep the changes to a minimum. --- rust/src/api.rs | 13 +++++++---- rust/src/lightning.rs | 51 ++++++++++++++++++++++++------------------- rust/src/wallet.rs | 7 ++++-- 3 files changed, 42 insertions(+), 29 deletions(-) diff --git a/rust/src/api.rs b/rust/src/api.rs index 2463ecad..e7fce44a 100644 --- a/rust/src/api.rs +++ b/rust/src/api.rs @@ -268,10 +268,15 @@ pub fn run(stream: StreamSink, app_dir: String) -> Result<()> { }); runtime.spawn_blocking(move || { - // background processor joins on a sync thread, meaning that join here will block a - // full thread, which is dis-encouraged to do in async code. - if let Err(err) = background_processor.join() { - tracing::error!(?err, "Background processor stopped unexpected"); + let mut background_processor = background_processor; + loop { + // background processor joins on a sync thread, meaning that join here will block a + // full thread, which is dis-encouraged to do in async code.
+ if let Err(err) = background_processor.join() { + tracing::warn!(?err, "Background processor stopped unexpected"); + } + tracing::info!("Restarting lightning node"); + background_processor = wallet::start_background_processor(); } }); Ok(()) diff --git a/rust/src/lightning.rs b/rust/src/lightning.rs index cc15c487..7121d3be 100644 --- a/rust/src/lightning.rs +++ b/rust/src/lightning.rs @@ -112,6 +112,7 @@ pub struct LightningSystem { pub persister: Arc, pub gossip_sync: Arc, pub data_dir: PathBuf, + pub scorer: Arc>, pub network: Network, } @@ -144,6 +145,22 @@ impl LightningSystem { peers: self.peer_manager.get_peer_node_ids(), } } + + pub fn start_background_processor(&self) -> BackgroundProcessor { + let invoice_payer = INVOICE_PAYER + .try_get() + .expect("invoice payer to be initialized"); + BackgroundProcessor::start( + self.persister.clone(), + invoice_payer.clone(), + self.chain_monitor.clone(), + self.channel_manager.clone(), + GossipSync::p2p(self.gossip_sync.clone()), + self.peer_manager.clone(), + self.logger.clone(), + Some(self.scorer.clone()), + ) + } } pub async fn open_channel( @@ -510,6 +527,14 @@ pub fn setup( IgnoringMessageHandler {}, )); + // Step 17: Initialize routing ProbabilisticScorer + let scorer_path = format!("{ldk_data_dir}/scorer"); + let scorer = Arc::new(Mutex::new(disk::read_scorer( + Path::new(&scorer_path), + Arc::clone(&network_graph), + Arc::clone(&logger), + ))); + let system = LightningSystem { wallet: lightning_wallet, chain_monitor, @@ -521,6 +546,7 @@ pub fn setup( persister, gossip_sync, data_dir: Path::new(&ldk_data_dir).to_path_buf(), + scorer, network, }; @@ -546,8 +572,6 @@ fn default_user_config() -> UserConfig { } pub fn run_ldk(system: &LightningSystem) -> Result { - let ldk_data_dir = system.data_dir.to_string_lossy().to_string(); - let runtime_handle = tokio::runtime::Handle::current(); let event_handler = BdkLdkEventHandler { runtime_handle, @@ -558,20 +582,12 @@ pub fn run_ldk(system: &LightningSystem) -> 
Result { network: system.network, }; - // Step 17: Initialize routing ProbabilisticScorer - let scorer_path = format!("{ldk_data_dir}/scorer"); - let scorer = Arc::new(Mutex::new(disk::read_scorer( - Path::new(&scorer_path), - Arc::clone(&system.network_graph), - Arc::clone(&system.logger), - ))); - // Step 18: Create InvoicePayer let router = DefaultRouter::new( system.network_graph.clone(), system.logger.clone(), system.keys_manager.get_secure_random_bytes(), - scorer.clone(), + system.scorer.clone(), ); let invoice_payer = Arc::new(BdkLdkInvoicePayer::new( system.channel_manager.clone(), @@ -583,20 +599,9 @@ pub fn run_ldk(system: &LightningSystem) -> Result { INVOICE_PAYER.set(invoice_payer.clone()); - // Step 19: Background Processing - let background_processor = BackgroundProcessor::start( - system.persister.clone(), - invoice_payer.clone(), - system.chain_monitor.clone(), - system.channel_manager.clone(), - GossipSync::p2p(system.gossip_sync.clone()), - system.peer_manager.clone(), - system.logger.clone(), - Some(scorer), - ); + let background_processor = system.start_background_processor(); let node_id = system.channel_manager.get_our_node_id(); - tracing::info!("Lightning node started with node ID {node_id}"); Ok(background_processor) } diff --git a/rust/src/wallet.rs b/rust/src/wallet.rs index f4da06e7..98299d12 100644 --- a/rust/src/wallet.rs +++ b/rust/src/wallet.rs @@ -489,10 +489,13 @@ pub fn get_peer_manager() -> Arc { get_wallet().lightning.peer_manager.clone() } +pub fn start_background_processor() -> BackgroundProcessor { + get_wallet().lightning.start_background_processor() +} + pub async fn send_lightning_payment(invoice: &str) -> Result<()> { let invoice = Invoice::from_str(invoice).context("Could not parse Invoice string")?; - lightning::send_payment(&invoice).await?; - Ok(()) + lightning::send_payment(&invoice).await } #[derive(Serialize, Deserialize, Debug)]