This repository has been archived by the owner on Aug 31, 2023. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 3
feat: Restart failed background processor #572
Open
holzeis
wants to merge
2
commits into
chore/use-dedicated-runtime
Choose a base branch
from
feat/restart-failed-background-processor
base: chore/use-dedicated-runtime
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
Open
Changes from all commits
Commits
Show all changes
2 commits
Select commit
Hold shift + click to select a range
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -13,7 +13,6 @@ use crate::offer::Offer; | |
use crate::wallet; | ||
use crate::wallet::Balance; | ||
use crate::wallet::LightningTransaction; | ||
use anyhow::anyhow; | ||
use anyhow::Context; | ||
use anyhow::Result; | ||
use flutter_rust_bridge::StreamSink; | ||
|
@@ -22,13 +21,14 @@ use lightning_invoice::Invoice; | |
use lightning_invoice::InvoiceDescription; | ||
use rust_decimal::prelude::ToPrimitive; | ||
use rust_decimal::Decimal; | ||
use state::Storage; | ||
use std::ops::Add; | ||
use std::path::Path; | ||
use std::str::FromStr; | ||
use std::time::SystemTime; | ||
use time::Duration; | ||
pub use time::OffsetDateTime; | ||
use tokio::try_join; | ||
use tokio::runtime::Runtime; | ||
|
||
pub struct Address { | ||
pub address: String, | ||
|
@@ -168,91 +168,119 @@ impl WalletInfo { | |
Ok(tx_history) | ||
} | ||
} | ||
/// Lazily creates a multi threaded runtime with the the number of worker threads corresponding to | ||
/// the number of available cores. | ||
fn runtime() -> Result<&'static Runtime> { | ||
static RUNTIME: Storage<Runtime> = Storage::new(); | ||
|
||
#[tokio::main(flavor = "current_thread")] | ||
pub async fn refresh_wallet_info() -> Result<WalletInfo> { | ||
wallet::sync()?; | ||
WalletInfo::build_wallet_info().await | ||
if RUNTIME.try_get().is_none() { | ||
let runtime = Runtime::new()?; | ||
RUNTIME.set(runtime); | ||
} | ||
|
||
Ok(RUNTIME.get()) | ||
} | ||
|
||
#[tokio::main(flavor = "current_thread")] | ||
pub async fn run(stream: StreamSink<Event>, app_dir: String) -> Result<()> { | ||
pub fn refresh_wallet_info() -> Result<WalletInfo> { | ||
runtime()?.block_on(async { | ||
wallet::sync()?; | ||
WalletInfo::build_wallet_info().await | ||
}) | ||
} | ||
|
||
pub fn run(stream: StreamSink<Event>, app_dir: String) -> Result<()> { | ||
let network = config::network(); | ||
anyhow::ensure!(!app_dir.is_empty(), "app_dir must not be empty"); | ||
stream.add(Event::Init(format!("Initialising {network} wallet"))); | ||
wallet::init_wallet(Path::new(app_dir.as_str()))?; | ||
|
||
stream.add(Event::Init("Initialising database".to_string())); | ||
db::init_db( | ||
&Path::new(app_dir.as_str()) | ||
.join(network.to_string()) | ||
.join("taker.sqlite"), | ||
) | ||
.await?; | ||
|
||
stream.add(Event::Init("Starting full ldk node".to_string())); | ||
let background_processor = wallet::run_ldk().await?; | ||
|
||
stream.add(Event::Init("Fetching an offer".to_string())); | ||
stream.add(Event::Offer(offer::get_offer().await.ok())); | ||
|
||
stream.add(Event::Init("Fetching your balance".to_string())); | ||
stream.add(Event::WalletInfo( | ||
WalletInfo::build_wallet_info().await.ok(), | ||
)); | ||
stream.add(Event::Init("Checking channel state".to_string())); | ||
stream.add(Event::ChannelState(get_channel_state())); | ||
|
||
stream.add(Event::Init("Ready".to_string())); | ||
stream.add(Event::Ready); | ||
|
||
// spawn a connection task keeping the connection with the maker alive. | ||
let peer_manager = wallet::get_peer_manager()?; | ||
let connection_handle = connection::spawn(peer_manager); | ||
|
||
// sync offers every 5 seconds | ||
let offer_handle = offer::spawn(stream.clone()); | ||
|
||
// sync wallet every 60 seconds | ||
let wallet_sync_handle = tokio::spawn(async { | ||
loop { | ||
wallet::sync().unwrap_or_else(|e| tracing::error!(?e, "Failed to sync wallet")); | ||
tokio::time::sleep(std::time::Duration::from_secs(60)).await; | ||
} | ||
}); | ||
|
||
// sync wallet info every 10 seconds | ||
let wallet_info_stream = stream.clone(); | ||
let wallet_info_sync_handle = tokio::spawn(async move { | ||
loop { | ||
match WalletInfo::build_wallet_info().await { | ||
Ok(wallet_info) => { | ||
let _ = wallet_info_stream.add(Event::WalletInfo(Some(wallet_info))); | ||
} | ||
Err(e) => tracing::error!(?e, "Failed to build wallet info"), | ||
let runtime = runtime()?; | ||
runtime.block_on(async move { | ||
stream.add(Event::Init(format!("Initialising {network} wallet"))); | ||
wallet::init_wallet(Path::new(app_dir.as_str()))?; | ||
|
||
stream.add(Event::Init("Initialising database".to_string())); | ||
db::init_db( | ||
&Path::new(app_dir.as_str()) | ||
.join(network.to_string()) | ||
.join("taker.sqlite"), | ||
) | ||
.await?; | ||
|
||
stream.add(Event::Init("Starting full ldk node".to_string())); | ||
let background_processor = wallet::run_ldk()?; | ||
|
||
stream.add(Event::Init("Fetching an offer".to_string())); | ||
stream.add(Event::Offer(offer::get_offer().await.ok())); | ||
|
||
stream.add(Event::Init("Fetching your balance".to_string())); | ||
stream.add(Event::WalletInfo( | ||
WalletInfo::build_wallet_info().await.ok(), | ||
)); | ||
stream.add(Event::Init("Checking channel state".to_string())); | ||
stream.add(Event::ChannelState(get_channel_state())); | ||
|
||
stream.add(Event::Init("Ready".to_string())); | ||
stream.add(Event::Ready); | ||
|
||
// spawn a connection task keeping the connection with the maker alive. | ||
runtime.spawn(async move { | ||
let peer_info = config::maker_peer_info(); | ||
loop { | ||
let peer_manager = wallet::get_peer_manager(); | ||
connection::connect(peer_manager, peer_info).await; | ||
// add a delay before retrying to connect | ||
tokio::time::sleep(std::time::Duration::from_secs(5)).await; | ||
} | ||
tokio::time::sleep(std::time::Duration::from_secs(10)).await; | ||
} | ||
}); | ||
|
||
// sync channel state every 5 seconds | ||
let channel_state_stream = stream.clone(); | ||
let channel_state_handle = tokio::spawn(async move { | ||
loop { | ||
channel_state_stream.add(Event::ChannelState(get_channel_state())); | ||
tokio::time::sleep(std::time::Duration::from_secs(5)).await; | ||
} | ||
}); | ||
}); | ||
|
||
try_join!( | ||
connection_handle, | ||
offer_handle, | ||
wallet_sync_handle, | ||
wallet_info_sync_handle, | ||
channel_state_handle, | ||
)?; | ||
let offer_stream = stream.clone(); | ||
runtime.spawn(async move { | ||
loop { | ||
offer_stream.add(Event::Offer(offer::get_offer().await.ok())); | ||
tokio::time::sleep(std::time::Duration::from_secs(5)).await; | ||
} | ||
}); | ||
|
||
background_processor.join().map_err(|e| anyhow!(e)) | ||
runtime.spawn(async { | ||
loop { | ||
wallet::sync().unwrap_or_else(|e| tracing::error!(?e, "Failed to sync wallet")); | ||
tokio::time::sleep(std::time::Duration::from_secs(60)).await; | ||
} | ||
}); | ||
|
||
let wallet_info_stream = stream.clone(); | ||
runtime.spawn(async move { | ||
loop { | ||
match WalletInfo::build_wallet_info().await { | ||
Ok(wallet_info) => { | ||
let _ = wallet_info_stream.add(Event::WalletInfo(Some(wallet_info))); | ||
} | ||
Err(e) => tracing::error!(?e, "Failed to build wallet info"), | ||
} | ||
tokio::time::sleep(std::time::Duration::from_secs(10)).await; | ||
} | ||
}); | ||
|
||
let channel_state_stream = stream.clone(); | ||
runtime.spawn(async move { | ||
loop { | ||
channel_state_stream.add(Event::ChannelState(get_channel_state())); | ||
tokio::time::sleep(std::time::Duration::from_secs(5)).await; | ||
} | ||
}); | ||
|
||
runtime.spawn_blocking(move || { | ||
let mut background_processor = background_processor; | ||
loop { | ||
// background processor joins on a sync thread, meaning that join here will block a | ||
// full thread, which is dis-encouraged to do in async code. | ||
if let Err(err) = background_processor.join() { | ||
tracing::warn!(?err, "Background processor stopped unexpected"); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🔧 It should be "unexpectedly"! |
||
} | ||
tracing::info!("Restarting lightning node"); | ||
background_processor = wallet::start_background_processor(); | ||
} | ||
}); | ||
Ok(()) | ||
}) | ||
} | ||
|
||
pub fn get_balance() -> Result<Balance> { | ||
|
@@ -279,16 +307,18 @@ pub fn network() -> SyncReturn<String> { | |
SyncReturn(config::network().to_string()) | ||
} | ||
|
||
#[tokio::main(flavor = "current_thread")] | ||
pub async fn open_channel(taker_amount: u64) -> Result<()> { | ||
let peer_info = config::maker_peer_info(); | ||
wallet::open_channel(peer_info, taker_amount).await | ||
pub fn open_channel(taker_amount: u64) -> Result<()> { | ||
runtime()?.block_on(async { | ||
let peer_info = config::maker_peer_info(); | ||
wallet::open_channel(peer_info, taker_amount).await | ||
}) | ||
} | ||
|
||
#[tokio::main(flavor = "current_thread")] | ||
pub async fn close_channel() -> Result<()> { | ||
let peer_info = config::maker_peer_info(); | ||
wallet::close_channel(peer_info.pubkey, false).await | ||
pub fn close_channel() -> Result<()> { | ||
runtime()?.block_on(async { | ||
let peer_info = config::maker_peer_info(); | ||
wallet::close_channel(peer_info.pubkey, false).await | ||
}) | ||
} | ||
|
||
pub fn send_to_address(address: String, amount: u64) -> Result<String> { | ||
|
@@ -301,42 +331,32 @@ pub fn send_to_address(address: String, amount: u64) -> Result<String> { | |
Ok(txid) | ||
} | ||
|
||
#[tokio::main(flavor = "current_thread")] | ||
pub async fn list_cfds() -> Result<Vec<Cfd>> { | ||
let mut conn = db::acquire().await?; | ||
cfd::load_cfds(&mut conn).await | ||
pub fn list_cfds() -> Result<Vec<Cfd>> { | ||
runtime()?.block_on(async { | ||
let mut conn = db::acquire().await?; | ||
cfd::load_cfds(&mut conn).await | ||
}) | ||
} | ||
|
||
#[tokio::main(flavor = "current_thread")] | ||
pub async fn open_cfd(order: Order) -> Result<()> { | ||
cfd::open(&order).await | ||
pub fn open_cfd(order: Order) -> Result<()> { | ||
runtime()?.block_on(async { cfd::open(&order).await }) | ||
} | ||
|
||
#[tokio::main(flavor = "current_thread")] | ||
pub async fn call_faucet(address: String) -> Result<String> { | ||
pub fn call_faucet(address: String) -> Result<String> { | ||
anyhow::ensure!( | ||
!address.is_empty(), | ||
"Cannot call faucet because of empty address" | ||
); | ||
faucet::call_faucet(address).await | ||
runtime()?.block_on(async { faucet::call_faucet(address).await }) | ||
} | ||
|
||
#[tokio::main(flavor = "current_thread")] | ||
pub async fn get_fee_recommendation() -> Result<u32> { | ||
let fee_recommendation = wallet::get_fee_recommendation()?; | ||
|
||
Ok(fee_recommendation) | ||
pub fn get_fee_recommendation() -> Result<u32> { | ||
wallet::get_fee_recommendation() | ||
} | ||
|
||
/// Settles a CFD with the given taker and maker amounts in sats | ||
#[tokio::main(flavor = "current_thread")] | ||
pub async fn settle_cfd(cfd: Cfd, offer: Offer) -> Result<()> { | ||
cfd::settle(&cfd, &offer).await | ||
} | ||
|
||
#[tokio::main(flavor = "current_thread")] | ||
pub async fn get_lightning_tx_history() -> Result<Vec<LightningTransaction>> { | ||
wallet::get_lightning_history().await | ||
pub fn settle_cfd(cfd: Cfd, offer: Offer) -> Result<()> { | ||
runtime()?.block_on(async { cfd::settle(&cfd, &offer).await }) | ||
} | ||
|
||
/// Initialise logging infrastructure for Rust | ||
|
@@ -350,19 +370,18 @@ pub fn get_seed_phrase() -> Vec<String> { | |
wallet::get_seed_phrase() | ||
} | ||
|
||
#[tokio::main(flavor = "current_thread")] | ||
pub async fn send_lightning_payment(invoice: String) -> Result<()> { | ||
pub fn send_lightning_payment(invoice: String) -> Result<()> { | ||
anyhow::ensure!(!invoice.is_empty(), "Cannot pay empty invoice"); | ||
wallet::send_lightning_payment(&invoice).await | ||
runtime()?.block_on(async { wallet::send_lightning_payment(&invoice).await }) | ||
} | ||
|
||
#[tokio::main(flavor = "current_thread")] | ||
pub async fn create_lightning_invoice( | ||
pub fn create_lightning_invoice( | ||
amount_sats: u64, | ||
expiry_secs: u32, | ||
description: String, | ||
) -> Result<String> { | ||
wallet::create_invoice(amount_sats, expiry_secs, description).await | ||
runtime()? | ||
.block_on(async { wallet::create_invoice(amount_sats, expiry_secs, description).await }) | ||
} | ||
|
||
// Note, this implementation has to be on the api level as otherwise it wouldn't be generated | ||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🔧 It should be "discouraged"!