Skip to content

Commit

Permalink
Improves reorg logic by checking whether we are on sync with the back…
Browse files Browse the repository at this point in the history
…end or not

The current reorg logic does not really take into account whether we are on sync with
the backend or not. On a first block connected after a reorg, it will try to send all
reorged out data assuming it has knowledge of everything that is already confirmed or in
the mempool. However, in a multi block reorg the backend could be at a height that the
tower has not processed yet, hence some transactions may be on the chain but not on our internal
`txindex`. Therefore, it could be the case that we try to re-send something that has been reorged-out
and see it bounce. Under normal conditions, that would mean the transaction was confirmed a long time ago,
since otherwise it would be in our index. However, in this case it may be that it is just confirmed in a
subsequent block we haven't processed yet. This will lead to wrongly assuming the tracker was `IRREVOCABLY
RESOLVED`, while in reality it may only have a few confirmations.

This patch fixes that. In the case of a transaction bouncing we will check whether we are on sync with the backend,
and only if so consider the tracker as `IRREVOCABLY RESOLVED`. Otherwise, the tracker will be flagged as `OffSync`
and retried until it bounces when we are on sync, or its status is updated on block processing.

For context, this edge case was introduced when adding support for prune mode. Before that (when `txindex` for the backend
was required) we would have used `getrawtransaction` to check the confirmation count of the bouncing transaction.
  • Loading branch information
sr-gi committed Aug 3, 2023
1 parent bacac07 commit fa23e50
Show file tree
Hide file tree
Showing 4 changed files with 109 additions and 28 deletions.
64 changes: 52 additions & 12 deletions teos/src/carrier.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,15 +104,23 @@ impl Carrier {
ConfirmationStatus::Rejected(rpc_errors::RPC_VERIFY_ERROR)
}
rpc_errors::RPC_VERIFY_ALREADY_IN_CHAIN => {
log::info!(
"Transaction was confirmed long ago, not keeping track of it: {}",
tx.txid()
);

// Given we are not using txindex, if a transaction bounces we cannot get its confirmation count. However, [send_transaction] is guarded by
// checking whether the transaction id can be found in the [Responder]'s [TxIndex], meaning that if the transaction bounces it was confirmed long
// ago (> IRREVOCABLY_RESOLVED), so we don't need to worry about it.
ConfirmationStatus::IrrevocablyResolved
if self.bitcoin_cli.get_block_count().unwrap() as u32 > self.block_height {
// We are out of sync, either we are trying to send things to bitcoind after a reorg (and we have not reached the new tip yet)
// or a block was found right when we were trying to send something and we have not yet processed it.
// In both cases we don't know if what we are trying to send is really old (IRREVOCABLY_RESOLVED) or if it'll just be processed
// in our way up to the new tip (we need to make this work both for txindex and prune mode, so getrawtransaction is not an option)
ConfirmationStatus::OffSync
} else {
log::info!(
"Transaction was confirmed long ago, not keeping track of it: {}",
tx.txid()
);

// Given we are not using txindex, if a transaction bounces we cannot get its confirmation count. However, [send_transaction] is guarded by
// checking whether the transaction id can be found in the [Responder]'s [TxIndex], and we know we are on sync, so he transaction
// must have confirmed a long ago (> IRREVOCABLY_RESOLVED). Therefore, we don't need to worry about it anymore.
ConfirmationStatus::IrrevocablyResolved
}
}
rpc_errors::RPC_DESERIALIZATION_ERROR => {
// Adding this here just for completeness. We should never end up here. The Carrier only sends txs handed by the Responder,
Expand Down Expand Up @@ -311,12 +319,44 @@ mod tests {

#[test]
fn test_send_transaction_verify_already_in_chain() {
let bitcoind_mock = BitcoindMock::new(MockOptions::with_error(
rpc_errors::RPC_VERIFY_ALREADY_IN_CHAIN as i64,
));
let start_height = START_HEIGHT as u32;
// Set the backend to be one block ahead of us
let bitcoind_mock = BitcoindMock::new(
MockOptions::with_error(rpc_errors::RPC_VERIFY_ALREADY_IN_CHAIN as i64)
.at_height(start_height + 1),
);
let bitcoind_reachable = Arc::new((Mutex::new(true), Condvar::new()));
let bitcoin_cli = Arc::new(BitcoindClient::new(bitcoind_mock.url(), Auth::None).unwrap());
start_server(bitcoind_mock.server);

let mut carrier = Carrier::new(bitcoin_cli, bitcoind_reachable, start_height);
let tx = consensus::deserialize(&Vec::from_hex(TX_HEX).unwrap()).unwrap();
let r = carrier.send_transaction(&tx);

// We are offsync, so the transaction should bounce but tell us about it
assert_eq!(r, ConfirmationStatus::OffSync);
assert_eq!(carrier.issued_receipts.get(&tx.txid()).unwrap(), &r);

// Try again, but this time being onsync, now we should get an IrrevocablyResolved
// We first need to clear the issued_receipts
carrier.issued_receipts.remove(&tx.txid());
// And either increase our height
carrier.block_height += 1;

let r = carrier.send_transaction(&tx);
assert_eq!(r, ConfirmationStatus::IrrevocablyResolved);
assert_eq!(carrier.issued_receipts.get(&tx.txid()).unwrap(), &r);
}

#[test]
fn test_send_transaction_verify_already_in_chain_offsync() {
let start_height = START_HEIGHT as u32;
let bitcoind_mock = BitcoindMock::new(
MockOptions::with_error(rpc_errors::RPC_VERIFY_ALREADY_IN_CHAIN as i64)
.at_height(start_height),
);
let bitcoind_reachable = Arc::new((Mutex::new(true), Condvar::new()));
let bitcoin_cli = Arc::new(BitcoindClient::new(bitcoind_mock.url(), Auth::None).unwrap());
start_server(bitcoind_mock.server);

let mut carrier = Carrier::new(bitcoin_cli, bitcoind_reachable, start_height);
Expand Down
44 changes: 33 additions & 11 deletions teos/src/responder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ pub enum ConfirmationStatus {
ConfirmedIn(u32),
InMempoolSince(u32),
IrrevocablyResolved,
OffSync,
Rejected(i32),
ReorgedOut,
}
Expand Down Expand Up @@ -70,6 +71,16 @@ impl ConfirmationStatus {
ConfirmationStatus::ConfirmedIn(_) | &ConfirmationStatus::InMempoolSince(_)
)
}

/// Whether the transaction was rejected
pub fn rejected(&self) -> bool {
matches!(self, ConfirmationStatus::Rejected(_))
}

/// Whether the transaction couldn't be processed because we are off sync.
pub fn off_sync(&self) -> bool {
matches!(self, ConfirmationStatus::OffSync)
}
}

/// Minimal data required in memory to keep track of transaction trackers.
Expand Down Expand Up @@ -222,7 +233,7 @@ impl Responder {
carrier.send_transaction(&breach.penalty_tx)
};

if status.accepted() {
if status.accepted() || status.off_sync() {
self.add_tracker(uuid, breach, user_id, status);
}

Expand Down Expand Up @@ -348,17 +359,19 @@ impl Responder {
) -> HashMap<UUID, (Transaction, Option<Transaction>)> {
let dbm = self.dbm.lock().unwrap();
let mut tx_to_rebroadcast = HashMap::new();
let mut tracker: TransactionTracker;

for (uuid, t) in self.trackers.lock().unwrap().iter() {
if let ConfirmationStatus::InMempoolSince(h) = t.status {
if (height - h) as u8 >= CONFIRMATIONS_BEFORE_RETRY {
tracker = dbm.load_tracker(*uuid).unwrap();
tx_to_rebroadcast.insert(*uuid, (tracker.penalty_tx, None));
tx_to_rebroadcast
.insert(*uuid, (dbm.load_tracker(*uuid).unwrap().penalty_tx, None));
}
} else if let ConfirmationStatus::ReorgedOut = t.status {
tracker = dbm.load_tracker(*uuid).unwrap();
let tracker = dbm.load_tracker(*uuid).unwrap();
tx_to_rebroadcast.insert(*uuid, (tracker.penalty_tx, Some(tracker.dispute_tx)));
} else if t.status.off_sync() {
tx_to_rebroadcast
.insert(*uuid, (dbm.load_tracker(*uuid).unwrap().penalty_tx, None));
}
}

Expand Down Expand Up @@ -413,20 +426,26 @@ impl Responder {
if tx_index.contains_key(&dispute_tx.txid())
| carrier.in_mempool(&dispute_tx.txid())
{
// Dispute tx is on chain (or mempool), so we only need to care about the penalty
// Dispute tx is on chain (or mempool), so we only need to care about the penalty.
// We know for a fact that the penalty is not in the index, because otherwise it would have been received
// a confirmation during the processing of the current block (hence it would not have been passed to this method)
carrier.send_transaction(&penalty_tx)
} else {
// Dispute tx has also been reorged out, meaning that both transactions need to be broadcast.
// DISCUSS: For lightning transactions, if the dispute has been reorged the penalty cannot make it to the network.
// If we keep this general, the dispute can simply be a trigger and the penalty doesn't necessarily have to spend from it.
// We'll keel it lightning specific, at least for now.
// We'll keep it lightning specific, at least for now.
let status = carrier.send_transaction(&dispute_tx);
if let ConfirmationStatus::Rejected(e) = status {
log::error!(
"Reorged dispute transaction rejected during rebroadcast: {} (reason: {e})",
dispute_tx.txid()
);
status
} else if status.off_sync() {
// If the dispute bounces because we are off-sync, we want to try again with the whole package. Hence, we leave this as
// reorged.
ConfirmationStatus::ReorgedOut
} else {
// The dispute was accepted, so we can rebroadcast the penalty.
carrier.send_transaction(&penalty_tx)
Expand All @@ -435,7 +454,7 @@ impl Responder {
} else {
// The tracker has simply reached CONFIRMATIONS_BEFORE_RETRY missed confirmations.
log::warn!(
"Penalty transaction has missed many confirmations: {}",
"Penalty transaction has missed many confirmations or was sent while we were off-sync with the backend: {}",
penalty_tx.txid()
);
carrier.send_transaction(&penalty_tx)
Expand All @@ -444,11 +463,14 @@ impl Responder {
if let ConfirmationStatus::Rejected(_) = status {
rejected.insert(uuid);
} else {
// Update the tracker if it gets accepted. This will also update the height (since when we are counting the tracker
// to have been in mempool), so it resets the wait period instead of trying to rebroadcast every block.
// Update the status if the tracker is not rejected. This will update the height for InMempooolSince, resetting the
// missed confirmation counter, or flag it as OffSync if we happen to not be on sync.
// DISCUSS: We may want to find another approach in the future for the InMempoool transactions.
trackers.get_mut(&uuid).unwrap().status = status;
accepted.insert(uuid, status);

if status.accepted() {
accepted.insert(uuid, status);
}
}
}

Expand Down
15 changes: 15 additions & 0 deletions teos/src/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -538,30 +538,39 @@ pub(crate) struct BitcoindMock {

#[derive(Default)]
pub(crate) struct MockOptions {
height: u32,
error_code: Option<i64>,
in_mempool: bool,
}

impl MockOptions {
pub fn with_error(error_code: i64) -> Self {
Self {
height: 0,
error_code: Some(error_code),
in_mempool: false,
}
}

pub fn in_mempool() -> Self {
Self {
height: 0,
error_code: None,
in_mempool: true,
}
}

pub fn at_height(self, h: u32) -> Self {
Self { height: h, ..self }
}
}

impl BitcoindMock {
pub fn new(options: MockOptions) -> Self {
let mut io = IoHandler::default();

BitcoindMock::add_getblockcount(&mut io, options.height);

if let Some(error) = options.error_code {
io.add_sync_method("error", move |_params: Params| {
Err(JsonRpcError::new(JsonRpcErrorCode::ServerError(error)))
Expand Down Expand Up @@ -614,6 +623,12 @@ impl BitcoindMock {
})
}

fn add_getblockcount(io: &mut IoHandler, h: u32) {
io.add_sync_method("getblockcount", move |_params: Params| {
Ok(Value::Number(h.into()))
});
}

pub fn url(&self) -> &str {
&self.url
}
Expand Down
14 changes: 9 additions & 5 deletions teos/src/watcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -700,11 +700,15 @@ impl chain::Listen for Watcher {
for (uuid, breach) in valid_breaches {
log::info!("Notifying Responder and deleting appointment (uuid: {uuid})");

if let ConfirmationStatus::Rejected(_) = self.responder.handle_breach(
uuid,
breach,
self.appointments.lock().unwrap()[&uuid].user_id,
) {
if self
.responder
.handle_breach(
uuid,
breach,
self.appointments.lock().unwrap()[&uuid].user_id,
)
.rejected()
{
appointments_to_delete.insert(uuid);
} else {
delivered_appointments.insert(uuid);
Expand Down

0 comments on commit fa23e50

Please sign in to comment.