From 5964414251757df3accaaf196d0af340f05b6816 Mon Sep 17 00:00:00 2001 From: jbesraa Date: Wed, 3 Apr 2024 15:50:17 +0300 Subject: [PATCH] working POC --- Cargo.toml | 35 +++--- src/builder.rs | 6 + src/event.rs | 25 ++++- src/lib.rs | 110 +++++++++++++++++- src/pj_new_crate.rs | 175 +++++++++++++++++++---------- src/pjoin.rs | 87 ++++++++++---- src/tx_broadcaster.rs | 109 +++++++++--------- src/wallet.rs | 133 +++++++++++++++++++++- tests/integration_tests_payjoin.rs | 67 +++++++---- 9 files changed, 564 insertions(+), 183 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index eea5ad4eb..ef0029cac 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -28,14 +28,15 @@ panic = 'abort' # Abort on panic default = [] [dependencies] -lightning = { version = "0.0.121", features = ["std"] } -lightning-invoice = { version = "0.29.0" } -lightning-net-tokio = { version = "0.0.121" } -lightning-persister = { version = "0.0.121" } -lightning-background-processor = { version = "0.0.121", features = ["futures"] } -lightning-rapid-gossip-sync = { version = "0.0.121" } -lightning-transaction-sync = { version = "0.0.121", features = ["esplora-async-https", "time"] } -lightning-liquidity = { version = "0.1.0-alpha.1", features = ["std"] } +# lightning = { version = "0.0.121", features = ["std"] } +lightning = { path = "../rust-lightning/lightning", features = ["std"] } +# lightning-invoice = { version = "0.29.0" } +# lightning-net-tokio = { version = "0.0.121" } +# lightning-persister = { version = "0.0.121" } +# lightning-background-processor = { version = "0.0.121", features = ["futures"] } +# lightning-rapid-gossip-sync = { version = "0.0.121" } +# lightning-transaction-sync = { version = "0.0.121", features = ["esplora-async-https", "time"] } +# lightning-liquidity = { version = "0.1.0-alpha.1", features = ["std"] } #lightning = { git = "https://github.com/lightningdevkit/rust-lightning", branch="main", features = ["std"] } #lightning-invoice = { git = "https://github.com/lightningdevkit/rust-lightning", branch="main" } @@ -46,14 +47,13 @@ lightning-liquidity = { version = "0.1.0-alpha.1", features = ["std"] } #lightning-transaction-sync = { git = "https://github.com/lightningdevkit/rust-lightning", branch="main", features = ["esplora-async"] } #lightning-liquidity = { git = "https://github.com/lightningdevkit/lightning-liquidity", branch="main", features = ["std"] } -#lightning = { path = "../rust-lightning/lightning", features = ["std"] } -#lightning-invoice = { path = "../rust-lightning/lightning-invoice" } -#lightning-net-tokio = { path = "../rust-lightning/lightning-net-tokio" } -#lightning-persister = { path = "../rust-lightning/lightning-persister" } -#lightning-background-processor = { path = "../rust-lightning/lightning-background-processor", features = ["futures"] } -#lightning-rapid-gossip-sync = { path = "../rust-lightning/lightning-rapid-gossip-sync" } -#lightning-transaction-sync = { path = "../rust-lightning/lightning-transaction-sync", features = ["esplora-async"] } -#lightning-liquidity = { path = "../lightning-liquidity", features = ["std"] } +lightning-invoice = { path = "../rust-lightning/lightning-invoice" } +lightning-net-tokio = { path = "../rust-lightning/lightning-net-tokio" } +lightning-persister = { path = "../rust-lightning/lightning-persister" } +lightning-background-processor = { path = "../rust-lightning/lightning-background-processor", features = ["futures"] } +lightning-rapid-gossip-sync = { path = "../rust-lightning/lightning-rapid-gossip-sync" } +lightning-transaction-sync = { path = "../rust-lightning/lightning-transaction-sync", features = ["esplora-async"] } +lightning-liquidity = { path = "../lightning-liquidity", features = ["std"] } bdk = { version = "0.29.0", default-features = false, features = ["std", "async-interface", "use-esplora-async", "sqlite-bundled", "keys-bip39"]} @@ -83,7 +83,8 @@ prost = { version = "0.11.6", default-features = false} winapi = { version = "0.3", features = ["winbase"] } [dev-dependencies] -lightning = { version = "0.0.121", features = ["std", "_test_utils"] } +lightning = { path = "../rust-lightning/lightning", features = ["std", "_test_utils"] } +# lightning = { version = "0.0.121", features = ["std", "_test_utils"] } #lightning = { git = "https://github.com/lightningdevkit/rust-lightning", branch="main", features = ["std", "_test_utils"] } electrum-client = { version = "0.15.1", default-features = true } bitcoincore-rpc = { version = "0.17.0", default-features = false } diff --git a/src/builder.rs b/src/builder.rs index fff2e2f62..ee93389ff 100644 --- a/src/builder.rs +++ b/src/builder.rs @@ -12,6 +12,7 @@ use crate::logger::{log_error, FilesystemLogger, Logger}; use crate::message_handler::NodeCustomMessageHandler; use crate::payment_store::PaymentStore; use crate::peer_store::PeerStore; +use crate::pj_new_crate::ChannelScheduler; use crate::pjoin::LDKPayjoinExecuter; use crate::sweep::OutputSweeper; use crate::tx_broadcaster::TransactionBroadcaster; @@ -558,6 +559,7 @@ fn build_with_store_internal( log_error!(logger, "Failed to set up wallet: {}", e); BuildError::WalletSetupFailed })?; + let channel_scheduler = Arc::new(tokio::sync::Mutex::new(ChannelScheduler::new())); let (blockchain, tx_sync, tx_broadcaster, fee_estimator) = match chain_data_source_config { Some(ChainDataSourceConfig::Esplora(server_url)) => { @@ -568,6 +570,7 @@ fn build_with_store_internal( let tx_broadcaster = Arc::new(TransactionBroadcaster::new( tx_sync.client().clone(), Arc::clone(&logger), + Arc::clone(&channel_scheduler) )); let fee_estimator = Arc::new(OnchainFeeEstimator::new( tx_sync.client().clone(), @@ -586,6 +589,7 @@ fn build_with_store_internal( let tx_broadcaster = Arc::new(TransactionBroadcaster::new( tx_sync.client().clone(), Arc::clone(&logger), + Arc::clone(&channel_scheduler) )); let fee_estimator = Arc::new(OnchainFeeEstimator::new( tx_sync.client().clone(), @@ -952,6 +956,7 @@ fn build_with_store_internal( Arc::clone(&logger), Arc::clone(&peer_manager), Arc::clone(&channel_manager), + Arc::clone(&channel_scheduler) ); let payjoin = Arc::new(LDKPayjoin::new(payjoin_executer)); @@ -975,6 +980,7 @@ fn build_with_store_internal( chain_monitor, output_sweeper, payjoin, + channel_scheduler, peer_manager, keys_manager, network_graph, diff --git a/src/event.rs b/src/event.rs index 61cd7973f..fd4fad334 100644 --- a/src/event.rs +++ b/src/event.rs @@ -1,3 +1,4 @@ +use crate::pj_new_crate::{ChannelScheduler, FundingTxParams}; use crate::types::{Sweeper, Wallet}; use crate::{ hex_utils, ChannelManager, Config, Error, NetworkGraph, PeerInfo, PeerStore, UserChannelId, @@ -292,7 +293,7 @@ impl Future for EventFuture { } } -pub(crate) struct EventHandler +pub(crate) struct EventHandler where L::Target: Logger, { @@ -302,6 +303,7 @@ where output_sweeper: Arc>, network_graph: Arc, payment_store: Arc>, + channel_scheduler: Arc>, peer_store: Arc>, runtime: Arc>>, logger: L, @@ -317,7 +319,7 @@ where channel_manager: Arc>, output_sweeper: Arc>, network_graph: Arc, payment_store: Arc>, peer_store: Arc>, runtime: Arc>>, - logger: L, config: Arc, + logger: L, config: Arc, channel_scheduler: Arc>, ) -> Self { Self { event_queue, @@ -326,6 +328,7 @@ where output_sweeper, network_graph, payment_store, + channel_scheduler, peer_store, logger, runtime, @@ -340,8 +343,10 @@ where counterparty_node_id, channel_value_satoshis, output_script, + user_channel_id, .. } => { + dbg!("Entered FundingGenerationReady event handler"); // Construct the raw transaction with the output that is paid the amount of the // channel. let confirmation_target = ConfirmationTarget::NonAnchorChannelFee; @@ -350,6 +355,22 @@ where let cur_height = self.channel_manager.current_best_block().height(); let locktime = LockTime::from_height(cur_height).unwrap_or(LockTime::ZERO); + // payjoin scenario + let mut channel_scheduler = self.channel_scheduler.lock().await; + if channel_scheduler.channel(user_channel_id).is_some() { + dbg!("Entered payjoin channel scheduler scenario"); + let funding_tx_params = FundingTxParams::new( + output_script.clone().into_bytes(), + confirmation_target, + locktime, + temporary_channel_id + ); + channel_scheduler.add_funding_tx_params(user_channel_id, funding_tx_params); + dbg!("payjoin channel scheduler scenario completed"); + return {}; + } + dbg!("Didnt enter payjoin channel scheduler scenario"); + // Sign the final funding transaction and broadcast it. match self.wallet.create_funding_transaction( output_script, diff --git a/src/lib.rs b/src/lib.rs index e68fc2b29..491e9ad85 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -109,6 +109,7 @@ pub use error::Error as NodeError; use error::Error; pub use event::Event; +use pj_new_crate::ChannelScheduler; pub use types::{BestBlock, ChannelConfig}; use payjoin::Uri; mod pjoin; @@ -195,6 +196,7 @@ pub struct Node { chain_monitor: Arc>, output_sweeper: Arc>, payjoin: Arc>, + channel_scheduler: Arc>, peer_manager: Arc>, keys_manager: Arc, network_graph: Arc, @@ -667,6 +669,7 @@ impl Node { Arc::clone(&self.runtime), Arc::clone(&self.logger), Arc::clone(&self.config), + Arc::clone(&self.channel_scheduler), )); // Setup background processing @@ -738,18 +741,33 @@ impl Node { /// Request a new channel to be opened with a remote peer. pub async fn schedule_payjoin_channel( - &self, channel_amount_sats: u64, push_msat: Option, announce_channel: bool, + &self, + channel_amount_sats: u64, + push_msat: Option, + announce_channel: bool, node_id: PublicKey, + address: SocketAddress, ) -> Result { + let user_channel_id: u128 = rand::thread_rng().gen::(); let channel = - ScheduledChannel::new(channel_amount_sats, push_msat, announce_channel, node_id); - self.payjoin.schedule(channel).await; + ScheduledChannel::new(channel_amount_sats, push_msat, announce_channel, node_id, user_channel_id, None, None); + self.channel_scheduler.lock().await.schedule(channel); + let announce_channel = true; + self.connect_open_channel_payjoin( + node_id, + address, + channel_amount_sats, + None, + None, + announce_channel, + user_channel_id + )?; // this should be stopped after `ACCEPT_CHANNEL` let bip21 = self.payjoin_bip21(channel_amount_sats); bip21 } /// Generate a BIP21 URI for a payjoin request. - pub fn payjoin_bip21(&self, amount_sats: u64) -> Result { + fn payjoin_bip21(&self, amount_sats: u64) -> Result { let address = self.wallet.get_new_address()?; let amount = Amount::from_sat(amount_sats); let pj = format!("https://0.0.0.0:{}/payjoin", self.config.payjoin_server_port); @@ -760,7 +778,7 @@ impl Node { /// List all scheduled payjoin channels. pub async fn list_scheduled_channels(&self) -> Result, Error> { - Ok(self.payjoin.list_scheduled_channels().await) + Ok(self.channel_scheduler.lock().await.channels.clone()) } /// Disconnects all peers, stops all running background tasks, and shuts down [`Node`]. @@ -973,6 +991,76 @@ impl Node { Ok(()) } + /// included `user_channel_id` in inputs + pub fn connect_open_channel_payjoin( + &self, node_id: PublicKey, address: SocketAddress, channel_amount_sats: u64, + push_to_counterparty_msat: Option, channel_config: Option>, + announce_channel: bool, user_channel_id: u128, + ) -> Result { + let rt_lock = self.runtime.read().unwrap(); + if rt_lock.is_none() { + return Err(Error::NotRunning); + } + let runtime = rt_lock.as_ref().unwrap(); + + // let cur_balance = self.wallet.get_balance()?; + // if cur_balance.get_spendable() < channel_amount_sats { + // log_error!(self.logger, "Unable to create channel due to insufficient funds."); + // return Err(Error::InsufficientFunds); + // } + + let peer_info = PeerInfo { node_id, address }; + + let con_node_id = peer_info.node_id; + let con_addr = peer_info.address.clone(); + let con_logger = Arc::clone(&self.logger); + let con_pm = Arc::clone(&self.peer_manager); + + // We need to use our main runtime here as a local runtime might not be around to poll + // connection futures going forward. + tokio::task::block_in_place(move || { + runtime.block_on(async move { + connect_peer_if_necessary(con_node_id, con_addr, con_pm, con_logger).await + }) + })?; + + let channel_config = (*(channel_config.unwrap_or_default())).clone().into(); + let user_config = UserConfig { + channel_handshake_limits: Default::default(), + channel_handshake_config: ChannelHandshakeConfig { + announced_channel: announce_channel, + ..Default::default() + }, + channel_config, + ..Default::default() + }; + + let push_msat = push_to_counterparty_msat.unwrap_or(0); + + match self.channel_manager.create_channel( + peer_info.node_id, + channel_amount_sats, + push_msat, + user_channel_id, + None, + Some(user_config), + ) { + Ok(_) => { + log_info!( + self.logger, + "Initiated channel creation with peer {}. ", + peer_info.node_id + ); + self.peer_store.add_peer(peer_info)?; + Ok(UserChannelId(user_channel_id)) + }, + Err(e) => { + log_error!(self.logger, "Failed to initiate channel creation: {:?}", e); + Err(Error::ChannelCreationFailed) + }, + } + } + /// Connect to a node and open a new channel. Disconnects and re-connects are handled automatically /// /// Disconnects and reconnects are handled automatically. @@ -1964,3 +2052,15 @@ async fn do_connect_peer( }, } } + + +// 1. user schedule channel +// 1.1 qrcode created to scan +// 2. user scan qrcode +// 2.1 node receives payjoin request +// 2.2 http endpoint loops for x amount of time looking for PayjoinProposal +// 3. node scans if any scheduled channels waiting +// 3.1 node creates the requested channel +// 4. node wait for payjoin channel open requests in FundingGenerationReady state +// 4.1 node creates funding tx with payjoin incoming transaction +// 4.2 save in channel scheduler diff --git a/src/pj_new_crate.rs b/src/pj_new_crate.rs index dfe3210f2..577cc3d72 100644 --- a/src/pj_new_crate.rs +++ b/src/pj_new_crate.rs @@ -9,17 +9,20 @@ /// This module provides `PayjoinScheduler` and a `PayjoinExecuter` trait that can be used to /// implement the payjoin protocol in a Lightning Network node. use bitcoin::secp256k1::PublicKey; -use http_body_util::{BodyExt, Full}; +use bitcoin::{Transaction, Txid}; +use http_body_util::Full; use hyper::body::Incoming; use hyper::server::conn::http1; use hyper::service::service_fn; use hyper::Request; use hyper_util::rt::TokioIo; -use rand::Rng; +use lightning::chain::chaininterface::ConfirmationTarget; +use lightning::ln::ChannelId; use std::sync::Arc; use tokio::net::TcpStream; use tokio::sync::Mutex; use tokio::task::JoinError; +use bitcoin::blockdata::locktime::absolute::LockTime; /// `PayjoinExecuter` is a trait that defines an interface for executing payjoin requests in /// Lightning Network environment where it tries to create a channel with a predefined channel @@ -32,36 +35,69 @@ pub trait PayjoinExecuter { /// return a PSBT that is the result of the negotiation with a counterparty node after they /// responded with FundingSigned message. fn request_to_psbt( - &self, channel: ScheduledChannel, request: String, + &self, request: Request, ) -> impl std::future::Future>> + std::marker::Send; } +#[derive(Clone, Debug)] +pub struct FundingTxParams { + pub output_script: Vec, + pub confirmation_target: ConfirmationTarget, + pub locktime: LockTime, + pub temporary_channel_id: ChannelId, +} + +impl FundingTxParams { + pub fn new(output_script: Vec, confirmation_target: ConfirmationTarget, locktime: LockTime, temporary_channel_id: ChannelId) -> Self { + Self { + output_script, + confirmation_target, + locktime, + temporary_channel_id, + } + } +} + /// A scheduled channel is a channel that is scheduled to be created with a counterparty node. The /// channel is opened when a payjoin request is received and the channel is funded with the /// incoming payment. #[derive(Clone, Debug)] pub struct ScheduledChannel { - channel_amount_sats: u64, + pub channel_value_satoshi: u64, push_msat: Option, user_channel_id: u128, announce_channel: bool, - node_id: PublicKey, + pub node_id: PublicKey, + funding_tx_params: Option, + pub funding_tx: Option, + pub is_funding_signed: bool, } impl ScheduledChannel { /// Create a new `ScheduledChannel` with the given channel details. pub fn new( - channel_amount_sats: u64, push_msat: Option, announce_channel: bool, - node_id: PublicKey, + channel_value_satoshi: u64, push_msat: Option, announce_channel: bool, + node_id: PublicKey, user_channel_id: u128, funding_tx_params: Option, funding_tx: Option ) -> Self { - let user_channel_id: u128 = rand::thread_rng().gen::(); - Self { channel_amount_sats, push_msat, user_channel_id, announce_channel, node_id } + Self { channel_value_satoshi, push_msat, user_channel_id, announce_channel, node_id, funding_tx_params, funding_tx, is_funding_signed: false } + } + /// docs + pub fn set_funding_tx(&mut self, funding_tx: Transaction) { + self.funding_tx = Some(funding_tx); + } + /// docs + pub fn set_is_funding_signed(&mut self, is_funding_signed: bool) { + self.is_funding_signed = is_funding_signed; + } + /// docs + pub fn funding_tx_params(&self) -> Option { + self.funding_tx_params.clone() } /// Get the channel amount in satoshis. /// /// The channel amount is the amount that is used to fund the channel when it is created. - pub fn channel_amount_sats(&self) -> u64 { - self.channel_amount_sats + pub fn channel_value_satoshi(&self) -> u64 { + self.channel_value_satoshi } /// Get the push amount in millisatoshis. /// @@ -96,7 +132,6 @@ impl ScheduledChannel { /// `PayjoinExecuter` trait. #[derive(Clone)] pub struct PayjoinScheduler( - Vec, P, ); @@ -105,36 +140,15 @@ where P: PayjoinExecuter + Send + Sync + 'static + Clone, { /// Create a new `PayjoinScheduler` with the given channels and executer. - pub fn new(channels: Vec, executer: P) -> Self { - Self(channels, executer) - } - - /// Schedule a new channel to be created with the counterparty node. - /// - /// The channel is added to the list of scheduled channels and is used to create a channel when - /// a payjoin request is received. - pub fn schedule(&mut self, channel: ScheduledChannel) { - self.0.push(channel); - } - - /// List the scheduled channels. - pub fn list_scheduled_channels(&self) -> Vec { - self.0.clone() - } - - /// Pop the scheduled channel from the list of scheduled channels. - /// - /// The channel is removed from the list of scheduled channels and is used to create a channel - /// when a payjoin request is received. - pub fn pop_scheduled_channel(&mut self) -> Option { - self.0.pop() + pub fn new(executer: P) -> Self { + Self(executer) } /// Execute on the incoming payjoin request. pub async fn request_to_psbt( - &self, channel: ScheduledChannel, request: String, + &self, request: Request, ) -> Result> { - self.1.request_to_psbt(channel, request).await + self.0.request_to_psbt(request).await } /// Serve an incoming payjoin request. @@ -146,9 +160,8 @@ where /// And is accessible from the payjoin_handler function. pub async fn serve(&self, stream: TcpStream) -> Result<(), JoinError> { let io = TokioIo::new(stream); - let channels = self.0.clone(); - let executer = self.1.clone(); - let payjoin_scheduler = Arc::new(Mutex::new(PayjoinScheduler::new(channels, executer))); + let executer = self.0.clone(); + let payjoin_scheduler = Arc::new(Mutex::new(PayjoinScheduler::new(executer))); tokio::task::spawn(async move { if let Err(err) = http1::Builder::new() .serve_connection( @@ -175,57 +188,97 @@ async fn payjoin_handler( }; match (http_request.method(), http_request.uri().path()) { (&hyper::Method::POST, "/payjoin") => { - // integrat payjoin crate here - let _headers = http_request.headers().clone(); - let body = http_request.into_body().collect().await?; - let body = String::from_utf8(body.to_bytes().to_vec()).unwrap(); - let mut scheduler = pj_scheduler.lock().await; - let channel = scheduler.pop_scheduled_channel().unwrap(); - let res = scheduler.request_to_psbt(channel, body).await.unwrap(); + let scheduler = pj_scheduler.lock().await; + let res = scheduler.request_to_psbt(http_request).await.unwrap(); return make_http_response(res); }, _ => make_http_response("404".into()), } } +#[derive(Clone)] +pub struct ChannelScheduler{ + pub channels: Vec +} + +impl ChannelScheduler { + pub(crate) fn new() -> Self { + Self{ channels: vec![] } + } + pub(crate) fn schedule(&mut self, channel: ScheduledChannel) { + self.channels.push(channel); + } + pub(crate) fn channel(&self, user_channel_id: u128) -> Option { + self.channels.iter().find(|channel| channel.user_channel_id == user_channel_id).cloned() + } + pub(crate) fn pop(&self) -> Option { + self.channels.last().cloned() + } + pub(crate) fn add_funding_tx_params(&mut self, user_channel_id: u128, funding_tx_params: FundingTxParams) { + if let Some(channel) = self.channels.iter_mut().find(|channel| channel.user_channel_id == user_channel_id) { + channel.funding_tx_params = Some(funding_tx_params); + } + } + + pub(crate) fn mark_as_funding_signed(&mut self, txid: Txid) -> bool { + if let Some(channel) = self.channels.iter_mut().find(|channel| channel.funding_tx.as_ref().unwrap().txid() == txid) { + dbg!("Marking channel as funding signed {:?}", &channel); + channel.is_funding_signed = true; + true + } else { + false + } + } +} + +impl std::ops::Deref for ChannelScheduler { + type Target = Vec; + fn deref(&self) -> &Self::Target { + &self.channels + } +} + +impl std::ops::DerefMut for ChannelScheduler { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.channels + } +} + + #[cfg(test)] mod tests { use super::*; use bitcoin::secp256k1::{self, Secp256k1}; + use rand::Rng; #[derive(Clone)] struct PayjoinExecuterImpl; impl PayjoinExecuter for PayjoinExecuterImpl { async fn request_to_psbt( - &self, _channel: ScheduledChannel, _request: String, + &self, _request: Request, ) -> Result> { Ok(String::new()) } } #[tokio::test] - async fn test_payjoin_scheduler() { + #[ignore] + async fn test_channel_scheduler() { let create_pubkey = || -> PublicKey { let secp = Secp256k1::new(); PublicKey::from_secret_key(&secp, &secp256k1::SecretKey::from_slice(&[1; 32]).unwrap()) }; - let executer = PayjoinExecuterImpl; - let executer = executer; - let channels = Vec::new(); - let mut scheduler = PayjoinScheduler::new(channels, executer); - let channel_amount_sats = 100; + let channel_value_satoshi = 100; let push_msat = None; let announce_channel = false; let node_id = create_pubkey(); + let user_channel_id: u128 = rand::thread_rng().gen::(); let channel = - ScheduledChannel::new(channel_amount_sats, push_msat, announce_channel, node_id); - scheduler.schedule(channel.clone()); - let channels = scheduler.list_scheduled_channels(); + ScheduledChannel::new(channel_value_satoshi, push_msat, announce_channel, node_id, user_channel_id, None, None); + let mut channel_scheduler = ChannelScheduler::new(); + channel_scheduler.schedule(channel.clone()); + let channels = channel_scheduler.channels; assert_eq!(channels.len(), 1); - let ch = scheduler.pop_scheduled_channel().unwrap(); - assert_eq!(channel.user_channel_id, ch.user_channel_id); - let channels = scheduler.list_scheduled_channels(); - assert_eq!(channels.len(), 0); } } diff --git a/src/pjoin.rs b/src/pjoin.rs index 8bf773004..48e9ac2d6 100644 --- a/src/pjoin.rs +++ b/src/pjoin.rs @@ -2,21 +2,24 @@ /// /// Payjoin is used in the context of channel opening, allowing a node to fund a channel using /// funds from an incoming payjoin request. -use bitcoin::secp256k1::{self, PublicKey, Secp256k1}; +use hyper::{header::HeaderValue, HeaderMap, Request}; use lightning::util::persist::KVStore; use tokio::sync::Mutex; +use hyper::body::Incoming; +use http_body_util::BodyExt; use crate::{ logger::FilesystemLogger, - pj_new_crate::{PayjoinExecuter, PayjoinScheduler, ScheduledChannel}, + pj_new_crate::{ChannelScheduler, PayjoinExecuter, PayjoinScheduler}, types::{ChannelManager, PeerManager, Wallet}, }; -use std::sync::Arc; +use std::{collections::HashMap, sync::Arc}; pub struct LDKPayjoinExecuter { channel_manager: Arc>, logger: Arc, peer_manager: Arc>, + channel_scheduler: Arc>, wallet: Arc, } @@ -30,6 +33,7 @@ where logger: self.logger.clone(), peer_manager: self.peer_manager.clone(), wallet: self.wallet.clone(), + channel_scheduler: self.channel_scheduler.clone(), } } } @@ -37,18 +41,68 @@ where impl LDKPayjoinExecuter { pub fn new( wallet: Arc, logger: Arc, peer_manager: Arc>, - channel_manager: Arc>, + channel_manager: Arc>, channel_scheduler: Arc>, ) -> Self { - Self { wallet, logger, peer_manager, channel_manager } + Self { wallet, logger, peer_manager, channel_manager, channel_scheduler } + } +} + +struct RequestHeaders(HashMap); + +impl payjoin::receive::Headers for RequestHeaders { + fn get_header(&self, key: &str) -> Option<&str> { self.0.get(key).map(|e| e.as_str()) } +} + +impl From> for RequestHeaders { + fn from(req: HeaderMap) -> Self { + let mut h = HashMap::new(); + for (k, v) in req.iter() { + h.insert(k.to_string(), v.to_str().unwrap().to_string()); + } + RequestHeaders(h) } } impl PayjoinExecuter for LDKPayjoinExecuter { async fn request_to_psbt( - &self, _channel: ScheduledChannel, request: String, + &self, request: Request, ) -> Result> { - // unimplemented!(); - Ok(request) + let headers = request.headers().clone(); + let url = request.uri().path().to_string(); + let body = request.into_body().collect().await?; + let body = String::from_utf8(body.to_bytes().to_vec()).unwrap(); + let unchecked_proposal = payjoin::receive::UncheckedProposal::from_request( + body.as_bytes(), + &url, + RequestHeaders::from(headers) + ).unwrap(); + let mut channel = self.channel_scheduler.lock().await.pop().unwrap(); + let funding_tx_params = channel.clone().funding_tx_params().unwrap(); + let temporary_channel_id = funding_tx_params.temporary_channel_id.clone(); + let counterparty_node_id = channel.clone().node_id; + let payjoin_proposal = match self.wallet.check_incoming_payjoin_request(unchecked_proposal) { + Ok(p) => p, + Err(e) => { + dbg!(&e); + return Err(Box::new(e)); + } + }; + let psbt = payjoin_proposal.psbt(); + let funding_tx = match self.wallet.create_payjoin_funding_tx(funding_tx_params, psbt.clone()) { + Ok(tx) => tx, + Err(e) => { + dbg!(&e); + return Err(Box::new(e)); + } + }; + channel.set_funding_tx(funding_tx.clone()); + let _ = self.channel_manager.funding_transaction_generated( + &temporary_channel_id, + &counterparty_node_id, + funding_tx + ).unwrap(); + + Ok(psbt.to_string()) } } @@ -58,25 +112,10 @@ pub struct LDKPayjoin { impl LDKPayjoin { pub fn new(executer: LDKPayjoinExecuter) -> Self { - // just for testing - let test_pubkey = || -> PublicKey { - let secp = Secp256k1::new(); - PublicKey::from_secret_key(&secp, &secp256k1::SecretKey::from_slice(&[1; 32]).unwrap()) - }; - let test_channels = vec![ScheduledChannel::new(10_000, Some(1_000), true, test_pubkey())]; - // let channels = Vec::new(); - let payjoin_scheduler = PayjoinScheduler::new(test_channels, executer); + let payjoin_scheduler = PayjoinScheduler::new(executer); Self { scheduler: Arc::new(Mutex::new(payjoin_scheduler)) } } - pub async fn schedule(&self, channel: ScheduledChannel) { - self.scheduler.lock().await.schedule(channel); - } - - pub async fn list_scheduled_channels(&self) -> Vec { - self.scheduler.lock().await.list_scheduled_channels() - } - pub async fn serve(&self, stream: tokio::net::TcpStream) -> Result<(), tokio::task::JoinError> { self.scheduler.lock().await.serve(stream).await } diff --git a/src/tx_broadcaster.rs b/src/tx_broadcaster.rs index 40483f578..4c65e03d0 100644 --- a/src/tx_broadcaster.rs +++ b/src/tx_broadcaster.rs @@ -1,4 +1,5 @@ use crate::logger::{log_bytes, log_debug, log_error, log_trace, Logger}; +use crate::pj_new_crate::ChannelScheduler; use lightning::chain::chaininterface::BroadcasterInterface; use lightning::util::ser::Writeable; @@ -11,6 +12,7 @@ use tokio::sync::mpsc; use tokio::sync::Mutex; use std::ops::Deref; +use std::sync::Arc; use std::time::Duration; const BCAST_PACKAGE_QUEUE_SIZE: usize = 50; @@ -23,72 +25,77 @@ where queue_receiver: Mutex>>, esplora_client: EsploraClient, logger: L, + channel_scheduler: Arc>, } impl TransactionBroadcaster where L::Target: Logger, { - pub(crate) fn new(esplora_client: EsploraClient, logger: L) -> Self { + pub(crate) fn new(esplora_client: EsploraClient, logger: L, channel_scheduler: Arc>) -> Self { let (queue_sender, queue_receiver) = mpsc::channel(BCAST_PACKAGE_QUEUE_SIZE); - Self { queue_sender, queue_receiver: Mutex::new(queue_receiver), esplora_client, logger } + Self { queue_sender, queue_receiver: Mutex::new(queue_receiver), esplora_client, logger, channel_scheduler } } pub(crate) async fn process_queue(&self) { let mut receiver = self.queue_receiver.lock().await; while let Some(next_package) = receiver.recv().await { for tx in &next_package { - match self.esplora_client.broadcast(tx).await { - Ok(()) => { - log_trace!(self.logger, "Successfully broadcast transaction {}", tx.txid()); - }, - Err(e) => match e { - esplora_client::Error::Reqwest(_) => { - // Wait 500 ms and retry in case we get a `Reqwest` error (typically - // 429) - tokio::time::sleep(Duration::from_millis(500)).await; - log_error!( - self.logger, - "Sync failed due to HTTP connection error, retrying: {}", - e - ); - match self.esplora_client.broadcast(tx).await { - Ok(()) => { - log_debug!( - self.logger, - "Successfully broadcast transaction {}", - tx.txid() - ); - }, - Err(e) => { - log_error!( - self.logger, - "Failed to broadcast transaction {}: {}", - tx.txid(), - e - ); - log_trace!( - self.logger, - "Failed broadcast transaction bytes: {}", - log_bytes!(tx.encode()) - ); - }, - } + if self.channel_scheduler.lock().await.mark_as_funding_signed(tx.txid()) { + continue; + } else { + match self.esplora_client.broadcast(tx).await { + Ok(()) => { + log_trace!(self.logger, "Successfully broadcast transaction {}", tx.txid()); }, - _ => { - log_error!( - self.logger, - "Failed to broadcast transaction {}: {}", - tx.txid(), - e - ); - log_trace!( - self.logger, - "Failed broadcast transaction bytes: {}", - log_bytes!(tx.encode()) - ); + Err(e) => match e { + esplora_client::Error::Reqwest(_) => { + // Wait 500 ms and retry in case we get a `Reqwest` error (typically + // 429) + tokio::time::sleep(Duration::from_millis(500)).await; + log_error!( + self.logger, + "Sync failed due to HTTP connection error, retrying: {}", + e + ); + match self.esplora_client.broadcast(tx).await { + Ok(()) => { + log_debug!( + self.logger, + "Successfully broadcast transaction {}", + tx.txid() + ); + }, + Err(e) => { + log_error!( + self.logger, + "Failed to broadcast transaction {}: {}", + tx.txid(), + e + ); + log_trace!( + self.logger, + "Failed broadcast transaction bytes: {}", + log_bytes!(tx.encode()) + ); + }, + } + }, + _ => { + log_error!( + self.logger, + "Failed to broadcast transaction {}: {}", + tx.txid(), + e + ); + log_trace!( + self.logger, + "Failed broadcast transaction bytes: {}", + log_bytes!(tx.encode()) + ); + }, }, - }, + } } } } diff --git a/src/wallet.rs b/src/wallet.rs index aa38eb986..41fb9ad32 100644 --- a/src/wallet.rs +++ b/src/wallet.rs @@ -1,7 +1,9 @@ use crate::logger::{log_error, log_info, log_trace, Logger}; +use crate::pj_new_crate::{FundingTxParams, ScheduledChannel}; use crate::Error; +use bitcoin::psbt::Psbt; use lightning::chain::chaininterface::{BroadcasterInterface, ConfirmationTarget, FeeEstimator}; use lightning::ln::msgs::{DecodeError, UnsignedGossipMessage}; @@ -24,8 +26,10 @@ use bitcoin::blockdata::locktime::absolute::LockTime; use bitcoin::secp256k1::ecdh::SharedSecret; use bitcoin::secp256k1::ecdsa::{RecoverableSignature, Signature}; use bitcoin::secp256k1::{PublicKey, Scalar, Secp256k1, SecretKey, Signing}; -use bitcoin::{ScriptBuf, Transaction, TxOut, Txid}; +use bitcoin::{Amount, OutPoint, ScriptBuf, Transaction, TxOut, Txid}; +use payjoin::receive::{PayjoinProposal, UncheckedProposal}; +use std::collections::HashMap; use std::ops::Deref; use std::sync::{Arc, Condvar, Mutex}; use std::time::Duration; @@ -114,6 +118,133 @@ where res } + pub(crate) fn create_payjoin_funding_tx(&self, funding_tx_params: FundingTxParams, mut psbt: Psbt) -> Result { + let _fee_rate = FeeRate::from_sat_per_kwu( + self.fee_estimator.get_est_sat_per_1000_weight(ConfirmationTarget::NonAnchorChannelFee) as f32, + ); + let locked_wallet = self.inner.lock().unwrap(); + psbt.unsigned_tx.lock_time = funding_tx_params.locktime.into(); + let out_put = bitcoin::TxOut { + value: 100000, + script_pubkey: ScriptBuf::from_bytes(funding_tx_params.output_script.clone()), + }; + psbt.unsigned_tx.output.push(out_put); + psbt.unsigned_tx.output.retain(|output| { + !locked_wallet.is_mine(&output.script_pubkey).unwrap() + || ( + locked_wallet.is_mine(&output.script_pubkey).unwrap() && + output.script_pubkey == ScriptBuf::from_bytes(funding_tx_params.output_script.clone()) + ) + }); + dbg!(&psbt); + let mut default_sign_options = SignOptions::default(); + default_sign_options.try_finalize = false; + default_sign_options.trust_witness_utxo = true; + match locked_wallet.sign(&mut psbt, default_sign_options) { + Ok(finalized) => { + if !finalized { + dbg!("Not finalized"); + // return Err(Error::OnchainTxCreationFailed); + } else { + dbg!("(shouldnt happen) finalized"); + } + }, + Err(err) => { + dbg!(&err); + return Err(err.into()); + }, + } + + Ok(psbt.extract_tx()) + } + + pub(crate) fn check_incoming_payjoin_request(&self, proposal: UncheckedProposal) -> Result { + dbg!("entered check_incoming_payjoin_request"); + let _to_broadcast_in_failure_case = proposal.extract_tx_to_schedule_broadcast(); + dbg!("entered check_incoming_payjoin_request 1"); + let receiver = self.inner.lock().unwrap(); + dbg!("entered check_incoming_payjoin_request 2"); + + // Receive Check 1: Can Broadcast + let proposal = proposal + .check_broadcast_suitability(None, |_tx| { + Ok(true) + }) + .expect("Payjoin proposal should be broadcastable"); + dbg!("entered check_incoming_payjoin_request 3"); + + // Receive Check 2: receiver can't sign for proposal inputs + let proposal = proposal + .check_inputs_not_owned(|input| { + Ok(receiver.is_mine(&input).unwrap()) + }) + .expect("Receiver should not own any of the inputs"); + dbg!("entered check_incoming_payjoin_request 4"); + + // Receive Check 3: receiver can't sign for proposal inputs + let proposal = proposal.check_no_mixed_input_scripts().unwrap(); + dbg!("entered check_incoming_payjoin_request 5"); + + // Receive Check 4: have we seen this input before? More of a check for non-interactive i.e. payment processor receivers. + let payjoin_proposal = proposal + .check_no_inputs_seen_before(|_| Ok(false)) + .unwrap() + .identify_receiver_outputs(|output_script| { + dbg!(&output_script); + let is_mine = receiver.is_mine(&output_script).unwrap(); + dbg!(&is_mine); + Ok(is_mine) + }) + .expect("Receiver should have at least one output"); + dbg!("entered check_incoming_payjoin_request 6"); + + // Select receiver payjoin inputs. TODO Lock them. + // let available_inputs = receiver.list_unspent().unwrap(); + // let candidate_inputs: HashMap = available_inputs + // .iter() + // .map(|i| (Amount::from_sat(i.txout.value), OutPoint { txid: i.outpoint.txid, vout: i.outpoint.vout })) + // .collect(); + + // let selected_outpoint = payjoin.try_preserving_privacy(candidate_inputs).expect("gg"); + // let selected_utxo = available_inputs + // .iter() + // .find(|i| i.outpoint.txid == selected_outpoint.txid && i.outpoint.vout == selected_outpoint.vout) + // .unwrap(); + + // calculate receiver payjoin outputs given receiver payjoin inputs and original_psbt, + // let txo_to_contribute = bitcoin::TxOut { + // value: selected_utxo.txout.value, + // script_pubkey: selected_utxo.txout.script_pubkey.clone() + // }; + // let outpoint_to_contribute = + // bitcoin::OutPoint { txid: selected_utxo.outpoint.txid, vout: selected_utxo.outpoint.vout }; + // payjoin.contribute_witness_input(txo_to_contribute, outpoint_to_contribute); + + // let receiver_substitute_address = self.get_new_address().unwrap(); + // payjoin.substitute_output_address(receiver_substitute_address); + let payjoin_proposal = match payjoin_proposal.finalize_proposal( + |psbt: &Psbt| { + dbg!("entered check_incoming_payjoin_request 6.0"); + if receiver.sign(&mut psbt.clone(), SignOptions::default()).unwrap() { + dbg!("entered check_incoming_payjoin_request 6.1"); + Ok(psbt.clone()) + } else { + dbg!("entered check_incoming_payjoin_request 6.2"); + panic!("Not able to sign our payjoin proposal psbt") + } + }, + Some(bitcoin::FeeRate::MIN), + ) { + Ok(p) => p, + Err(e) => { + dbg!(&e); + panic!("Not able to finalize our payjoin proposal") + }, + }; + dbg!("entered check_incoming_payjoin_request 7"); + Ok(payjoin_proposal) + } + pub(crate) fn create_funding_transaction( &self, output_script: ScriptBuf, value_sats: u64, confirmation_target: ConfirmationTarget, locktime: LockTime, diff --git a/tests/integration_tests_payjoin.rs b/tests/integration_tests_payjoin.rs index b861ce698..509984e87 100644 --- a/tests/integration_tests_payjoin.rs +++ b/tests/integration_tests_payjoin.rs @@ -1,11 +1,13 @@ mod common; -use crate::common::{premine_and_distribute_funds, random_config, setup_node}; +use crate::common::{generate_blocks_and_wait, premine_and_distribute_funds, random_config, setup_node, setup_two_nodes}; use bitcoin::Amount; use bitcoincore_rpc::{Client as BitcoindClient, RpcApi}; use common::setup_bitcoind_and_electrsd; +use lightning::ln::msgs::SocketAddress; mod mock_payjoin_sender { - use bitcoincore_rpc::Client as BitcoindClient; + use bitcoin::base64; +use bitcoincore_rpc::Client as BitcoindClient; use bitcoincore_rpc::RpcApi; use payjoin::bitcoin::address::NetworkChecked; use std::collections::HashMap; @@ -41,7 +43,7 @@ mod mock_payjoin_sender { sender_wallet.wallet_process_psbt(&sender_psbt.psbt, None, None, None).unwrap().psbt; let psbt = Psbt::from_str(&psbt).unwrap(); // Step 4. Construct the request with the PSBT and parameters - let (req, _ctx) = RequestBuilder::from_psbt_and_uri(psbt.clone(), pj_uri) + let (req, ctx) = RequestBuilder::from_psbt_and_uri(psbt.clone(), pj_uri) .unwrap() .build_with_additional_fee( bitcoincore_rpc::bitcoin::Amount::from_sat(1), @@ -56,7 +58,6 @@ mod mock_payjoin_sender { // let payjoin_url = pj_uri.extras.e // BITCOIN:BCRT1Q0S724W239Z2XQGSZV6TE96HMYLEDCTX3GDFZEP?amount=0.01&pj=https://localhost:3000 let url_http = req.url.as_str().replace("https", "http"); - dbg!(&url_http); let res = reqwest::blocking::Client::new(); let res = res .post(&url_http) @@ -65,23 +66,24 @@ mod mock_payjoin_sender { .send() .unwrap(); let res = res.text().unwrap(); - (res, String::from_utf8(req.body).unwrap()) // Step 6. Process the response // // An `Ok` response should include a Payjoin Proposal PSBT. // Check that it's signed, following protocol, not trying to steal or otherwise error. - //let psbt = ctx.process_response(&mut res.as_bytes()).unwrap(); + let psbt = ctx.process_response(&mut res.as_bytes()).unwrap(); //// Step 7. Sign and finalize the Payjoin Proposal PSBT //// //// Most software can handle adding the last signatures to a PSBT without issue. - //let psbt = sender_wallet - // .wallet_process_psbt(&base64::encode(psbt.serialize()), None, None, None) - // .unwrap() - // .psbt; - //let tx = sender_wallet.finalize_psbt(&psbt, Some(true)).unwrap().hex.unwrap(); + let psbt = sender_wallet + .wallet_process_psbt(&base64::encode(psbt.serialize()), None, None, None) + .unwrap() + .psbt; + let tx = sender_wallet.finalize_psbt(&psbt, Some(true)).unwrap().hex.unwrap(); //// Step 8. Broadcast the Payjoin Transaction - //let txid = sender_wallet.send_raw_transaction(&tx).unwrap(); + let txid = sender_wallet.send_raw_transaction(&tx).unwrap(); + dbg!(&txid); //txid + (res, txid.to_string()) } } @@ -89,11 +91,10 @@ mod mock_payjoin_sender { fn payjoin() { let (bitcoind, electrsd) = setup_bitcoind_and_electrsd(); let payjoin_sender_wallet: BitcoindClient = bitcoind.create_wallet("payjoin_sender").unwrap(); - let config_a = random_config(); - let node_a = setup_node(&electrsd, config_a); + let (node_a, node_b) = setup_two_nodes(&electrsd, true); let addr_a = node_a.new_onchain_address().unwrap(); let addr_sender = payjoin_sender_wallet.get_new_address(None, None).unwrap().assume_checked(); - let premine_amount_sat = 100_000; + let premine_amount_sat = 100_000_00; premine_and_distribute_funds( &bitcoind.client, &electrsd.client, @@ -107,13 +108,35 @@ fn payjoin() { premine_amount_sat ); assert_eq!(node_a.next_event(), None); - let funding_amount_sat = 80_000; - let pj_uri = node_a.payjoin_bip21(funding_amount_sat).unwrap(); - println!("Payjoin URI: {:?}", pj_uri); - dbg!(&pj_uri); - let (receiver_response, sender_original_psbt) = mock_payjoin_sender::try_payjoin( + assert_eq!(node_a.list_channels().len(), 0); + + let funding_amount_sat = 100_000; + let pj_uri = tokio::runtime::Runtime::new().unwrap().handle().block_on(async { + let pj_uri = node_a.schedule_payjoin_channel( + funding_amount_sat, + None, + false, + node_b.node_id(), + node_b.listening_addresses().unwrap().get(0).unwrap().clone()).await; + pj_uri + }); + // sleep for 1 seconds + std::thread::sleep(std::time::Duration::from_secs(1)); + + let (_, _txid) = mock_payjoin_sender::try_payjoin( &payjoin_sender_wallet, - payjoin::Uri::try_from(pj_uri).unwrap().assume_checked(), + payjoin::Uri::try_from(pj_uri.unwrap()).unwrap().assume_checked(), ); - assert_eq!(receiver_response, sender_original_psbt); + // generate_blocks_and_wait(&bitcoind.client, &electrsd.client, 8); + node_a.sync_wallets().unwrap(); + node_b.sync_wallets().unwrap(); + let a = node_a.list_channels(); + let channel = a.get(0).unwrap(); + dbg!(&channel); + // Bug: Txid saved on our side is not the same as the one on the sender side. Do we need a + // different way to track the transaction? + // assert_eq!(txid, channel.funding_txo.unwrap().txid.to_string()); + assert_eq!(channel.channel_value_sats, funding_amount_sat); + assert!(channel.is_channel_ready); + assert!(channel.is_usable); }