Skip to content

Commit

Permalink
WIP add multiple wallets to a single node
Browse files Browse the repository at this point in the history
  • Loading branch information
rustaceanrob committed Dec 13, 2024
1 parent 5792ade commit fbdd0e4
Show file tree
Hide file tree
Showing 4 changed files with 255 additions and 0 deletions.
4 changes: 4 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,10 @@ required-features = ["rusqlite", "callbacks"]
name = "wallet"
required-features = ["wallet", "trace", "rusqlite", "callbacks"]

[[example]]
name = "multi"
required-features = ["wallet", "trace", "rusqlite", "callbacks"]

[[example]]
name = "events"
required-features = ["wallet", "rusqlite", "events"]
96 changes: 96 additions & 0 deletions examples/multi.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
use std::{
collections::HashSet,
net::{IpAddr, Ipv4Addr},
};

use bdk_kyoto::{logger::TraceLogger, multi::MultiEventReceiver};
use bdk_wallet::{KeychainKind, Wallet};
use kyoto::{HeaderCheckpoint, Network, NodeBuilder, ScriptBuf};

#[derive(Debug, Clone, Copy, Hash, PartialEq, Eq)]
struct WalletId(u8);

const WALLET_ID_ONE: WalletId = WalletId(1);
const WALLET_ID_TWO: WalletId = WalletId(2);

const PRIV_RECV: &str = "wpkh(tprv8ZgxMBicQKsPdy6LMhUtFHAgpocR8GC6QmwMSFpZs7h6Eziw3SpThFfczTDh5rW2krkqffa11UpX3XkeTTB2FvzZKWXqPY54Y6Rq4AQ5R8L/84'/1'/0'/0/*)";
const PRIV_CHANGE: &str = "wpkh(tprv8ZgxMBicQKsPdy6LMhUtFHAgpocR8GC6QmwMSFpZs7h6Eziw3SpThFfczTDh5rW2krkqffa11UpX3XkeTTB2FvzZKWXqPY54Y6Rq4AQ5R8L/84'/1'/0'/1/*)";
const PUB_RECV: &str = "tr([7d94197e/86'/1'/0']tpubDCyQVJj8KzjiQsFjmb3KwECVXPvMwvAxxZGCP9XmWSopmjW3bCV3wD7TgxrUhiGSueDS1MU5X1Vb1YjYcp8jitXc5fXfdC1z68hDDEyKRNr/0/*)";
const PUB_CHANGE: &str = "tr([7d94197e/86'/1'/0']tpubDCyQVJj8KzjiQsFjmb3KwECVXPvMwvAxxZGCP9XmWSopmjW3bCV3wD7TgxrUhiGSueDS1MU5X1Vb1YjYcp8jitXc5fXfdC1z68hDDEyKRNr/1/*)";

const NETWORK: Network = Network::Signet;

const PEER: IpAddr = IpAddr::V4(Ipv4Addr::new(23, 137, 57, 100));
const NUM_PEERS: u8 = 1;

const RECOVERY_HEIGHT: u32 = 170_000;

fn get_scripts_for_wallets(wallets: &[&Wallet]) -> HashSet<ScriptBuf> {
let mut spks = HashSet::new();
for wallet in wallets {
for keychain in [KeychainKind::External, KeychainKind::Internal] {
let last_revealed = wallet
.spk_index()
.last_revealed_index(keychain)
.unwrap_or(0);
let lookahead_index = last_revealed + wallet.spk_index().lookahead();
for index in 0..=lookahead_index {
spks.insert(wallet.peek_address(keychain, index).script_pubkey());
}
}
}
spks
}

#[tokio::main]
async fn main() -> anyhow::Result<()> {
let mut wallet_one = Wallet::create(PUB_RECV, PUB_CHANGE)
.network(NETWORK)
.lookahead(30)
.create_wallet_no_persist()?;

let mut wallet_two = Wallet::create(PRIV_RECV, PRIV_CHANGE)
.network(NETWORK)
.lookahead(30)
.create_wallet_no_persist()?;

let scripts = get_scripts_for_wallets(&[&wallet_one, &wallet_two]);

let (node, client) = NodeBuilder::new(NETWORK)
.add_peer(PEER)
.num_required_peers(NUM_PEERS)
.add_scripts(scripts)
.anchor_checkpoint(HeaderCheckpoint::closest_checkpoint_below_height(
RECOVERY_HEIGHT,
NETWORK,
))
.build_node()?;

tokio::task::spawn(async move { node.run().await });

let logger = TraceLogger::new()?;

let (sender, receiver) = client.split();

let mut keychains = Vec::new();
keychains.push((WALLET_ID_ONE, wallet_one.spk_index().clone()));
keychains.push((WALLET_ID_TWO, wallet_two.spk_index().clone()));

let mut event_receiver = MultiEventReceiver::from_keychains(NETWORK, keychains, receiver);
loop {
let updates = event_receiver.updates(&logger).await;
for (id, update) in updates {
if id.eq(&WALLET_ID_ONE) {
wallet_one.apply_update(update)?;
let balance = wallet_one.balance().total().to_sat();
tracing::info!("Wallet one has {balance} satoshis");
} else if id.eq(&WALLET_ID_TWO) {
wallet_two.apply_update(update)?;
let balance = wallet_two.balance().total().to_sat();
tracing::info!("Wallet two has {balance} satoshis");
}
}
sender.shutdown().await?;
return Ok(());
}
}
2 changes: 2 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,8 @@ pub use kyoto::{NodeState, Receiver, SyncUpdate, TxBroadcast, TxBroadcastPolicy,
pub mod builder;
#[cfg(feature = "callbacks")]
pub mod logger;
#[cfg(feature = "wallet")]
pub mod multi;

#[cfg(feature = "wallet")]
#[derive(Debug)]
Expand Down
153 changes: 153 additions & 0 deletions src/multi.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,153 @@
//! Build an event receiver for multiple [`Wallet`](bdk_wallet).
use core::fmt;
use std::{
collections::{BTreeMap, HashMap},
hash::Hash,
};

use bdk_chain::{
bitcoin::{constants::genesis_block, params::Params, FeeRate},
keychain_txout::KeychainTxOutIndex,
local_chain::{self, LocalChain},
spk_client::FullScanResponse,
ConfirmationBlockTime, IndexedTxGraph, TxUpdate,
};
use kyoto::{IndexedBlock, NodeMessage, Receiver, SyncUpdate};

use crate::NodeEventHandler;
use crate::StringExt;

/// Interpret events from a node that is running to apply
/// multiple wallets in parallel.
#[derive(Debug)]
pub struct MultiEventReceiver<H, K> {
// channel receiver
receiver: kyoto::Receiver<NodeMessage>,
// changes to local chain
chain: local_chain::LocalChain,
// receive graphs
map: HashMap<H, IndexedTxGraph<ConfirmationBlockTime, KeychainTxOutIndex<K>>>,
// the network minimum to broadcast a transaction
min_broadcast_fee: FeeRate,
}

impl<H, K> MultiEventReceiver<H, K>
where
H: Hash + Eq + Clone + Copy,
K: fmt::Debug + Clone + Ord,
{
/// Build a light client event handler from a [`KeychainTxOutIndex`] and [`CheckPoint`].
pub fn from_keychains(
params: impl AsRef<Params>,
keychains: impl IntoIterator<Item = (H, KeychainTxOutIndex<K>)>,
receiver: Receiver<NodeMessage>,
) -> Self {
let mut map = HashMap::new();
for (index, keychain) in keychains {
map.insert(index, IndexedTxGraph::new(keychain));
}
Self {
receiver,
chain: LocalChain::from_genesis_hash(genesis_block(params).block_hash()).0,
map,
min_broadcast_fee: FeeRate::BROADCAST_MIN,
}
}

/// Return the most recent update from the node once it has synced to the network's tip.
/// This may take a significant portion of time during wallet recoveries or dormant wallets.
/// Note that you may call this method in a loop as long as the node is running.
///
/// A reference to a [`NodeEventHandler`] is required, which handles events emitted from a
/// running node. Production applications should define how the application handles
/// these events and displays them to end users.
#[cfg(feature = "callbacks")]
pub async fn updates(
&mut self,
logger: &dyn NodeEventHandler,
) -> impl Iterator<Item = (H, FullScanResponse<K>)> {
let mut chain_changeset = BTreeMap::new();
while let Ok(message) = self.receiver.recv().await {
self.log(&message, logger);
match message {
NodeMessage::Block(IndexedBlock { height, block }) => {
let hash = block.header.block_hash();
chain_changeset.insert(height, Some(hash));
for keychain in self.map.values_mut() {
let _ = keychain.apply_block_relevant(&block, height);
}
}
NodeMessage::BlocksDisconnected(headers) => {
for header in headers {
let height = header.height;
chain_changeset.insert(height, None);
}
}
NodeMessage::Synced(SyncUpdate {
tip,
recent_history,
}) => {
if chain_changeset.is_empty()
&& self.chain.tip().height() == tip.height
&& self.chain.tip().hash() == tip.hash
{
// return early if we're already synced
return Vec::new().into_iter();
}
recent_history.into_iter().for_each(|(height, header)| {
chain_changeset.insert(height, Some(header.block_hash()));
});
break;
}
NodeMessage::FeeFilter(fee_filter) => {
if self.min_broadcast_fee < fee_filter {
self.min_broadcast_fee = fee_filter;
}
}
_ => (),
}
}
let mut responses = Vec::new();
for (index, graph) in &self.map {
let tx_update = TxUpdate::from(graph.graph().clone());
let last_active_indices = graph.index.last_used_indices();
let update = FullScanResponse {
tx_update,
last_active_indices,
chain_update: Some(self.chain.tip()),
};
responses.push((*index, update));
}
responses.into_iter()
}

#[cfg(feature = "callbacks")]
fn log(&self, message: &NodeMessage, logger: &dyn NodeEventHandler) {
match message {
NodeMessage::Dialog(d) => logger.dialog(d.clone()),
NodeMessage::Warning(w) => logger.warning(w.clone()),
NodeMessage::StateChange(s) => logger.state_changed(*s),
NodeMessage::Block(b) => {
let hash = b.block.header.block_hash();
logger.dialog(format!("Applying Block: {hash}"));
}
NodeMessage::Synced(SyncUpdate {
tip,
recent_history: _,
}) => {
logger.synced(tip.height);
}
NodeMessage::BlocksDisconnected(headers) => {
logger.blocks_disconnected(headers.iter().map(|dc| dc.height).collect());
}
NodeMessage::TxSent(t) => {
logger.tx_sent(*t);
}
NodeMessage::TxBroadcastFailure(r) => {
logger.tx_failed(r.txid, r.reason.map(|reason| reason.into_string()))
}
NodeMessage::ConnectionsMet => logger.connections_met(),
_ => (),
}
}
}

0 comments on commit fbdd0e4

Please sign in to comment.