diff --git a/teos/src/carrier.rs b/teos/src/carrier.rs index 15c9d835..0b5741b6 100644 --- a/teos/src/carrier.rs +++ b/teos/src/carrier.rs @@ -104,15 +104,23 @@ impl Carrier { ConfirmationStatus::Rejected(rpc_errors::RPC_VERIFY_ERROR) } rpc_errors::RPC_VERIFY_ALREADY_IN_CHAIN => { - log::info!( - "Transaction was confirmed long ago, not keeping track of it: {}", - tx.txid() - ); - - // Given we are not using txindex, if a transaction bounces we cannot get its confirmation count. However, [send_transaction] is guarded by - // checking whether the transaction id can be found in the [Responder]'s [TxIndex], meaning that if the transaction bounces it was confirmed long - // ago (> IRREVOCABLY_RESOLVED), so we don't need to worry about it. - ConfirmationStatus::IrrevocablyResolved + if self.bitcoin_cli.get_block_count().unwrap() as u32 > self.block_height { + // We are out of sync, either we are trying to send things to bitcoind after a reorg (and we have not reached the new tip yet) + // or a block was found right when we were trying to send something and we have not yet processed it. + // In both cases we don't know if what we are trying to send is really old (IRREVOCABLY_RESOLVED) or if it'll just be processed + // in our way up to the new tip (we need to make this work both for txindex and prune mode, so getrawtransaction is not an option) + ConfirmationStatus::OffSync + } else { + log::info!( + "Transaction was confirmed long ago, not keeping track of it: {}", + tx.txid() + ); + + // Given we are not using txindex, if a transaction bounces we cannot get its confirmation count. However, [send_transaction] is guarded by + // checking whether the transaction id can be found in the [Responder]'s [TxIndex], and we know we are on sync, so he transaction + // must have confirmed a long ago (> IRREVOCABLY_RESOLVED). Therefore, we don't need to worry about it anymore. + ConfirmationStatus::IrrevocablyResolved + } } rpc_errors::RPC_DESERIALIZATION_ERROR => { // Adding this here just for completeness. We should never end up here. The Carrier only sends txs handed by the Responder, @@ -311,12 +319,44 @@ mod tests { #[test] fn test_send_transaction_verify_already_in_chain() { - let bitcoind_mock = BitcoindMock::new(MockOptions::with_error( - rpc_errors::RPC_VERIFY_ALREADY_IN_CHAIN as i64, - )); + let start_height = START_HEIGHT as u32; + // Set the backend to be one block ahead of us + let bitcoind_mock = BitcoindMock::new( + MockOptions::with_error(rpc_errors::RPC_VERIFY_ALREADY_IN_CHAIN as i64) + .at_height(start_height + 1), + ); let bitcoind_reachable = Arc::new((Mutex::new(true), Condvar::new())); let bitcoin_cli = Arc::new(BitcoindClient::new(bitcoind_mock.url(), Auth::None).unwrap()); + start_server(bitcoind_mock.server); + + let mut carrier = Carrier::new(bitcoin_cli, bitcoind_reachable, start_height); + let tx = consensus::deserialize(&Vec::from_hex(TX_HEX).unwrap()).unwrap(); + let r = carrier.send_transaction(&tx); + + // We are offsync, so the transaction should bounce but tell us about it + assert_eq!(r, ConfirmationStatus::OffSync); + assert_eq!(carrier.issued_receipts.get(&tx.txid()).unwrap(), &r); + + // Try again, but this time being onsync, now we should get an IrrevocablyResolved + // We first need to clear the issued_receipts + carrier.issued_receipts.remove(&tx.txid()); + // And either increase our height + carrier.block_height += 1; + + let r = carrier.send_transaction(&tx); + assert_eq!(r, ConfirmationStatus::IrrevocablyResolved); + assert_eq!(carrier.issued_receipts.get(&tx.txid()).unwrap(), &r); + } + + #[test] + fn test_send_transaction_verify_already_in_chain_offsync() { let start_height = START_HEIGHT as u32; + let bitcoind_mock = BitcoindMock::new( + MockOptions::with_error(rpc_errors::RPC_VERIFY_ALREADY_IN_CHAIN as i64) + .at_height(start_height), + ); + let bitcoind_reachable = Arc::new((Mutex::new(true), Condvar::new())); + let bitcoin_cli = Arc::new(BitcoindClient::new(bitcoind_mock.url(), Auth::None).unwrap()); start_server(bitcoind_mock.server); let mut carrier = Carrier::new(bitcoin_cli, bitcoind_reachable, start_height); diff --git a/teos/src/responder.rs b/teos/src/responder.rs index 9ffebda0..d35b7428 100644 --- a/teos/src/responder.rs +++ b/teos/src/responder.rs @@ -29,6 +29,7 @@ pub enum ConfirmationStatus { ConfirmedIn(u32), InMempoolSince(u32), IrrevocablyResolved, + OffSync, Rejected(i32), ReorgedOut, } @@ -70,6 +71,16 @@ impl ConfirmationStatus { ConfirmationStatus::ConfirmedIn(_) | &ConfirmationStatus::InMempoolSince(_) ) } + + /// Whether the transaction was rejected + pub fn rejected(&self) -> bool { + matches!(self, ConfirmationStatus::Rejected(_)) + } + + /// Whether the transaction couldn't be processed because we are off sync. + pub fn off_sync(&self) -> bool { + matches!(self, ConfirmationStatus::OffSync) + } } /// Minimal data required in memory to keep track of transaction trackers. @@ -222,7 +233,7 @@ impl Responder { carrier.send_transaction(&breach.penalty_tx) }; - if status.accepted() { + if status.accepted() || status.off_sync() { self.add_tracker(uuid, breach, user_id, status); } @@ -348,17 +359,19 @@ impl Responder { ) -> HashMap)> { let dbm = self.dbm.lock().unwrap(); let mut tx_to_rebroadcast = HashMap::new(); - let mut tracker: TransactionTracker; for (uuid, t) in self.trackers.lock().unwrap().iter() { if let ConfirmationStatus::InMempoolSince(h) = t.status { if (height - h) as u8 >= CONFIRMATIONS_BEFORE_RETRY { - tracker = dbm.load_tracker(*uuid).unwrap(); - tx_to_rebroadcast.insert(*uuid, (tracker.penalty_tx, None)); + tx_to_rebroadcast + .insert(*uuid, (dbm.load_tracker(*uuid).unwrap().penalty_tx, None)); } } else if let ConfirmationStatus::ReorgedOut = t.status { - tracker = dbm.load_tracker(*uuid).unwrap(); + let tracker = dbm.load_tracker(*uuid).unwrap(); tx_to_rebroadcast.insert(*uuid, (tracker.penalty_tx, Some(tracker.dispute_tx))); + } else if t.status.off_sync() { + tx_to_rebroadcast + .insert(*uuid, (dbm.load_tracker(*uuid).unwrap().penalty_tx, None)); } } @@ -413,13 +426,15 @@ impl Responder { if tx_index.contains_key(&dispute_tx.txid()) | carrier.in_mempool(&dispute_tx.txid()) { - // Dispute tx is on chain (or mempool), so we only need to care about the penalty + // Dispute tx is on chain (or mempool), so we only need to care about the penalty. + // We know for a fact that the penalty is not in the index, because otherwise it would have been received + // a confirmation during the processing of the current block (hence it would not have been passed to this method) carrier.send_transaction(&penalty_tx) } else { // Dispute tx has also been reorged out, meaning that both transactions need to be broadcast. // DISCUSS: For lightning transactions, if the dispute has been reorged the penalty cannot make it to the network. // If we keep this general, the dispute can simply be a trigger and the penalty doesn't necessarily have to spend from it. - // We'll keel it lightning specific, at least for now. + // We'll keep it lightning specific, at least for now. let status = carrier.send_transaction(&dispute_tx); if let ConfirmationStatus::Rejected(e) = status { log::error!( @@ -428,14 +443,15 @@ impl Responder { ); status } else { - // The dispute was accepted, so we can rebroadcast the penalty. + // If the dispute is not rejected we can send the penalty. + // Notice this covers both the dispute being accepted or bouncing, given bouncing will mean it is already on chain. carrier.send_transaction(&penalty_tx) } } } else { // The tracker has simply reached CONFIRMATIONS_BEFORE_RETRY missed confirmations. log::warn!( - "Penalty transaction has missed many confirmations: {}", + "Penalty transaction has missed many confirmations or was sent while we were off-sync with the backend: {}", penalty_tx.txid() ); carrier.send_transaction(&penalty_tx) @@ -444,11 +460,14 @@ impl Responder { if let ConfirmationStatus::Rejected(_) = status { rejected.insert(uuid); } else { - // Update the tracker if it gets accepted. This will also update the height (since when we are counting the tracker - // to have been in mempool), so it resets the wait period instead of trying to rebroadcast every block. + // Update the status if the tracker is not rejected. This will update the height for InMempooolSince, resetting the + // missed confirmation counter, or flag it as OffSync if we happen to not be on sync. // DISCUSS: We may want to find another approach in the future for the InMempoool transactions. trackers.get_mut(&uuid).unwrap().status = status; - accepted.insert(uuid, status); + + if status.accepted() { + accepted.insert(uuid, status); + } } } diff --git a/teos/src/test_utils.rs b/teos/src/test_utils.rs index b7959fd3..225baad2 100644 --- a/teos/src/test_utils.rs +++ b/teos/src/test_utils.rs @@ -538,6 +538,7 @@ pub(crate) struct BitcoindMock { #[derive(Default)] pub(crate) struct MockOptions { + height: u32, error_code: Option, in_mempool: bool, } @@ -545,6 +546,7 @@ pub(crate) struct MockOptions { impl MockOptions { pub fn with_error(error_code: i64) -> Self { Self { + height: 0, error_code: Some(error_code), in_mempool: false, } @@ -552,16 +554,23 @@ impl MockOptions { pub fn in_mempool() -> Self { Self { + height: 0, error_code: None, in_mempool: true, } } + + pub fn at_height(self, h: u32) -> Self { + Self { height: h, ..self } + } } impl BitcoindMock { pub fn new(options: MockOptions) -> Self { let mut io = IoHandler::default(); + BitcoindMock::add_getblockcount(&mut io, options.height); + if let Some(error) = options.error_code { io.add_sync_method("error", move |_params: Params| { Err(JsonRpcError::new(JsonRpcErrorCode::ServerError(error))) @@ -614,6 +623,12 @@ impl BitcoindMock { }) } + fn add_getblockcount(io: &mut IoHandler, h: u32) { + io.add_sync_method("getblockcount", move |_params: Params| { + Ok(Value::Number(h.into())) + }); + } + pub fn url(&self) -> &str { &self.url } diff --git a/teos/src/watcher.rs b/teos/src/watcher.rs index ec2e7da5..c49b87dc 100644 --- a/teos/src/watcher.rs +++ b/teos/src/watcher.rs @@ -700,11 +700,15 @@ impl chain::Listen for Watcher { for (uuid, breach) in valid_breaches { log::info!("Notifying Responder and deleting appointment (uuid: {uuid})"); - if let ConfirmationStatus::Rejected(_) = self.responder.handle_breach( - uuid, - breach, - self.appointments.lock().unwrap()[&uuid].user_id, - ) { + if self + .responder + .handle_breach( + uuid, + breach, + self.appointments.lock().unwrap()[&uuid].user_id, + ) + .rejected() + { appointments_to_delete.insert(uuid); } else { delivered_appointments.insert(uuid);