Skip to content

Commit

Permalink
broker: send preapproves to receivesend signers
Browse files Browse the repository at this point in the history
Everytime CLN wants to pay an invoice, or send a keysend, the very first
message it sends is either PreapproveInvoice, or PreapproveKeysend. So
these should go only to signers that have the send feature.
If for some reason the send signer crashes, and broker starts talking to
a receiveonly signer, 1) broker will see that it's receive only, so it
will wait for a send signer to connect back. 2) if broker still sends,
the receive only signer can be programmed to immediately reject any
preapprove messages.
  • Loading branch information
irriden committed Aug 20, 2023
1 parent d6ea514 commit c755601
Show file tree
Hide file tree
Showing 6 changed files with 70 additions and 26 deletions.
10 changes: 5 additions & 5 deletions broker/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions broker/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,8 @@ vls-proxy = { git = "https://gitlab.com/lightning-signer/validating-li
# vls-protocol-client = { path = "../../vls/vls-protocol-client" }
# vls-proxy = { path = "../../vls/vls-proxy" }

lss-connector = { git = "https://github.com/stakwork/sphinx-rs", rev = "f4cd39ee286c98257df251a949a1f963b7856a4f" }
sphinx-signer = { git = "https://github.com/stakwork/sphinx-rs", rev = "f4cd39ee286c98257df251a949a1f963b7856a4f" }
lss-connector = { git = "https://github.com/stakwork/sphinx-rs", rev = "a4bb2b857e4b79c9ae57a46dfd66c6f67094b47a" }
sphinx-signer = { git = "https://github.com/stakwork/sphinx-rs", rev = "a4bb2b857e4b79c9ae57a46dfd66c6f67094b47a" }
# lss-connector = { path = "../../sphinx-rs/lss-connector" }
# sphinx-signer = { path = "../../sphinx-rs/signer" }

Expand Down
14 changes: 8 additions & 6 deletions broker/src/conn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use std::collections::HashMap;
pub struct Connections {
pub pubkey: Option<String>,
pub clients: HashMap<String, SignerType>,
pub current: Option<String>,
pub current: Option<(String, SignerType)>,
}

impl Connections {
Expand All @@ -25,17 +25,19 @@ impl Connections {
pub fn set_pubkey(&mut self, pk: &str) {
self.pubkey = Some(pk.to_string())
}
pub fn set_current(&mut self, cid: String) {
self.current = Some(cid);
pub fn set_current(&mut self, cid: String, signer_type: SignerType) {
self.current = Some((cid, signer_type));
}
pub fn add_client(&mut self, cid: &str, signer_type: SignerType) {
self.clients.insert(cid.to_string(), signer_type);
self.current = Some(cid.to_string());
self.current = Some((cid.to_string(), signer_type));
}
pub fn remove_client(&mut self, cid: &str) {
self.clients.remove(cid);
if self.current == Some(cid.to_string()) {
self.current = None;
if let Some((id, _)) = &self.current {
if id == cid {
self.current = None;
}
}
}
}
Expand Down
39 changes: 31 additions & 8 deletions broker/src/looper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,10 @@ use bitcoin::blockdata::constants::ChainHash;
use log::*;
use rocket::tokio::sync::mpsc;
use secp256k1::PublicKey;
use sphinx_signer::{parser, sphinx_glyph::topics};
use sphinx_signer::{
parser,
sphinx_glyph::{topics, types::SignerType},
};
use std::sync::atomic::{AtomicBool, AtomicU16, Ordering};
use std::thread;
use std::time::Duration;
Expand Down Expand Up @@ -123,7 +126,7 @@ impl<C: 'static + Client> SignerLoop<C> {
}
msg => {
let mut catch_init = false;
if let Message::HsmdInit(m) = msg {
if let Message::HsmdInit(ref m) = msg {
catch_init = true;
if let Some(set) = settings {
if ChainHash::using_genesis_block(set.network).as_bytes()
Expand All @@ -135,15 +138,27 @@ impl<C: 'static + Client> SignerLoop<C> {
panic!("Got HsmdInit without settings - likely because HsmdInit was sent after startup");
}
}
let reply = self.handle_message(raw_msg, catch_init)?;
let reply = if let Message::PreapproveInvoice(_)
| Message::PreapproveKeysend(_) = msg
{
self.handle_message(raw_msg, catch_init, Some(SignerType::ReceiveSend))?
} else {
// None for signer_type means no restrictions on which signer type to send the message to
self.handle_message(raw_msg, catch_init, None)?
};
// Write the reply to CLN
self.client.write_vec(reply)?;
}
}
}
}

fn handle_message(&mut self, message: Vec<u8>, catch_init: bool) -> Result<Vec<u8>> {
fn handle_message(
&mut self,
message: Vec<u8>,
catch_init: bool,
signer_type: Option<SignerType>,
) -> Result<Vec<u8>> {
// wait until not busy
loop {
match try_to_get_busy() {
Expand All @@ -166,15 +181,15 @@ impl<C: 'static + Client> SignerLoop<C> {
)?;
// send to signer
log::info!("SEND ON {}", topics::VLS);
let (res_topic, res) = self.send_request_wait(topics::VLS, md)?;
let (res_topic, res) = self.send_request_wait(topics::VLS, md, signer_type)?;
log::info!("GOT ON {}", res_topic);
let the_res = if res_topic == topics::LSS_RES {
// send reply to LSS to store muts
let lss_reply = self.send_lss(res)?;
log::info!("LSS REPLY LEN {}", &lss_reply.len());
// send to signer for HMAC validation, and get final reply
log::info!("SEND ON {}", topics::LSS_MSG);
let (res_topic2, res2) = self.send_request_wait(topics::LSS_MSG, lss_reply)?;
let (res_topic2, res2) = self.send_request_wait(topics::LSS_MSG, lss_reply, None)?;
log::info!("GOT ON {}, send to CLN", res_topic2);
if res_topic2 != topics::VLS_RES {
log::warn!("got a topic NOT on {}", topics::VLS_RES);
Expand Down Expand Up @@ -213,9 +228,17 @@ impl<C: 'static + Client> SignerLoop<C> {

// returns (topic, payload)
// might halt if signer is offline
fn send_request_wait(&mut self, topic: &str, message: Vec<u8>) -> Result<(String, Vec<u8>)> {
fn send_request_wait(
&mut self,
topic: &str,
message: Vec<u8>,
signer_type: Option<SignerType>,
) -> Result<(String, Vec<u8>)> {
// Send a request to the MQTT handler to send to signer
let (request, reply_rx) = ChannelRequest::new(topic, message);
let (request, reply_rx) = match signer_type {
Some(st) => ChannelRequest::new_for_type(st, topic, message),
None => ChannelRequest::new(topic, message),
};
// This can fail if MQTT shuts down
self.chan
.sender
Expand Down
6 changes: 5 additions & 1 deletion broker/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,11 @@ pub async fn broker_setup(
false
});
if dance_complete {
log::info!("adding client to the list: {}, type: {:?}", &cid, signer_type);
log::info!(
"adding client to the list: {}, type: {:?}",
&cid,
signer_type
);
let mut cs = conns_.lock().unwrap();
cs.add_client(&cid, signer_type);
drop(cs);
Expand Down
23 changes: 19 additions & 4 deletions broker/src/mqtt.rs
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,11 @@ pub fn start_broker(
// This is the ReceiveSend signer type
None => SignerType::default(),
};
log::debug!("caught hello message for id: {}, type: {:?}", cid, signer_type);
log::debug!(
"caught hello message for id: {}, type: {:?}",
cid,
signer_type
);
let _ = internal_status_tx.send((true, cid, Some(signer_type)));
} else if topic.ends_with(topics::BYE) {
let _ = internal_status_tx.send((false, cid, None));
Expand Down Expand Up @@ -187,15 +191,25 @@ fn pub_and_wait(
} else {
let current = current.unwrap();
// Try the current connection
let mut rep = pub_timeout(&current, &msg.topic, &msg.message, &msg_rx, link_tx);
// This returns None if 1) signer_type is set, and not equal to the current signer
// 2) If pub_timeout times out
let mut rep = if current.1 == msg.signer_type.unwrap_or(current.1) {
pub_timeout(&current.0, &msg.topic, &msg.message, &msg_rx, link_tx)
} else {
None
};

// If that failed, try looking for some other signer
if rep.is_none() {
for cid in client_list.into_keys().filter(|k| k != &current) {
// If signer_type is set, we also filter for only these types
for (cid, signer_type) in client_list.into_iter().filter(|(k, v)| {
k != &current.0 && v == msg.signer_type.as_ref().unwrap_or(v)
}) {
rep = pub_timeout(&cid, &msg.topic, &msg.message, &msg_rx, link_tx);
if rep.is_some() {
let mut cs = conns_.lock().unwrap();
log::debug!("got the list lock!");
cs.set_current(cid.to_string());
cs.set_current(cid.to_string(), signer_type);
drop(cs);
break;
}
Expand All @@ -212,6 +226,7 @@ fn pub_and_wait(
break;
} else {
log::debug!("couldn't reach any clients...");
std::thread::sleep(Duration::from_secs(1));
}
if let Some(max) = retries {
log::debug!("counter: {}, retries: {}", counter, max);
Expand Down

0 comments on commit c755601

Please sign in to comment.