From b9ba8fb90bef54d53472d29d6d1195dca0349fa8 Mon Sep 17 00:00:00 2001 From: jbesraa Date: Fri, 12 Jul 2024 13:56:50 +0300 Subject: [PATCH] f --- src/builder.rs | 2 - src/event.rs | 3 + src/lib.rs | 21 +- src/payment/payjoin/handler.rs | 391 ++++++++---------- src/payment/payjoin/mod.rs | 96 +++-- src/tx_broadcaster.rs | 4 +- tests/common/mod.rs | 17 + tests/integration_tests_payjoin.rs | 31 +- ...tion_tests_payjoin_with_channel_opening.rs | 8 +- 9 files changed, 311 insertions(+), 262 deletions(-) diff --git a/src/builder.rs b/src/builder.rs index de812d619..ce456d4c2 100644 --- a/src/builder.rs +++ b/src/builder.rs @@ -1017,8 +1017,6 @@ fn build_with_store_internal( let mut payjoin_receiver = None; if let Some(pj_config) = payjoin_config { payjoin_handler = Some(Arc::new(PayjoinHandler::new( - Arc::clone(&tx_broadcaster), - Arc::clone(&logger), pj_config.payjoin_relay.clone(), Arc::clone(&tx_sync), Arc::clone(&event_queue), diff --git a/src/event.rs b/src/event.rs index 06713baa0..bb04d55f8 100644 --- a/src/event.rs +++ b/src/event.rs @@ -188,6 +188,7 @@ pub enum PayjoinPaymentFailureReason { Timeout, TransactionFinalisationFailed, InvalidReceiverResponse, + RequestFailed, } impl Readable for PayjoinPaymentFailureReason { @@ -196,6 +197,7 @@ impl Readable for PayjoinPaymentFailureReason { 0 => Ok(Self::Timeout), 1 => Ok(Self::TransactionFinalisationFailed), 2 => Ok(Self::InvalidReceiverResponse), + 3 => Ok(Self::RequestFailed), _ => Err(DecodeError::InvalidValue), } } @@ -207,6 +209,7 @@ impl Writeable for PayjoinPaymentFailureReason { Self::Timeout => 0u8.write(writer), Self::TransactionFinalisationFailed => 1u8.write(writer), Self::InvalidReceiverResponse => 2u8.write(writer), + Self::RequestFailed => 3u8.write(writer), } } } diff --git a/src/lib.rs b/src/lib.rs index be1b6987b..71d2f0953 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -374,6 +374,10 @@ impl Node { let archive_cmon = Arc::clone(&self.chain_monitor); let sync_sweeper = Arc::clone(&self.output_sweeper); let sync_logger = Arc::clone(&self.logger); + let sync_payjoin = match &self.payjoin_handler { + Some(pjh) => Some(Arc::clone(pjh)), + None => None, + }; let sync_wallet_timestamp = Arc::clone(&self.latest_wallet_sync_timestamp); let sync_monitor_archival_height = Arc::clone(&self.latest_channel_monitor_archival_height); let mut stop_sync = self.stop_sender.subscribe(); @@ -393,11 +397,14 @@ impl Node { return; } _ = wallet_sync_interval.tick() => { - let confirmables = vec![ + let mut confirmables = vec![ &*sync_cman as &(dyn Confirm + Sync + Send), &*sync_cmon as &(dyn Confirm + Sync + Send), &*sync_sweeper as &(dyn Confirm + Sync + Send), ]; + if let Some(sync_payjoin) = sync_payjoin.as_ref() { + confirmables.push(sync_payjoin.as_ref() as &(dyn Confirm + Sync + Send)); + } let now = Instant::now(); let timeout_fut = tokio::time::timeout(Duration::from_secs(LDK_WALLET_SYNC_TIMEOUT_SECS), tx_sync.sync(confirmables)); match timeout_fut.await { @@ -1114,9 +1121,11 @@ impl Node { Arc::clone(&self.config), Arc::clone(&self.logger), Arc::clone(&self.wallet), + Arc::clone(&self.tx_broadcaster), Arc::clone(&self.peer_store), Arc::clone(&self.channel_manager), Arc::clone(&self.connection_manager), + Arc::clone(&self.payment_store), ) } @@ -1132,14 +1141,16 @@ impl Node { let payjoin_receiver = self.payjoin_receiver.as_ref(); Arc::new(PayjoinPayment::new( Arc::clone(&self.runtime), - payjoin_sender.map(Arc::clone), + payjoin_handler.map(Arc::clone), payjoin_receiver.map(Arc::clone), Arc::clone(&self.config), Arc::clone(&self.logger), Arc::clone(&self.wallet), + Arc::clone(&self.tx_broadcaster), Arc::clone(&self.peer_store), Arc::clone(&self.channel_manager), Arc::clone(&self.connection_manager), + Arc::clone(&self.payment_store), )) } @@ -1344,11 +1355,15 @@ impl Node { let fee_estimator = Arc::clone(&self.fee_estimator); let sync_sweeper = Arc::clone(&self.output_sweeper); let sync_logger = Arc::clone(&self.logger); - let confirmables = vec![ + let sync_payjoin = &self.payjoin_handler.as_ref(); + let mut confirmables = vec![ &*sync_cman as &(dyn Confirm + Sync + Send), &*sync_cmon as &(dyn Confirm + Sync + Send), &*sync_sweeper as &(dyn Confirm + Sync + Send), ]; + if let Some(sync_payjoin) = sync_payjoin { + confirmables.push(sync_payjoin.as_ref() as &(dyn Confirm + Sync + Send)); + } let sync_wallet_timestamp = Arc::clone(&self.latest_wallet_sync_timestamp); let sync_fee_rate_update_timestamp = Arc::clone(&self.latest_fee_rate_cache_update_timestamp); diff --git a/src/payment/payjoin/handler.rs b/src/payment/payjoin/handler.rs index 3f1f96b7f..d9956ad12 100644 --- a/src/payment/payjoin/handler.rs +++ b/src/payment/payjoin/handler.rs @@ -1,31 +1,80 @@ +use lightning::ln::channelmanager::PaymentId; + use crate::config::PAYJOIN_REQUEST_TIMEOUT; use crate::error::Error; use crate::event::PayjoinPaymentFailureReason; -use crate::logger::FilesystemLogger; -use crate::payment::{PaymentDetails, PaymentDirection, PaymentKind, PaymentStatus}; -use crate::types::{Broadcaster, ChainSource, EventQueue, PaymentStore, Wallet}; +use crate::io::utils::ohttp_headers; +use crate::payment::store::PaymentDetailsUpdate; +use crate::payment::PaymentStatus; +use crate::types::{ChainSource, EventQueue, PaymentStore, Wallet}; use crate::Event; use bitcoin::address::NetworkChecked; use bitcoin::block::Header; use bitcoin::psbt::Psbt; use bitcoin::{Address, Amount, BlockHash, Script, Transaction, Txid}; - -use lightning::chain::chaininterface::BroadcasterInterface; use lightning::chain::channelmonitor::ANTI_REORG_DELAY; use lightning::chain::transaction::TransactionData; -use lightning::chain::{Confirm, Filter, WatchedOutput}; -use lightning::ln::channelmanager::PaymentId; -use lightning::log_error; -use lightning::util::logger::Logger; -use payjoin::send::{ContextV2, RequestBuilder}; -use rand::RngCore; +use lightning::chain::{Filter, WatchedOutput}; use std::sync::{Arc, RwLock}; +#[derive(Clone, Debug)] +enum PayjoinTransaction { + PendingFirstConfirmation { + original_psbt: Psbt, + tx: Transaction, + receiver: Address, + amount: Amount, + }, + PendingThresholdConfirmations { + original_psbt: Psbt, + tx: Transaction, + receiver: Address, + amount: Amount, + first_confirmation_height: u32, + first_confirmation_hash: BlockHash, + }, +} + +impl PayjoinTransaction { + fn txid(&self) -> Option { + match self { + PayjoinTransaction::PendingFirstConfirmation { tx, .. } => Some(tx.txid()), + PayjoinTransaction::PendingThresholdConfirmations { tx, .. } => Some(tx.txid()), + } + } + fn original_psbt(&self) -> &Psbt { + match self { + PayjoinTransaction::PendingFirstConfirmation { original_psbt, .. } => original_psbt, + PayjoinTransaction::PendingThresholdConfirmations { original_psbt, .. } => { + original_psbt + }, + } + } + fn first_confirmation_height(&self) -> Option { + match self { + PayjoinTransaction::PendingFirstConfirmation { .. } => None, + PayjoinTransaction::PendingThresholdConfirmations { + first_confirmation_height, .. + } => Some(*first_confirmation_height), + } + } + fn amount(&self) -> Amount { + match self { + PayjoinTransaction::PendingFirstConfirmation { amount, .. } => *amount, + PayjoinTransaction::PendingThresholdConfirmations { amount, .. } => *amount, + } + } + fn receiver(&self) -> Address { + match self { + PayjoinTransaction::PendingFirstConfirmation { receiver, .. } => receiver.clone(), + PayjoinTransaction::PendingThresholdConfirmations { receiver, .. } => receiver.clone(), + } + } +} + pub(crate) struct PayjoinHandler { - tx_broadcaster: Arc, - logger: Arc, payjoin_relay: payjoin::Url, chain_source: Arc, transactions: RwLock>, @@ -36,13 +85,10 @@ pub(crate) struct PayjoinHandler { impl PayjoinHandler { pub(crate) fn new( - tx_broadcaster: Arc, logger: Arc, payjoin_relay: payjoin::Url, chain_source: Arc, event_queue: Arc, wallet: Arc, payment_store: Arc, ) -> Self { Self { - tx_broadcaster, - logger, payjoin_relay, transactions: RwLock::new(Vec::new()), chain_source, @@ -52,142 +98,121 @@ impl PayjoinHandler { } } - pub(crate) async fn send_payjoin_transaction( - &self, original_psbt: &mut Psbt, payjoin_uri: payjoin::Uri<'_, NetworkChecked>, - ) -> Result, Error> { - let (request, context) = - RequestBuilder::from_psbt_and_uri(original_psbt.clone(), payjoin_uri.clone()) - .and_then(|b| b.build_non_incentivizing()) - .and_then(|mut c| c.extract_v2(self.payjoin_relay.clone())) - .map_err(|_e| Error::PayjoinRequestCreationFailed) - .unwrap(); - let mut random_bytes = [0u8; 32]; - rand::thread_rng().fill_bytes(&mut random_bytes); - self.payment_store.insert(PaymentDetails::new( - PaymentId(random_bytes), - PaymentKind::Payjoin, - payjoin_uri.amount.map(|a| a.to_sat()), - PaymentDirection::Outbound, - PaymentStatus::Pending, - ))?; - let response = send_payjoin_ohttp_request(&request).await?; - self.handle_payjoin_transaction_response(response, context, original_psbt, payjoin_uri) - .await + pub(crate) async fn send_request( + &self, payjoin_uri: payjoin::Uri<'_, NetworkChecked>, original_psbt: &mut Psbt, + ) -> Result, Error> { + let (request, context) = payjoin::send::RequestBuilder::from_psbt_and_uri( + original_psbt.clone(), + payjoin_uri.clone(), + ) + .and_then(|b| b.build_non_incentivizing()) + .and_then(|mut c| c.extract_v2(self.payjoin_relay.clone())) + .map_err(|_e| Error::PayjoinRequestCreationFailed)?; + let response = reqwest::Client::new() + .post(request.url.clone()) + .body(request.body.clone()) + .timeout(PAYJOIN_REQUEST_TIMEOUT) + .headers(ohttp_headers()) + .send() + .await?; + let response = response.error_for_status()?; + let response = response.bytes().await?; + let response = response.to_vec(); + context + .process_response(&mut response.as_slice()) + .map_err(|_e| Error::PayjoinResponseProcessingFailed) } - pub(crate) async fn handle_payjoin_transaction_response( - &self, response: Vec, context: ContextV2, original_psbt: &mut Psbt, - payjoin_uri: payjoin::Uri<'_, NetworkChecked>, - ) -> Result, Error> { - let amount = match payjoin_uri.amount { - Some(amt) => amt.to_sat(), - None => return Err(Error::PayjoinRequestMissingAmount), - }; - match context.process_response(&mut response.as_slice()) { - Ok(Some(pj_proposal)) => { - let pj_proposal = &mut pj_proposal.clone(); - let tx = - self.finalise_tx(pj_proposal, &mut original_psbt.clone(), payjoin_uri.clone())?; - self.tx_broadcaster.broadcast_transactions(&[&tx]); - let txid = tx.txid(); - let _ = self.event_queue.add_event(Event::PayjoinPaymentPending { - txid, - amount, - receipient: payjoin_uri.address.clone().into(), - }); - Ok(Some(txid)) - }, - Ok(None) => Ok(None), - Err(_e) => { - let _ = self.event_queue.add_event(Event::PayjoinPaymentFailed { - txid: None, - amount, - receipient: payjoin_uri.address.clone().into(), - reason: PayjoinPaymentFailureReason::InvalidReceiverResponse, - }); - return Err(Error::PayjoinResponseProcessingFailed); - }, - } + pub(crate) fn handle_request_failure( + &self, payjoin_uri: payjoin::Uri, original_psbt: &Psbt, + ) -> Result<(), Error> { + self.event_queue.add_event(Event::PayjoinPaymentFailed { + txid: Some(original_psbt.unsigned_tx.txid()), + receipient: payjoin_uri.address.clone().into(), + amount: payjoin_uri.amount.unwrap().to_sat(), + reason: PayjoinPaymentFailureReason::RequestFailed, + }) + } + + pub(crate) fn handle_request_timeout( + &self, payjoin_uri: payjoin::Uri, original_psbt: &Psbt, + ) -> Result<(), Error> { + self.event_queue.add_event(Event::PayjoinPaymentFailed { + txid: Some(original_psbt.unsigned_tx.txid()), + receipient: payjoin_uri.address.clone().into(), + amount: payjoin_uri.amount.unwrap().to_sat(), + reason: PayjoinPaymentFailureReason::Timeout, + }) } - fn finalise_tx( + pub(crate) fn process_response( &self, payjoin_proposal: &mut Psbt, original_psbt: &mut Psbt, payjoin_uri: payjoin::Uri, ) -> Result { let wallet = self.wallet.clone(); wallet.sign_payjoin_proposal(payjoin_proposal, original_psbt)?; let tx = payjoin_proposal.clone().extract_tx(); - if let Some(our_output) = - tx.output.iter().find(|output| wallet.is_mine(&output.script_pubkey).unwrap_or(false)) - { - let mut transactions = self.transactions.write().unwrap(); - let pj_tx = PayjoinTransaction::new( - tx.clone(), - payjoin_uri.address, - payjoin_uri.amount.unwrap_or_default(), - ); - transactions.push(pj_tx); - self.register_tx(&tx.txid(), &our_output.script_pubkey); + let our_input = + tx.output.iter().find(|output| wallet.is_mine(&output.script_pubkey).unwrap_or(false)); + if let Some(our_input) = our_input { + self.transactions.write().unwrap().push(PayjoinTransaction::PendingFirstConfirmation { + original_psbt: original_psbt.clone(), + tx: tx.clone(), + receiver: payjoin_uri.address.clone(), + amount: payjoin_uri.amount.unwrap_or_default(), + }); + let txid = tx.txid(); + self.register_tx(&txid, &our_input.script_pubkey); + self.event_queue.add_event(Event::PayjoinPaymentPending { + txid, + amount: payjoin_uri.amount.unwrap_or_default().to_sat(), + receipient: payjoin_uri.address.clone().into(), + })?; Ok(tx) } else { - Err(Error::PaymentSendingFailed) + self.event_queue.add_event(Event::PayjoinPaymentFailed { + txid: None, + amount: payjoin_uri.amount.unwrap_or_default().to_sat(), + receipient: payjoin_uri.address.clone().into(), + reason: PayjoinPaymentFailureReason::TransactionFinalisationFailed, + })?; + Err(Error::PayjoinReceiverRequestValidationFailed) // fixeror } } - pub(crate) fn timeout_payjoin_transaction( - &self, payjoin_uri: payjoin::Uri<'_, NetworkChecked>, - ) -> Result<(), Error> { - let amount = match payjoin_uri.amount { - Some(amt) => amt.to_sat(), - None => return Err(Error::PayjoinRequestMissingAmount), - }; - let _ = self.event_queue.add_event(Event::PayjoinPaymentFailed { - txid: None, - receipient: payjoin_uri.address.clone().into(), - amount, - reason: PayjoinPaymentFailureReason::Timeout, - }); - Ok(()) - } - fn internal_transactions_confirmed( &self, header: &Header, txdata: &TransactionData, height: u32, ) { let (_, tx) = txdata[0]; let confirmed_tx_txid = tx.txid(); let mut transactions = self.transactions.write().unwrap(); - if let Some(position) = - transactions.iter().position(|o| o.txid() == Some(confirmed_tx_txid)) - { - let tx = transactions.remove(position); - tx.to_pending_threshold_confirmations(height, header.block_hash()); - } else { - log_error!( - self.logger, - "Notified about UNTRACKED confirmed payjoin transaction {}", - confirmed_tx_txid - ); + let position = match transactions.iter().position(|o| o.txid() == Some(confirmed_tx_txid)) { + Some(position) => position, + None => { + return; + }, + }; + let pj_tx = transactions.remove(position); + match pj_tx { + PayjoinTransaction::PendingFirstConfirmation { + ref tx, + receiver, + amount, + original_psbt, + } => { + transactions.push(PayjoinTransaction::PendingThresholdConfirmations { + original_psbt, + tx: tx.clone(), + receiver, + amount, + first_confirmation_height: height, + first_confirmation_hash: header.block_hash(), + }); + }, + _ => { + unreachable!() + }, }; - } - - fn internal_best_block_updated(&self, height: u32) { - let mut transactions = self.transactions.write().unwrap(); - transactions.retain(|tx| { - if let (Some(first_conf), Some(txid)) = (tx.first_confirmation_height(), tx.txid()) { - if height - first_conf >= ANTI_REORG_DELAY { - let _ = self.event_queue.add_event(Event::PayjoinPaymentSuccess { - txid, - amount: tx.amount().to_sat(), - receipient: tx.receiver().into(), - }); - false - } else { - true - } - } else { - true - } - }); } fn internal_get_relevant_txids(&self) -> Vec<(Txid, u32, Option)> { @@ -209,69 +234,30 @@ impl PayjoinHandler { }) .collect::>() } -} -#[derive(Clone, Debug)] -enum PayjoinTransaction { - // PendingReceiverResponse, - PendingFirstConfirmation { - tx: Transaction, - receiver: Address, - amount: Amount, - }, - PendingThresholdConfirmations { - tx: Transaction, - receiver: Address, - amount: Amount, - first_confirmation_height: u32, - first_confirmation_hash: BlockHash, - }, -} - -impl PayjoinTransaction { - fn new(tx: Transaction, receiver: Address, amount: Amount) -> Self { - PayjoinTransaction::PendingFirstConfirmation { tx, receiver, amount } - } - fn txid(&self) -> Option { - match self { - PayjoinTransaction::PendingFirstConfirmation { tx, .. } => Some(tx.txid()), - PayjoinTransaction::PendingThresholdConfirmations { tx, .. } => Some(tx.txid()), - } - } - fn first_confirmation_height(&self) -> Option { - match self { - PayjoinTransaction::PendingFirstConfirmation { .. } => None, - PayjoinTransaction::PendingThresholdConfirmations { - first_confirmation_height, .. - } => Some(*first_confirmation_height), - } - } - fn amount(&self) -> Amount { - match self { - PayjoinTransaction::PendingFirstConfirmation { amount, .. } => *amount, - PayjoinTransaction::PendingThresholdConfirmations { amount, .. } => *amount, - } - } - fn receiver(&self) -> Address { - match self { - PayjoinTransaction::PendingFirstConfirmation { receiver, .. } => receiver.clone(), - PayjoinTransaction::PendingThresholdConfirmations { receiver, .. } => receiver.clone(), - } - } - - fn to_pending_threshold_confirmations(&self, height: u32, hash: BlockHash) -> Self { - match self { - PayjoinTransaction::PendingFirstConfirmation { tx, receiver, amount } => { - PayjoinTransaction::PendingThresholdConfirmations { - tx: tx.clone(), - receiver: receiver.clone(), - amount: *amount, - first_confirmation_height: height, - first_confirmation_hash: hash, + fn internal_best_block_updated(&self, height: u32) { + let mut transactions = self.transactions.write().unwrap(); + transactions.retain(|tx| { + if let (Some(first_conf), Some(txid)) = (tx.first_confirmation_height(), tx.txid()) { + if height - first_conf >= ANTI_REORG_DELAY { + let payment_id: [u8; 32] = + tx.original_psbt().unsigned_tx.txid()[..].try_into().unwrap(); + let mut update_details = PaymentDetailsUpdate::new(PaymentId(payment_id)); + update_details.status = Some(PaymentStatus::Succeeded); + let _ = self.payment_store.update(&update_details); + let _ = self.event_queue.add_event(Event::PayjoinPaymentSuccess { + txid, + amount: tx.amount().to_sat(), + receipient: tx.receiver().into(), + }); + false + } else { + true } - }, - _ => unreachable!(), - } + } else { + true + } + }); } } @@ -285,11 +271,13 @@ impl Filter for PayjoinHandler { } } -impl Confirm for PayjoinHandler { +impl lightning::chain::Confirm for PayjoinHandler { fn transactions_confirmed(&self, header: &Header, txdata: &TransactionData, height: u32) { self.internal_transactions_confirmed(header, txdata, height); } + fn transaction_unconfirmed(&self, _txid: &Txid) {} + fn best_block_updated(&self, _header: &Header, height: u32) { self.internal_best_block_updated(height); } @@ -297,23 +285,4 @@ impl Confirm for PayjoinHandler { fn get_relevant_txids(&self) -> Vec<(Txid, u32, Option)> { self.internal_get_relevant_txids() } - - fn transaction_unconfirmed(&self, _txid: &Txid) {} -} - -async fn send_payjoin_ohttp_request(request: &payjoin::Request) -> Result, Error> { - let mut headers = reqwest::header::HeaderMap::new(); - headers.insert( - reqwest::header::CONTENT_TYPE, - reqwest::header::HeaderValue::from_static("message/ohttp-req"), - ); - let response = reqwest::Client::new() - .post(request.url.clone()) - .body(request.body.clone()) - .timeout(PAYJOIN_REQUEST_TIMEOUT) - .headers(headers) - .send() - .await?; - let response = response.error_for_status()?.bytes().await?; - Ok(response.to_vec()) } diff --git a/src/payment/payjoin/mod.rs b/src/payment/payjoin/mod.rs index d37020c07..3c71f61d1 100644 --- a/src/payment/payjoin/mod.rs +++ b/src/payment/payjoin/mod.rs @@ -1,8 +1,12 @@ //! Holds a payment handler allowing to send Payjoin payments. +use lightning::chain::chaininterface::BroadcasterInterface; +use lightning::ln::channelmanager::PaymentId; +use lightning::log_error; + use crate::config::{PAYJOIN_REQUEST_TOTAL_DURATION, PAYJOIN_RETRY_INTERVAL}; -use crate::logger::{log_error, log_info, FilesystemLogger, Logger}; -use crate::types::{ChannelManager, Wallet}; +use crate::logger::{FilesystemLogger, Logger}; +use crate::types::{Broadcaster, ChannelManager, PaymentStore, Wallet}; use bitcoin::secp256k1::PublicKey; use lightning::ln::msgs::SocketAddress; use lightning::util::config::{ChannelHandshakeConfig, UserConfig}; @@ -19,6 +23,8 @@ pub(crate) mod handler; use handler::PayjoinHandler; +use super::{PaymentDetails, PaymentDirection, PaymentKind, PaymentStatus}; + /// A payment handler allowing to send Payjoin payments. /// /// Payjoin transactions can be used to improve privacy by breaking the common-input-ownership @@ -63,34 +69,40 @@ use handler::PayjoinHandler; /// [`BIP77`]: https://github.com/bitcoin/bips/blob/3b863a402e0250658985f08a455a6cd103e269e5/bip-0077.mediawiki pub struct PayjoinPayment { runtime: Arc>>, - handler: Option>, + payjoin_handler: Option>, receiver: Option>, config: Arc, logger: Arc, wallet: Arc, + tx_broadcaster: Arc, peer_store: Arc>>, channel_manager: Arc, connection_manager: Arc>>, + payment_store: Arc, } impl PayjoinPayment { pub(crate) fn new( runtime: Arc>>, - handler: Option>, receiver: Option>, + payjoin_handler: Option>, receiver: Option>, config: Arc, logger: Arc, wallet: Arc, - peer_store: Arc>>, channel_manager: Arc, + tx_broadcaster: Arc, peer_store: Arc>>, + channel_manager: Arc, connection_manager: Arc>>, + payment_store: Arc, ) -> Self { Self { runtime, - handler, + payjoin_handler, receiver, config, logger, wallet, + tx_broadcaster, peer_store, channel_manager, connection_manager, + payment_store, } } @@ -102,8 +114,8 @@ impl PayjoinPayment { /// Due to the asynchronous nature of the Payjoin process, this method will return immediately /// after constucting the Payjoin request and sending it in the background. The result of the /// operation will be communicated through the event queue. If the Payjoin request is - /// successful, [`Event::PayjoinPaymentSuccess`] event will be added to the event queue. - /// Otherwise, [`Event::PayjoinPaymentFailed`] is added. + /// successful, [`Event::PayjoinTxSendSuccess`] event will be added to the event queue. + /// Otherwise, [`Event::PayjoinTxSendFailed`] is added. /// /// The total duration of the Payjoin process is defined in `PAYJOIN_REQUEST_TOTAL_DURATION`. /// If the Payjoin receiver does not respond within this duration, the process is considered @@ -114,45 +126,59 @@ impl PayjoinPayment { /// /// [`BIP21`]: https://github.com/bitcoin/bips/blob/master/bip-0021.mediawiki /// [`BIP77`]: https://github.com/bitcoin/bips/blob/d7ffad81e605e958dcf7c2ae1f4c797a8631f146/bip-0077.mediawiki - /// [`Event::PayjoinPaymentSuccess`]: crate::Event::PayjoinPaymentSuccess - /// [`Event::PayjoinPaymentFailed`]: crate::Event::PayjoinPaymentFailed + /// [`Event::PayjoinTxSendSuccess`]: crate::Event::PayjoinTxSendSuccess + /// [`Event::PayjoinTxSendFailed`]: crate::Event::PayjoinTxSendFailed pub fn send(&self, payjoin_uri: String) -> Result<(), Error> { let rt_lock = self.runtime.read().unwrap(); if rt_lock.is_none() { return Err(Error::NotRunning); } + let payjoin_handler = self.payjoin_handler.as_ref().ok_or(Error::PayjoinUnavailable)?; let payjoin_uri = payjoin::Uri::try_from(payjoin_uri).map_err(|_| Error::PayjoinUriInvalid).and_then( |uri| uri.require_network(self.config.network).map_err(|_| Error::InvalidNetwork), )?; - let mut original_psbt = self.wallet.build_payjoin_transaction(payjoin_uri.clone())?; - let payjoin_handler = self.handler.as_ref().ok_or(Error::PayjoinUnavailable)?; + let original_psbt = self.wallet.build_payjoin_transaction(payjoin_uri.clone())?; let payjoin_handler = Arc::clone(payjoin_handler); + let runtime = rt_lock.as_ref().unwrap(); + let tx_broadcaster = Arc::clone(&self.tx_broadcaster); let logger = Arc::clone(&self.logger); - log_info!(logger, "Sending Payjoin request to: {}", payjoin_uri.address); - rt_lock.as_ref().unwrap().spawn(async move { + let payment_store = Arc::clone(&self.payment_store); + let payment_id = original_psbt.unsigned_tx.txid()[..].try_into().unwrap(); + payment_store.insert(PaymentDetails::new( + PaymentId(payment_id), + PaymentKind::Payjoin, + payjoin_uri.amount.map(|a| a.to_sat()), + PaymentDirection::Outbound, + PaymentStatus::Pending, + ))?; + runtime.spawn(async move { let mut interval = tokio::time::interval(PAYJOIN_RETRY_INTERVAL); loop { tokio::select! { _ = tokio::time::sleep(PAYJOIN_REQUEST_TOTAL_DURATION) => { - log_error!(logger, "Payjoin request timed out."); - let _ = payjoin_handler.timeout_payjoin_transaction(payjoin_uri.clone()); + let _ = payjoin_handler.handle_request_timeout(payjoin_uri.clone(), &original_psbt); break; } _ = interval.tick() => { - match payjoin_handler.send_payjoin_transaction(&mut original_psbt, payjoin_uri.clone()).await { - Ok(Some(_)) => { - log_info!(logger, "Payjoin transaction sent successfully."); - break + let payjoin_uri = payjoin_uri.clone(); + match payjoin_handler.send_request(payjoin_uri.clone(), &mut original_psbt.clone()).await { + Ok(Some(mut proposal)) => { + let _ = payjoin_handler.process_response(&mut proposal, &mut original_psbt.clone(), payjoin_uri).inspect(|tx| { + tx_broadcaster.broadcast_transactions(&[&tx]); + }).inspect_err(|e| { + log_error!(logger, "Failed to process Payjoin response: {}", e); + }); + break; }, Ok(None) => { - log_info!(logger, "No Payjoin response yet."); - continue - }, + continue; + } Err(e) => { - log_error!(logger, "Failed to process Payjoin receiver response: {}.", e); + log_error!(logger, "Failed to send Payjoin request : {}", e); + let _ = payjoin_handler.handle_request_failure(payjoin_uri.clone(), &original_psbt); break; - } + }, } } } @@ -172,8 +198,8 @@ impl PayjoinPayment { /// Due to the asynchronous nature of the Payjoin process, this method will return immediately /// after constucting the Payjoin request and sending it in the background. The result of the /// operation will be communicated through the event queue. If the Payjoin request is - /// successful, [`Event::PayjoinPaymentSuccess`] event will be added to the event queue. - /// Otherwise, [`Event::PayjoinPaymentFailed`] is added. + /// successful, [`Event::PayjoinTxSendSuccess`] event will be added to the event queue. + /// Otherwise, [`Event::PayjoinTxSendFailed`] is added. /// /// The total duration of the Payjoin process is defined in `PAYJOIN_REQUEST_TOTAL_DURATION`. /// If the Payjoin receiver does not respond within this duration, the process is considered @@ -184,17 +210,13 @@ impl PayjoinPayment { /// /// [`BIP21`]: https://github.com/bitcoin/bips/blob/master/bip-0021.mediawiki /// [`BIP77`]: https://github.com/bitcoin/bips/blob/d7ffad81e605e958dcf7c2ae1f4c797a8631f146/bip-0077.mediawiki - /// [`Event::PayjoinPaymentSuccess`]: crate::Event::PayjoinPaymentSuccess - /// [`Event::PayjoinPaymentFailed`]: crate::Event::PayjoinPaymentFailed + /// [`Event::PayjoinTxSendSuccess`]: crate::Event::PayjoinTxSendSuccess + /// [`Event::PayjoinTxSendFailed`]: crate::Event::PayjoinTxSendFailed pub fn send_with_amount(&self, payjoin_uri: String, amount_sats: u64) -> Result<(), Error> { - let payjoin_uri = match payjoin::Uri::try_from(payjoin_uri) { - Ok(uri) => uri, - Err(_) => return Err(Error::PayjoinUriInvalid), - }; - let mut payjoin_uri = match payjoin_uri.require_network(self.config.network) { - Ok(uri) => uri, - Err(_) => return Err(Error::InvalidNetwork), - }; + let mut payjoin_uri = + payjoin::Uri::try_from(payjoin_uri).map_err(|_| Error::PayjoinUriInvalid).and_then( + |uri| uri.require_network(self.config.network).map_err(|_| Error::InvalidNetwork), + )?; payjoin_uri.amount = Some(bitcoin::Amount::from_sat(amount_sats)); self.send(payjoin_uri.to_string()) } diff --git a/src/tx_broadcaster.rs b/src/tx_broadcaster.rs index 4492bcfc6..2a3867ebc 100644 --- a/src/tx_broadcaster.rs +++ b/src/tx_broadcaster.rs @@ -114,8 +114,10 @@ where { fn broadcast_transactions(&self, txs: &[&Transaction]) { let package = txs.iter().map(|&t| t.clone()).collect::>(); - self.queue_sender.try_send(package).unwrap_or_else(|e| { + let ret = self.queue_sender.try_send(package).unwrap_or_else(|e| { + dbg!(&e); log_error!(self.logger, "Failed to broadcast transactions: {}", e); }); + dbg!(&ret); } } diff --git a/tests/common/mod.rs b/tests/common/mod.rs index 44f7de420..edccad541 100644 --- a/tests/common/mod.rs +++ b/tests/common/mod.rs @@ -147,6 +147,23 @@ macro_rules! expect_payment_successful_event { pub(crate) use expect_payment_successful_event; +macro_rules! expect_payjoin_tx_pending_event { + ($node: expr) => {{ + match $node.wait_next_event() { + ref e @ Event::PayjoinPaymentPending { txid, .. } => { + println!("{} got event {:?}", $node.node_id(), e); + $node.event_handled(); + txid + }, + ref e => { + panic!("{} got unexpected event!: {:?}", std::stringify!($node), e); + }, + } + }}; +} + +pub(crate) use expect_payjoin_tx_pending_event; + macro_rules! expect_payjoin_tx_sent_successfully_event { ($node: expr) => {{ match $node.wait_next_event() { diff --git a/tests/integration_tests_payjoin.rs b/tests/integration_tests_payjoin.rs index f482c5382..43ec91c8f 100644 --- a/tests/integration_tests_payjoin.rs +++ b/tests/integration_tests_payjoin.rs @@ -7,7 +7,12 @@ use common::{ }; use bitcoin::Amount; -use ldk_node::Event; +use ldk_node::{ + payment::{PaymentDirection, PaymentKind, PaymentStatus}, + Event, +}; + +use crate::common::expect_payjoin_tx_pending_event; #[test] fn send_receive_regular_payjoin_transaction() { @@ -33,10 +38,22 @@ fn send_receive_regular_payjoin_transaction() { dbg!(&payjoin_uri); let sender_payjoin_payment = node_b_pj_sender.payjoin_payment(); assert!(sender_payjoin_payment.send(payjoin_uri).is_ok()); - let txid = expect_payjoin_tx_sent_successfully_event!(node_b_pj_sender); + let txid = expect_payjoin_tx_pending_event!(node_b_pj_sender); + let payments = node_b_pj_sender.list_payments(); + let payment = payments.first().unwrap(); + assert_eq!(payment.amount_msat, Some(80_000)); + assert_eq!(payment.status, PaymentStatus::Pending); + assert_eq!(payment.direction, PaymentDirection::Outbound); + assert_eq!(payment.kind, PaymentKind::Payjoin); wait_for_tx(&electrsd.client, txid); - generate_blocks_and_wait(&bitcoind.client, &electrsd.client, 6); + generate_blocks_and_wait(&bitcoind.client, &electrsd.client, 3); + node_b_pj_sender.sync_wallets().unwrap(); + generate_blocks_and_wait(&bitcoind.client, &electrsd.client, 4); node_b_pj_sender.sync_wallets().unwrap(); + let payments = node_b_pj_sender.list_payments(); + let payment = payments.first().unwrap(); + assert_eq!(payment.status, PaymentStatus::Succeeded); + expect_payjoin_tx_sent_successfully_event!(node_b_pj_sender); let node_b_balance = node_b_pj_sender.list_balances(); assert!(node_b_balance.total_onchain_balance_sats < premine_amount_sat - 80000); } @@ -65,10 +82,12 @@ fn send_payjoin_with_amount() { dbg!(&payjoin_uri); let sender_payjoin_payment = node_b_pj_sender.payjoin_payment(); assert!(sender_payjoin_payment.send_with_amount(payjoin_uri, 80_000).is_ok()); - let txid = expect_payjoin_tx_sent_successfully_event!(node_b_pj_sender); - wait_for_tx(&electrsd.client, txid); - generate_blocks_and_wait(&bitcoind.client, &electrsd.client, 6); + let _txid = expect_payjoin_tx_pending_event!(node_b_pj_sender); + generate_blocks_and_wait(&bitcoind.client, &electrsd.client, 3); + node_b_pj_sender.sync_wallets().unwrap(); + generate_blocks_and_wait(&bitcoind.client, &electrsd.client, 4); node_b_pj_sender.sync_wallets().unwrap(); + let _txid = expect_payjoin_tx_sent_successfully_event!(node_b_pj_sender); let node_b_balance = node_b_pj_sender.list_balances(); assert!(node_b_balance.total_onchain_balance_sats < premine_amount_sat - 80000); } diff --git a/tests/integration_tests_payjoin_with_channel_opening.rs b/tests/integration_tests_payjoin_with_channel_opening.rs index ad2214b6f..5e1ad4fa4 100644 --- a/tests/integration_tests_payjoin_with_channel_opening.rs +++ b/tests/integration_tests_payjoin_with_channel_opening.rs @@ -10,6 +10,8 @@ use common::{ use bitcoin::Amount; use ldk_node::Event; +use crate::common::expect_payjoin_tx_pending_event; + #[test] fn send_receive_payjoin_transaction_with_channel_opening() { let (bitcoind, electrsd) = setup_bitcoind_and_electrsd(); @@ -46,16 +48,18 @@ fn send_receive_payjoin_transaction_with_channel_opening() { assert!(sender_payjoin_payment.send(payjoin_uri).is_ok()); expect_channel_pending_event!(node_a_pj_receiver, node_b_pj_sender.node_id()); expect_channel_pending_event!(node_b_pj_sender, node_a_pj_receiver.node_id()); - let txid = expect_payjoin_tx_sent_successfully_event!(node_b_pj_sender); + let txid = expect_payjoin_tx_pending_event!(node_b_pj_sender); wait_for_tx(&electrsd.client, txid); + generate_blocks_and_wait(&bitcoind.client, &electrsd.client, 1); + node_b_pj_sender.sync_wallets().unwrap(); generate_blocks_and_wait(&bitcoind.client, &electrsd.client, 6); node_a_pj_receiver.sync_wallets().unwrap(); node_b_pj_sender.sync_wallets().unwrap(); let node_b_balance = node_b_pj_sender.list_balances(); assert!(node_b_balance.total_onchain_balance_sats < premine_amount_sat - 80000); - expect_channel_ready_event!(node_a_pj_receiver, node_b_pj_sender.node_id()); expect_channel_ready_event!(node_b_pj_sender, node_a_pj_receiver.node_id()); + let _ = expect_payjoin_tx_sent_successfully_event!(node_b_pj_sender); let channels = node_a_pj_receiver.list_channels(); let channel = channels.get(0).unwrap(); assert_eq!(channel.channel_value_sats, 80_000);