Skip to content
This repository has been archived by the owner on Aug 31, 2023. It is now read-only.

feat: Restart failed background processor #572

Open
wants to merge 2 commits into
base: chore/use-dedicated-runtime
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
251 changes: 135 additions & 116 deletions rust/src/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ use crate::offer::Offer;
use crate::wallet;
use crate::wallet::Balance;
use crate::wallet::LightningTransaction;
use anyhow::anyhow;
use anyhow::Context;
use anyhow::Result;
use flutter_rust_bridge::StreamSink;
Expand All @@ -22,13 +21,14 @@ use lightning_invoice::Invoice;
use lightning_invoice::InvoiceDescription;
use rust_decimal::prelude::ToPrimitive;
use rust_decimal::Decimal;
use state::Storage;
use std::ops::Add;
use std::path::Path;
use std::str::FromStr;
use std::time::SystemTime;
use time::Duration;
pub use time::OffsetDateTime;
use tokio::try_join;
use tokio::runtime::Runtime;

pub struct Address {
pub address: String,
Expand Down Expand Up @@ -168,91 +168,119 @@ impl WalletInfo {
Ok(tx_history)
}
}
/// Lazily creates a multi threaded runtime with the the number of worker threads corresponding to
/// the number of available cores.
fn runtime() -> Result<&'static Runtime> {
static RUNTIME: Storage<Runtime> = Storage::new();

#[tokio::main(flavor = "current_thread")]
pub async fn refresh_wallet_info() -> Result<WalletInfo> {
wallet::sync()?;
WalletInfo::build_wallet_info().await
if RUNTIME.try_get().is_none() {
let runtime = Runtime::new()?;
RUNTIME.set(runtime);
}

Ok(RUNTIME.get())
}

#[tokio::main(flavor = "current_thread")]
pub async fn run(stream: StreamSink<Event>, app_dir: String) -> Result<()> {
pub fn refresh_wallet_info() -> Result<WalletInfo> {
runtime()?.block_on(async {
wallet::sync()?;
WalletInfo::build_wallet_info().await
})
}

pub fn run(stream: StreamSink<Event>, app_dir: String) -> Result<()> {
let network = config::network();
anyhow::ensure!(!app_dir.is_empty(), "app_dir must not be empty");
stream.add(Event::Init(format!("Initialising {network} wallet")));
wallet::init_wallet(Path::new(app_dir.as_str()))?;

stream.add(Event::Init("Initialising database".to_string()));
db::init_db(
&Path::new(app_dir.as_str())
.join(network.to_string())
.join("taker.sqlite"),
)
.await?;

stream.add(Event::Init("Starting full ldk node".to_string()));
let background_processor = wallet::run_ldk().await?;

stream.add(Event::Init("Fetching an offer".to_string()));
stream.add(Event::Offer(offer::get_offer().await.ok()));

stream.add(Event::Init("Fetching your balance".to_string()));
stream.add(Event::WalletInfo(
WalletInfo::build_wallet_info().await.ok(),
));
stream.add(Event::Init("Checking channel state".to_string()));
stream.add(Event::ChannelState(get_channel_state()));

stream.add(Event::Init("Ready".to_string()));
stream.add(Event::Ready);

// spawn a connection task keeping the connection with the maker alive.
let peer_manager = wallet::get_peer_manager()?;
let connection_handle = connection::spawn(peer_manager);

// sync offers every 5 seconds
let offer_handle = offer::spawn(stream.clone());

// sync wallet every 60 seconds
let wallet_sync_handle = tokio::spawn(async {
loop {
wallet::sync().unwrap_or_else(|e| tracing::error!(?e, "Failed to sync wallet"));
tokio::time::sleep(std::time::Duration::from_secs(60)).await;
}
});

// sync wallet info every 10 seconds
let wallet_info_stream = stream.clone();
let wallet_info_sync_handle = tokio::spawn(async move {
loop {
match WalletInfo::build_wallet_info().await {
Ok(wallet_info) => {
let _ = wallet_info_stream.add(Event::WalletInfo(Some(wallet_info)));
}
Err(e) => tracing::error!(?e, "Failed to build wallet info"),
let runtime = runtime()?;
runtime.block_on(async move {
stream.add(Event::Init(format!("Initialising {network} wallet")));
wallet::init_wallet(Path::new(app_dir.as_str()))?;

stream.add(Event::Init("Initialising database".to_string()));
db::init_db(
&Path::new(app_dir.as_str())
.join(network.to_string())
.join("taker.sqlite"),
)
.await?;

stream.add(Event::Init("Starting full ldk node".to_string()));
let background_processor = wallet::run_ldk()?;

stream.add(Event::Init("Fetching an offer".to_string()));
stream.add(Event::Offer(offer::get_offer().await.ok()));

stream.add(Event::Init("Fetching your balance".to_string()));
stream.add(Event::WalletInfo(
WalletInfo::build_wallet_info().await.ok(),
));
stream.add(Event::Init("Checking channel state".to_string()));
stream.add(Event::ChannelState(get_channel_state()));

stream.add(Event::Init("Ready".to_string()));
stream.add(Event::Ready);

// spawn a connection task keeping the connection with the maker alive.
runtime.spawn(async move {
let peer_info = config::maker_peer_info();
loop {
let peer_manager = wallet::get_peer_manager();
connection::connect(peer_manager, peer_info).await;
// add a delay before retrying to connect
tokio::time::sleep(std::time::Duration::from_secs(5)).await;
}
tokio::time::sleep(std::time::Duration::from_secs(10)).await;
}
});

// sync channel state every 5 seconds
let channel_state_stream = stream.clone();
let channel_state_handle = tokio::spawn(async move {
loop {
channel_state_stream.add(Event::ChannelState(get_channel_state()));
tokio::time::sleep(std::time::Duration::from_secs(5)).await;
}
});
});

try_join!(
connection_handle,
offer_handle,
wallet_sync_handle,
wallet_info_sync_handle,
channel_state_handle,
)?;
let offer_stream = stream.clone();
runtime.spawn(async move {
loop {
offer_stream.add(Event::Offer(offer::get_offer().await.ok()));
tokio::time::sleep(std::time::Duration::from_secs(5)).await;
}
});

background_processor.join().map_err(|e| anyhow!(e))
runtime.spawn(async {
loop {
wallet::sync().unwrap_or_else(|e| tracing::error!(?e, "Failed to sync wallet"));
tokio::time::sleep(std::time::Duration::from_secs(60)).await;
}
});

let wallet_info_stream = stream.clone();
runtime.spawn(async move {
loop {
match WalletInfo::build_wallet_info().await {
Ok(wallet_info) => {
let _ = wallet_info_stream.add(Event::WalletInfo(Some(wallet_info)));
}
Err(e) => tracing::error!(?e, "Failed to build wallet info"),
}
tokio::time::sleep(std::time::Duration::from_secs(10)).await;
}
});

let channel_state_stream = stream.clone();
runtime.spawn(async move {
loop {
channel_state_stream.add(Event::ChannelState(get_channel_state()));
tokio::time::sleep(std::time::Duration::from_secs(5)).await;
}
});

runtime.spawn_blocking(move || {
let mut background_processor = background_processor;
loop {
// background processor joins on a sync thread, meaning that join here will block a
// full thread, which is dis-encouraged to do in async code.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🔧 It should be "discouraged"!

if let Err(err) = background_processor.join() {
tracing::warn!(?err, "Background processor stopped unexpected");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🔧 It should be "unexpectedly"!

}
tracing::info!("Restarting lightning node");
background_processor = wallet::start_background_processor();
}
});
Ok(())
})
}

pub fn get_balance() -> Result<Balance> {
Expand All @@ -279,16 +307,18 @@ pub fn network() -> SyncReturn<String> {
SyncReturn(config::network().to_string())
}

#[tokio::main(flavor = "current_thread")]
pub async fn open_channel(taker_amount: u64) -> Result<()> {
let peer_info = config::maker_peer_info();
wallet::open_channel(peer_info, taker_amount).await
pub fn open_channel(taker_amount: u64) -> Result<()> {
runtime()?.block_on(async {
let peer_info = config::maker_peer_info();
wallet::open_channel(peer_info, taker_amount).await
})
}

#[tokio::main(flavor = "current_thread")]
pub async fn close_channel() -> Result<()> {
let peer_info = config::maker_peer_info();
wallet::close_channel(peer_info.pubkey, false).await
pub fn close_channel() -> Result<()> {
runtime()?.block_on(async {
let peer_info = config::maker_peer_info();
wallet::close_channel(peer_info.pubkey, false).await
})
}

pub fn send_to_address(address: String, amount: u64) -> Result<String> {
Expand All @@ -301,42 +331,32 @@ pub fn send_to_address(address: String, amount: u64) -> Result<String> {
Ok(txid)
}

#[tokio::main(flavor = "current_thread")]
pub async fn list_cfds() -> Result<Vec<Cfd>> {
let mut conn = db::acquire().await?;
cfd::load_cfds(&mut conn).await
pub fn list_cfds() -> Result<Vec<Cfd>> {
runtime()?.block_on(async {
let mut conn = db::acquire().await?;
cfd::load_cfds(&mut conn).await
})
}

#[tokio::main(flavor = "current_thread")]
pub async fn open_cfd(order: Order) -> Result<()> {
cfd::open(&order).await
pub fn open_cfd(order: Order) -> Result<()> {
runtime()?.block_on(async { cfd::open(&order).await })
}

#[tokio::main(flavor = "current_thread")]
pub async fn call_faucet(address: String) -> Result<String> {
pub fn call_faucet(address: String) -> Result<String> {
anyhow::ensure!(
!address.is_empty(),
"Cannot call faucet because of empty address"
);
faucet::call_faucet(address).await
runtime()?.block_on(async { faucet::call_faucet(address).await })
}

#[tokio::main(flavor = "current_thread")]
pub async fn get_fee_recommendation() -> Result<u32> {
let fee_recommendation = wallet::get_fee_recommendation()?;

Ok(fee_recommendation)
pub fn get_fee_recommendation() -> Result<u32> {
wallet::get_fee_recommendation()
}

/// Settles a CFD with the given taker and maker amounts in sats
#[tokio::main(flavor = "current_thread")]
pub async fn settle_cfd(cfd: Cfd, offer: Offer) -> Result<()> {
cfd::settle(&cfd, &offer).await
}

#[tokio::main(flavor = "current_thread")]
pub async fn get_lightning_tx_history() -> Result<Vec<LightningTransaction>> {
wallet::get_lightning_history().await
pub fn settle_cfd(cfd: Cfd, offer: Offer) -> Result<()> {
runtime()?.block_on(async { cfd::settle(&cfd, &offer).await })
}

/// Initialise logging infrastructure for Rust
Expand All @@ -350,19 +370,18 @@ pub fn get_seed_phrase() -> Vec<String> {
wallet::get_seed_phrase()
}

#[tokio::main(flavor = "current_thread")]
pub async fn send_lightning_payment(invoice: String) -> Result<()> {
pub fn send_lightning_payment(invoice: String) -> Result<()> {
anyhow::ensure!(!invoice.is_empty(), "Cannot pay empty invoice");
wallet::send_lightning_payment(&invoice).await
runtime()?.block_on(async { wallet::send_lightning_payment(&invoice).await })
}

#[tokio::main(flavor = "current_thread")]
pub async fn create_lightning_invoice(
pub fn create_lightning_invoice(
amount_sats: u64,
expiry_secs: u32,
description: String,
) -> Result<String> {
wallet::create_invoice(amount_sats, expiry_secs, description).await
runtime()?
.block_on(async { wallet::create_invoice(amount_sats, expiry_secs, description).await })
}

// Note, this implementation has to be on the api level as otherwise it wouldn't be generated
Expand Down
56 changes: 24 additions & 32 deletions rust/src/connection.rs
Original file line number Diff line number Diff line change
@@ -1,44 +1,36 @@
use crate::config;
use crate::lightning::PeerInfo;
use crate::lightning::PeerManager;
use bdk::bitcoin::secp256k1::PublicKey;
use std::sync::Arc;
use std::time::Duration;
use tokio::task::JoinHandle;

pub fn spawn(peer_manager: Arc<PeerManager>) -> JoinHandle<()> {
// keep connection with maker alive!
tokio::spawn(async move {
let peer_info = config::maker_peer_info();
loop {
tracing::info!("Connecting to {peer_info}");
match lightning_net_tokio::connect_outbound(
Arc::clone(&peer_manager),
peer_info.pubkey,
peer_info.peer_addr,
)
.await
{
Some(connection_closed_future) => {
let mut connection_closed_future = Box::pin(connection_closed_future);
while !is_connected(&peer_manager, peer_info.pubkey) {
if futures::poll!(&mut connection_closed_future).is_ready() {
tracing::warn!("Peer disconnected before we finished the handshake! Retrying in 5 seconds.");
tokio::time::sleep(Duration::from_secs(5)).await;
return;
}
tokio::time::sleep(Duration::from_secs(5)).await;
}
tracing::info!("Successfully connected to {peer_info}");
connection_closed_future.await;
tracing::warn!("Lost connection to maker, retrying immediately.")
}
None => {
tracing::warn!("Failed to connect to maker! Retrying in 5 seconds.");
pub async fn connect(peer_manager: Arc<PeerManager>, peer_info: PeerInfo) {
tracing::info!("Connecting to {peer_info}");
match lightning_net_tokio::connect_outbound(
Arc::clone(&peer_manager),
peer_info.pubkey,
peer_info.peer_addr,
)
.await
{
Some(connection_closed_future) => {
let mut connection_closed_future = Box::pin(connection_closed_future);
while !is_connected(&peer_manager, peer_info.pubkey) {
if futures::poll!(&mut connection_closed_future).is_ready() {
tracing::warn!("Peer disconnected before we finished the handshake! Retrying in 5 seconds.");
tokio::time::sleep(Duration::from_secs(5)).await;
return;
}
tokio::time::sleep(Duration::from_secs(5)).await;
}
tracing::info!("Successfully connected to {peer_info}");
connection_closed_future.await;
tracing::warn!("Lost connection to maker, retrying immediately.")
}
None => {
tracing::warn!("Failed to connect to maker! Retrying.");
}
})
}
}

fn is_connected(peer_manager: &Arc<PeerManager>, pubkey: PublicKey) -> bool {
Expand Down
Loading