Skip to content

Commit

Permalink
feat: add MultiUpdateSubscriber to receive multiple updates from a …
Browse files Browse the repository at this point in the history
…single node
  • Loading branch information
rustaceanrob committed Feb 9, 2025
1 parent 2d5f3c5 commit 98e7ece
Show file tree
Hide file tree
Showing 5 changed files with 343 additions and 2 deletions.
3 changes: 3 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,3 +26,6 @@ tracing-subscriber = { version = "0.3" }

[[example]]
name = "example"

[[example]]
name = "multi"
95 changes: 95 additions & 0 deletions examples/multi.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
use std::net::IpAddr;
use std::net::Ipv4Addr;

use bdk_kyoto::builder::LightClientBuilder;
use bdk_kyoto::MultiLightClient;
use bdk_wallet::Wallet;

use kyoto::Network;

use bdk_kyoto::multi::MultiSyncRequest;
use bdk_kyoto::ScanType;

#[derive(Debug, Clone, Copy, Hash, PartialEq, Eq)]
struct WalletId(u8);

const WALLET_ID_ONE: WalletId = WalletId(1);
const WALLET_ID_TWO: WalletId = WalletId(2);

const PRIV_RECV: &str = "wpkh(tprv8ZgxMBicQKsPdy6LMhUtFHAgpocR8GC6QmwMSFpZs7h6Eziw3SpThFfczTDh5rW2krkqffa11UpX3XkeTTB2FvzZKWXqPY54Y6Rq4AQ5R8L/84'/1'/0'/0/*)";
const PRIV_CHANGE: &str = "wpkh(tprv8ZgxMBicQKsPdy6LMhUtFHAgpocR8GC6QmwMSFpZs7h6Eziw3SpThFfczTDh5rW2krkqffa11UpX3XkeTTB2FvzZKWXqPY54Y6Rq4AQ5R8L/84'/1'/0'/1/*)";
const PUB_RECV: &str = "tr([7d94197e/86'/1'/0']tpubDCyQVJj8KzjiQsFjmb3KwECVXPvMwvAxxZGCP9XmWSopmjW3bCV3wD7TgxrUhiGSueDS1MU5X1Vb1YjYcp8jitXc5fXfdC1z68hDDEyKRNr/0/*)";
const PUB_CHANGE: &str = "tr([7d94197e/86'/1'/0']tpubDCyQVJj8KzjiQsFjmb3KwECVXPvMwvAxxZGCP9XmWSopmjW3bCV3wD7TgxrUhiGSueDS1MU5X1Vb1YjYcp8jitXc5fXfdC1z68hDDEyKRNr/1/*)";

const NETWORK: Network = Network::Signet;

const PEER: IpAddr = IpAddr::V4(Ipv4Addr::new(23, 137, 57, 100));
const NUM_PEERS: u8 = 1;

const RECOVERY_HEIGHT: u32 = 170_000;

#[tokio::main]
async fn main() -> anyhow::Result<()> {
let subscriber = tracing_subscriber::FmtSubscriber::new();
tracing::subscriber::set_global_default(subscriber)?;

let mut wallet_one = Wallet::create(PUB_RECV, PUB_CHANGE)
.network(NETWORK)
.lookahead(30)
.create_wallet_no_persist()?;

let mut wallet_two = Wallet::create(PRIV_RECV, PRIV_CHANGE)
.network(NETWORK)
.lookahead(30)
.create_wallet_no_persist()?;

let request_one = MultiSyncRequest {
index: WALLET_ID_ONE,
scan_type: ScanType::New,
wallet: &wallet_one,
};
let request_two = MultiSyncRequest {
index: WALLET_ID_TWO,
scan_type: ScanType::Recovery {
from_height: RECOVERY_HEIGHT,
},
wallet: &wallet_two,
};
let requests = vec![request_one, request_two];

let MultiLightClient {
requester: _,
mut log_subscriber,
mut warning_subscriber,
mut update_subscriber,
node,
} = LightClientBuilder::new()
.connections(NUM_PEERS)
.peers(vec![PEER.into()])
.build_multi(requests)?;

tokio::task::spawn(async move { node.run().await });

loop {
tokio::select! {
updates = update_subscriber.sync() => {
for (index, update) in updates {
tracing::info!("Got update for wallet {}", index.0);
if index == WALLET_ID_ONE {
wallet_one.apply_update(update)?;
tracing::info!("Wallet one balance {}", wallet_one.balance().total())
} else if index == WALLET_ID_TWO {
wallet_two.apply_update(update)?;
tracing::info!("Wallet two balance {}", wallet_two.balance().total())
}
}
},
log = log_subscriber.next_log() => {
tracing::info!("{log}")
}
warn = warning_subscriber.next_warning() => {
tracing::warn!("{warn}")
}
}
}
}
135 changes: 133 additions & 2 deletions src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,12 @@
//! }
//! ```
use std::{collections::BTreeMap, path::PathBuf, time::Duration};
use std::{
collections::{BTreeMap, HashMap},
hash::Hash,
path::PathBuf,
time::Duration,
};

use bdk_wallet::{chain::IndexedTxGraph, Wallet};
use kyoto::NodeBuilder;
Expand All @@ -55,7 +60,11 @@ pub use kyoto::{
TrustedPeer,
};

use crate::{LightClient, LogSubscriber, ScanType, UpdateSubscriber, WalletExt, WarningSubscriber};
use crate::{
multi::{MultiSyncRequest, MultiUpdateSubscriber},
LightClient, LogSubscriber, MultiLightClient, ScanType, UpdateSubscriber, WalletExt,
WarningSubscriber,
};

const RECOMMENDED_PEERS: u8 = 2;

Expand Down Expand Up @@ -177,10 +186,132 @@ impl LightClientBuilder {
node,
})
}

/// Built a client and node for multiple wallets.
pub fn build_multi<'a, H: Hash + Eq + Clone + Copy>(
self,
wallet_requests: impl IntoIterator<Item = MultiSyncRequest<'a, H>>,
) -> Result<MultiLightClient<H>, MultiWalletBuilderError> {
let wallet_requests: Vec<MultiSyncRequest<'a, H>> = wallet_requests.into_iter().collect();
let network_ref = wallet_requests
.first()
.ok_or(MultiWalletBuilderError::EmptyWalletList)
.map(|request| request.wallet.network())?;
for network in wallet_requests.iter().map(|req| req.wallet.network()) {
if network_ref.ne(&network) {
return Err(MultiWalletBuilderError::NetworkMismatch);
}
}

let mut node_builder = NodeBuilder::new(network_ref);
if let Some(whitelist) = self.peers {
node_builder = node_builder.add_peers(whitelist);
}
if let Some(dir) = self.data_dir {
node_builder = node_builder.add_data_dir(dir);
}
if let Some(duration) = self.timeout {
node_builder = node_builder.set_response_timeout(duration)
}
node_builder =
node_builder.num_required_peers(self.connections.unwrap_or(RECOMMENDED_PEERS));

let mut checkpoints = Vec::new();
for wallet_request in wallet_requests.iter() {
match wallet_request.scan_type {
ScanType::New => (),
ScanType::Sync => {
let cp = wallet_request.wallet.latest_checkpoint();
checkpoints.push(HeaderCheckpoint::new(cp.height(), cp.hash()))
}
ScanType::Recovery { from_height } => {
checkpoints.push(HeaderCheckpoint::closest_checkpoint_below_height(
from_height,
network_ref,
));
}
}
}
if let Some(min) = checkpoints.into_iter().min_by_key(|h| h.height) {
node_builder = node_builder.anchor_checkpoint(min);
}

let mut wallet_map = HashMap::new();
for wallet_request in wallet_requests {
let chain = wallet_request.wallet.local_chain().clone();
let keychain_index = wallet_request.wallet.spk_index().clone();
let indexed_graph = IndexedTxGraph::new(keychain_index);
wallet_map.insert(wallet_request.index, (chain, indexed_graph));
node_builder = node_builder.add_scripts(
wallet_request
.wallet
.peek_revealed_plus_lookahead()
.collect(),
);
}

let (node, kyoto_client) = node_builder.build_node()?;

let kyoto::Client {
requester,
log_rx,
warn_rx,
event_rx,
} = kyoto_client;

let multi_update = MultiUpdateSubscriber {
receiver: event_rx,
wallet_map,
chain_changeset: BTreeMap::new(),
};

let client = MultiLightClient {
requester,
log_subscriber: LogSubscriber::new(log_rx),
warning_subscriber: WarningSubscriber::new(warn_rx),
update_subscriber: multi_update,
node,
};
Ok(client)
}
}

impl Default for LightClientBuilder {
fn default() -> Self {
Self::new()
}
}

/// Errors that may occur when attempting to build a node for a list of wallets.
#[derive(Debug)]
pub enum MultiWalletBuilderError {
/// Two or more wallets do not have the same network configured.
NetworkMismatch,
/// The database encountered an error when attempting to open a connection.
SqlError(SqlInitializationError),
/// The list of wallets provided was empty.
EmptyWalletList,
}

impl std::fmt::Display for MultiWalletBuilderError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> Result<(), std::fmt::Error> {
match self {
MultiWalletBuilderError::SqlError(error) => write!(f, "{error}"),
MultiWalletBuilderError::NetworkMismatch => write!(
f,
"two or more wallets do not have the same network configured."
),
MultiWalletBuilderError::EmptyWalletList => {
write!(f, "no wallets were present in the iterator.")
}
}
}
}

impl std::error::Error for MultiWalletBuilderError {}

impl From<SqlInitializationError> for MultiWalletBuilderError {
fn from(value: SqlInitializationError) -> Self {
MultiWalletBuilderError::SqlError(value)
}
}
18 changes: 18 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
use core::{future::Future, pin::Pin};
use std::collections::BTreeMap;
use std::collections::HashSet;
use std::hash::Hash;

type FutureResult<'a, T, E> = Pin<Box<dyn Future<Output = Result<T, E>> + Send + 'a>>;

Expand All @@ -63,7 +64,10 @@ use kyoto::Receiver;
use kyoto::UnboundedReceiver;
use kyoto::{BlockHash, Event, IndexedBlock};

use crate::multi::MultiUpdateSubscriber;

pub mod builder;
pub mod multi;

#[derive(Debug)]
/// A node and associated structs to send and receive events to and from the node.
Expand All @@ -80,6 +84,20 @@ pub struct LightClient {
pub node: NodeDefault,
}

/// A node and associated structs to send and receive events to and from the node.
pub struct MultiLightClient<H: Hash + Eq + Clone + Copy> {
/// Send events to a running node (i.e. broadcast a transaction).
pub requester: Requester,
/// Receive logs from the node as it runs.
pub log_subscriber: LogSubscriber,
/// Receive warnings from the node as it runs.
pub warning_subscriber: WarningSubscriber,
/// Receive wallet updates from a node.
pub update_subscriber: MultiUpdateSubscriber<H>,
/// The underlying node that must be run to fetch blocks from peers.
pub node: NodeDefault,
}

/// Interpret events from a node that is running to apply
/// updates to an underlying wallet.
#[derive(Debug)]
Expand Down
94 changes: 94 additions & 0 deletions src/multi.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
//! Types for requesting updates for multiple wallets.
use std::{
collections::{BTreeMap, HashMap},
hash::Hash,
};

use bdk_wallet::{
chain::{
keychain_txout::KeychainTxOutIndex, local_chain, local_chain::LocalChain,
ConfirmationBlockTime, IndexedTxGraph, TxUpdate,
},
KeychainKind, Update, Wallet,
};
use kyoto::{BlockHash, Event, SyncUpdate, UnboundedReceiver};

use crate::ScanType;

/// A request when building a node for multiple wallets.
pub struct MultiSyncRequest<'a, H: Hash + Eq + Clone + Copy> {
/// The unique identifier for a wallet. Typically a simple index like an integer.
pub index: H,
/// The scanning policy for this wallet.
pub scan_type: ScanType,
/// The wallet to fetch an update for.
pub wallet: &'a Wallet,
}

/// Construct updates for multiple wallets.
pub struct MultiUpdateSubscriber<H: Hash + Eq + Clone + Copy> {
pub(crate) receiver: UnboundedReceiver<Event>,
pub(crate) wallet_map: HashMap<
H,
(
LocalChain,
IndexedTxGraph<ConfirmationBlockTime, KeychainTxOutIndex<KeychainKind>>,
),
>,
pub(crate) chain_changeset: BTreeMap<u32, Option<BlockHash>>,
}

impl<H> MultiUpdateSubscriber<H>
where
H: Hash + Eq + Clone + Copy,
{
/// Return updates for all registered wallets for every time the light client
/// syncs to connected peers.
pub async fn sync(&mut self) -> impl Iterator<Item = (H, Update)> {
while let Some(event) = self.receiver.recv().await {
match event {
Event::Block(indexed_block) => {
let hash = indexed_block.block.block_hash();
self.chain_changeset
.insert(indexed_block.height, Some(hash));
for (_, graph) in self.wallet_map.values_mut() {
let _ =
graph.apply_block_relevant(&indexed_block.block, indexed_block.height);
}
}
Event::BlocksDisconnected(disconnected) => {
for header in disconnected {
let height = header.height;
self.chain_changeset.insert(height, None);
}
}
Event::Synced(SyncUpdate {
tip: _,
recent_history,
}) => {
recent_history.into_iter().for_each(|(height, header)| {
self.chain_changeset
.insert(height, Some(header.block_hash()));
});
break;
}
}
}
let mut responses = Vec::new();
for (index, (local_chain, graph)) in &mut self.wallet_map {
let tx_update = TxUpdate::from(graph.graph().clone());
let last_active_indices = graph.index.last_used_indices();
local_chain
.apply_changeset(&local_chain::ChangeSet::from(self.chain_changeset.clone()))
.expect("chain was initialized with genesis");
let update = Update {
tx_update,
last_active_indices,
chain: Some(local_chain.tip()),
};
responses.push((*index, update));
}
self.chain_changeset = BTreeMap::new();
responses.into_iter()
}
}

0 comments on commit 98e7ece

Please sign in to comment.