From 98e7eceb3398263afa9c63ef1d4dd3ac7d4b2469 Mon Sep 17 00:00:00 2001 From: Rob N Date: Fri, 24 Jan 2025 16:50:56 -1000 Subject: [PATCH] feat: add `MultiUpdateSubscriber` to receive multiple updates from a single node --- Cargo.toml | 3 ++ examples/multi.rs | 95 ++++++++++++++++++++++++++++++++ src/builder.rs | 135 +++++++++++++++++++++++++++++++++++++++++++++- src/lib.rs | 18 +++++++ src/multi.rs | 94 ++++++++++++++++++++++++++++++++ 5 files changed, 343 insertions(+), 2 deletions(-) create mode 100644 examples/multi.rs create mode 100644 src/multi.rs diff --git a/Cargo.toml b/Cargo.toml index 8bdea83..9c1c7bc 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -26,3 +26,6 @@ tracing-subscriber = { version = "0.3" } [[example]] name = "example" + +[[example]] +name = "multi" diff --git a/examples/multi.rs b/examples/multi.rs new file mode 100644 index 0000000..2a8a156 --- /dev/null +++ b/examples/multi.rs @@ -0,0 +1,95 @@ +use std::net::IpAddr; +use std::net::Ipv4Addr; + +use bdk_kyoto::builder::LightClientBuilder; +use bdk_kyoto::MultiLightClient; +use bdk_wallet::Wallet; + +use kyoto::Network; + +use bdk_kyoto::multi::MultiSyncRequest; +use bdk_kyoto::ScanType; + +#[derive(Debug, Clone, Copy, Hash, PartialEq, Eq)] +struct WalletId(u8); + +const WALLET_ID_ONE: WalletId = WalletId(1); +const WALLET_ID_TWO: WalletId = WalletId(2); + +const PRIV_RECV: &str = "wpkh(tprv8ZgxMBicQKsPdy6LMhUtFHAgpocR8GC6QmwMSFpZs7h6Eziw3SpThFfczTDh5rW2krkqffa11UpX3XkeTTB2FvzZKWXqPY54Y6Rq4AQ5R8L/84'/1'/0'/0/*)"; +const PRIV_CHANGE: &str = "wpkh(tprv8ZgxMBicQKsPdy6LMhUtFHAgpocR8GC6QmwMSFpZs7h6Eziw3SpThFfczTDh5rW2krkqffa11UpX3XkeTTB2FvzZKWXqPY54Y6Rq4AQ5R8L/84'/1'/0'/1/*)"; +const PUB_RECV: &str = "tr([7d94197e/86'/1'/0']tpubDCyQVJj8KzjiQsFjmb3KwECVXPvMwvAxxZGCP9XmWSopmjW3bCV3wD7TgxrUhiGSueDS1MU5X1Vb1YjYcp8jitXc5fXfdC1z68hDDEyKRNr/0/*)"; +const PUB_CHANGE: &str = "tr([7d94197e/86'/1'/0']tpubDCyQVJj8KzjiQsFjmb3KwECVXPvMwvAxxZGCP9XmWSopmjW3bCV3wD7TgxrUhiGSueDS1MU5X1Vb1YjYcp8jitXc5fXfdC1z68hDDEyKRNr/1/*)"; + +const NETWORK: Network = Network::Signet; + +const PEER: IpAddr = IpAddr::V4(Ipv4Addr::new(23, 137, 57, 100)); +const NUM_PEERS: u8 = 1; + +const RECOVERY_HEIGHT: u32 = 170_000; + +#[tokio::main] +async fn main() -> anyhow::Result<()> { + let subscriber = tracing_subscriber::FmtSubscriber::new(); + tracing::subscriber::set_global_default(subscriber)?; + + let mut wallet_one = Wallet::create(PUB_RECV, PUB_CHANGE) + .network(NETWORK) + .lookahead(30) + .create_wallet_no_persist()?; + + let mut wallet_two = Wallet::create(PRIV_RECV, PRIV_CHANGE) + .network(NETWORK) + .lookahead(30) + .create_wallet_no_persist()?; + + let request_one = MultiSyncRequest { + index: WALLET_ID_ONE, + scan_type: ScanType::New, + wallet: &wallet_one, + }; + let request_two = MultiSyncRequest { + index: WALLET_ID_TWO, + scan_type: ScanType::Recovery { + from_height: RECOVERY_HEIGHT, + }, + wallet: &wallet_two, + }; + let requests = vec![request_one, request_two]; + + let MultiLightClient { + requester: _, + mut log_subscriber, + mut warning_subscriber, + mut update_subscriber, + node, + } = LightClientBuilder::new() + .connections(NUM_PEERS) + .peers(vec![PEER.into()]) + .build_multi(requests)?; + + tokio::task::spawn(async move { node.run().await }); + + loop { + tokio::select! { + updates = update_subscriber.sync() => { + for (index, update) in updates { + tracing::info!("Got update for wallet {}", index.0); + if index == WALLET_ID_ONE { + wallet_one.apply_update(update)?; + tracing::info!("Wallet one balance {}", wallet_one.balance().total()) + } else if index == WALLET_ID_TWO { + wallet_two.apply_update(update)?; + tracing::info!("Wallet two balance {}", wallet_two.balance().total()) + } + } + }, + log = log_subscriber.next_log() => { + tracing::info!("{log}") + } + warn = warning_subscriber.next_warning() => { + tracing::warn!("{warn}") + } + } + } +} diff --git a/src/builder.rs b/src/builder.rs index 38ae976..974a46f 100644 --- a/src/builder.rs +++ b/src/builder.rs @@ -46,7 +46,12 @@ //! } //! ``` -use std::{collections::BTreeMap, path::PathBuf, time::Duration}; +use std::{ + collections::{BTreeMap, HashMap}, + hash::Hash, + path::PathBuf, + time::Duration, +}; use bdk_wallet::{chain::IndexedTxGraph, Wallet}; use kyoto::NodeBuilder; @@ -55,7 +60,11 @@ pub use kyoto::{ TrustedPeer, }; -use crate::{LightClient, LogSubscriber, ScanType, UpdateSubscriber, WalletExt, WarningSubscriber}; +use crate::{ + multi::{MultiSyncRequest, MultiUpdateSubscriber}, + LightClient, LogSubscriber, MultiLightClient, ScanType, UpdateSubscriber, WalletExt, + WarningSubscriber, +}; const RECOMMENDED_PEERS: u8 = 2; @@ -177,6 +186,94 @@ impl LightClientBuilder { node, }) } + + /// Built a client and node for multiple wallets. + pub fn build_multi<'a, H: Hash + Eq + Clone + Copy>( + self, + wallet_requests: impl IntoIterator>, + ) -> Result, MultiWalletBuilderError> { + let wallet_requests: Vec> = wallet_requests.into_iter().collect(); + let network_ref = wallet_requests + .first() + .ok_or(MultiWalletBuilderError::EmptyWalletList) + .map(|request| request.wallet.network())?; + for network in wallet_requests.iter().map(|req| req.wallet.network()) { + if network_ref.ne(&network) { + return Err(MultiWalletBuilderError::NetworkMismatch); + } + } + + let mut node_builder = NodeBuilder::new(network_ref); + if let Some(whitelist) = self.peers { + node_builder = node_builder.add_peers(whitelist); + } + if let Some(dir) = self.data_dir { + node_builder = node_builder.add_data_dir(dir); + } + if let Some(duration) = self.timeout { + node_builder = node_builder.set_response_timeout(duration) + } + node_builder = + node_builder.num_required_peers(self.connections.unwrap_or(RECOMMENDED_PEERS)); + + let mut checkpoints = Vec::new(); + for wallet_request in wallet_requests.iter() { + match wallet_request.scan_type { + ScanType::New => (), + ScanType::Sync => { + let cp = wallet_request.wallet.latest_checkpoint(); + checkpoints.push(HeaderCheckpoint::new(cp.height(), cp.hash())) + } + ScanType::Recovery { from_height } => { + checkpoints.push(HeaderCheckpoint::closest_checkpoint_below_height( + from_height, + network_ref, + )); + } + } + } + if let Some(min) = checkpoints.into_iter().min_by_key(|h| h.height) { + node_builder = node_builder.anchor_checkpoint(min); + } + + let mut wallet_map = HashMap::new(); + for wallet_request in wallet_requests { + let chain = wallet_request.wallet.local_chain().clone(); + let keychain_index = wallet_request.wallet.spk_index().clone(); + let indexed_graph = IndexedTxGraph::new(keychain_index); + wallet_map.insert(wallet_request.index, (chain, indexed_graph)); + node_builder = node_builder.add_scripts( + wallet_request + .wallet + .peek_revealed_plus_lookahead() + .collect(), + ); + } + + let (node, kyoto_client) = node_builder.build_node()?; + + let kyoto::Client { + requester, + log_rx, + warn_rx, + event_rx, + } = kyoto_client; + + let multi_update = MultiUpdateSubscriber { + receiver: event_rx, + wallet_map, + chain_changeset: BTreeMap::new(), + }; + + let client = MultiLightClient { + requester, + log_subscriber: LogSubscriber::new(log_rx), + warning_subscriber: WarningSubscriber::new(warn_rx), + update_subscriber: multi_update, + node, + }; + Ok(client) + } } impl Default for LightClientBuilder { @@ -184,3 +281,37 @@ impl Default for LightClientBuilder { Self::new() } } + +/// Errors that may occur when attempting to build a node for a list of wallets. +#[derive(Debug)] +pub enum MultiWalletBuilderError { + /// Two or more wallets do not have the same network configured. + NetworkMismatch, + /// The database encountered an error when attempting to open a connection. + SqlError(SqlInitializationError), + /// The list of wallets provided was empty. + EmptyWalletList, +} + +impl std::fmt::Display for MultiWalletBuilderError { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> Result<(), std::fmt::Error> { + match self { + MultiWalletBuilderError::SqlError(error) => write!(f, "{error}"), + MultiWalletBuilderError::NetworkMismatch => write!( + f, + "two or more wallets do not have the same network configured." + ), + MultiWalletBuilderError::EmptyWalletList => { + write!(f, "no wallets were present in the iterator.") + } + } + } +} + +impl std::error::Error for MultiWalletBuilderError {} + +impl From for MultiWalletBuilderError { + fn from(value: SqlInitializationError) -> Self { + MultiWalletBuilderError::SqlError(value) + } +} diff --git a/src/lib.rs b/src/lib.rs index de7f881..302b627 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -42,6 +42,7 @@ use core::{future::Future, pin::Pin}; use std::collections::BTreeMap; use std::collections::HashSet; +use std::hash::Hash; type FutureResult<'a, T, E> = Pin> + Send + 'a>>; @@ -63,7 +64,10 @@ use kyoto::Receiver; use kyoto::UnboundedReceiver; use kyoto::{BlockHash, Event, IndexedBlock}; +use crate::multi::MultiUpdateSubscriber; + pub mod builder; +pub mod multi; #[derive(Debug)] /// A node and associated structs to send and receive events to and from the node. @@ -80,6 +84,20 @@ pub struct LightClient { pub node: NodeDefault, } +/// A node and associated structs to send and receive events to and from the node. +pub struct MultiLightClient { + /// Send events to a running node (i.e. broadcast a transaction). + pub requester: Requester, + /// Receive logs from the node as it runs. + pub log_subscriber: LogSubscriber, + /// Receive warnings from the node as it runs. + pub warning_subscriber: WarningSubscriber, + /// Receive wallet updates from a node. + pub update_subscriber: MultiUpdateSubscriber, + /// The underlying node that must be run to fetch blocks from peers. + pub node: NodeDefault, +} + /// Interpret events from a node that is running to apply /// updates to an underlying wallet. #[derive(Debug)] diff --git a/src/multi.rs b/src/multi.rs new file mode 100644 index 0000000..ea14654 --- /dev/null +++ b/src/multi.rs @@ -0,0 +1,94 @@ +//! Types for requesting updates for multiple wallets. +use std::{ + collections::{BTreeMap, HashMap}, + hash::Hash, +}; + +use bdk_wallet::{ + chain::{ + keychain_txout::KeychainTxOutIndex, local_chain, local_chain::LocalChain, + ConfirmationBlockTime, IndexedTxGraph, TxUpdate, + }, + KeychainKind, Update, Wallet, +}; +use kyoto::{BlockHash, Event, SyncUpdate, UnboundedReceiver}; + +use crate::ScanType; + +/// A request when building a node for multiple wallets. +pub struct MultiSyncRequest<'a, H: Hash + Eq + Clone + Copy> { + /// The unique identifier for a wallet. Typically a simple index like an integer. + pub index: H, + /// The scanning policy for this wallet. + pub scan_type: ScanType, + /// The wallet to fetch an update for. + pub wallet: &'a Wallet, +} + +/// Construct updates for multiple wallets. +pub struct MultiUpdateSubscriber { + pub(crate) receiver: UnboundedReceiver, + pub(crate) wallet_map: HashMap< + H, + ( + LocalChain, + IndexedTxGraph>, + ), + >, + pub(crate) chain_changeset: BTreeMap>, +} + +impl MultiUpdateSubscriber +where + H: Hash + Eq + Clone + Copy, +{ + /// Return updates for all registered wallets for every time the light client + /// syncs to connected peers. + pub async fn sync(&mut self) -> impl Iterator { + while let Some(event) = self.receiver.recv().await { + match event { + Event::Block(indexed_block) => { + let hash = indexed_block.block.block_hash(); + self.chain_changeset + .insert(indexed_block.height, Some(hash)); + for (_, graph) in self.wallet_map.values_mut() { + let _ = + graph.apply_block_relevant(&indexed_block.block, indexed_block.height); + } + } + Event::BlocksDisconnected(disconnected) => { + for header in disconnected { + let height = header.height; + self.chain_changeset.insert(height, None); + } + } + Event::Synced(SyncUpdate { + tip: _, + recent_history, + }) => { + recent_history.into_iter().for_each(|(height, header)| { + self.chain_changeset + .insert(height, Some(header.block_hash())); + }); + break; + } + } + } + let mut responses = Vec::new(); + for (index, (local_chain, graph)) in &mut self.wallet_map { + let tx_update = TxUpdate::from(graph.graph().clone()); + let last_active_indices = graph.index.last_used_indices(); + local_chain + .apply_changeset(&local_chain::ChangeSet::from(self.chain_changeset.clone())) + .expect("chain was initialized with genesis"); + let update = Update { + tx_update, + last_active_indices, + chain: Some(local_chain.tip()), + }; + responses.push((*index, update)); + } + self.chain_changeset = BTreeMap::new(); + responses.into_iter() + } +}