Skip to content

Commit

Permalink
fix: double event race condition
Browse files Browse the repository at this point in the history
  • Loading branch information
michael1011 committed Mar 7, 2025
1 parent 37394d2 commit bb880f2
Show file tree
Hide file tree
Showing 2 changed files with 51 additions and 26 deletions.
2 changes: 1 addition & 1 deletion regtest
75 changes: 50 additions & 25 deletions src/grpc/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ use crate::grpc::transformers::{transform_invoice_state, transform_route_hints};
use crate::settler::Settler;
use bitcoin::hashes::{sha256, Hash};
use log::{debug, error, warn};
use std::collections::HashMap;
use std::pin::Pin;
use tokio::sync::mpsc;
use tonic::codegen::tokio_stream::wrappers::ReceiverStream;
Expand Down Expand Up @@ -217,6 +218,7 @@ where
let params = request.into_inner();
let (tx, rx) = mpsc::channel(16);

let mut initial_state = None;
let mut state_rx = self.settler.state_rx();

match self
Expand All @@ -225,17 +227,26 @@ where
{
Ok(res) => {
if let Some(res) = res {
if let Ok(state) = InvoiceState::try_from(res.invoice.state.as_str()) {
if let Err(err) = tx
.send(Ok(TrackResponse {
state: transform_invoice_state(state),
}))
.await
{
error!("Could not send invoice state update: {}", err);
match InvoiceState::try_from(res.invoice.state.as_str()) {
Ok(state) => {
initial_state = Some(state);
if let Err(err) = tx
.send(Ok(TrackResponse {
state: transform_invoice_state(state),
}))
.await
{
error!("Could not send invoice state update: {}", err);
return Err(Status::new(
Code::Internal,
format!("could not send initial invoice state: {}", err),
));
}
}
Err(err) => {
return Err(Status::new(
Code::Internal,
format!("could not send initial invoice state: {}", err),
format!("could transform invoice state: {}", err),
));
}
}
Expand All @@ -257,6 +268,13 @@ where
continue;
}

// Do not send the initial state twice
if let Some(initial_state) = initial_state {
if initial_state == update.state {
continue;
}
}

if let Err(err) = tx
.send(Ok(TrackResponse {
state: transform_invoice_state(update.state),
Expand Down Expand Up @@ -292,6 +310,7 @@ where

let (tx, rx) = mpsc::channel(128);

let mut initial_states = HashMap::new();
let invoice_helper = self.invoice_helper.clone();
let mut state_rx = self.settler.state_rx();

Expand Down Expand Up @@ -320,26 +339,25 @@ where
}
};

let state = transform_invoice_state(
match InvoiceState::try_from(invoice.invoice.state.as_str()) {
Ok(state) => state,
Err(err) => {
let err = format!(
"Could not parse state of invoice {}: {}",
hex::encode(&hash),
err
);
error!("{}", err);
let _ = tx.send(Err(Status::new(Code::Internal, err))).await;
return;
}
},
);
let state = match InvoiceState::try_from(invoice.invoice.state.as_str()) {
Ok(state) => state,
Err(err) => {
let err = format!(
"Could not parse state of invoice {}: {}",
hex::encode(&hash),
err
);
error!("{}", err);
let _ = tx.send(Err(Status::new(Code::Internal, err))).await;
return;
}
};
initial_states.insert(hash, state);

if let Err(err) = tx
.send(Ok(TrackAllResponse {
state,
bolt11: invoice.invoice.bolt11,
state: transform_invoice_state(state),
payment_hash: invoice.invoice.payment_hash,
}))
.await
Expand All @@ -352,6 +370,13 @@ where
loop {
match state_rx.recv().await {
Ok(update) => {
// Do not send the initial state twice
if let Some(initial_state) = initial_states.get(&update.payment_hash) {
if initial_state == &update.state {
continue;
}
}

if let Err(err) = tx
.send(Ok(TrackAllResponse {
bolt11: update.bolt11,
Expand Down

0 comments on commit bb880f2

Please sign in to comment.