Skip to content

Commit

Permalink
wallet: RBF batch payments manager
Browse files Browse the repository at this point in the history
  • Loading branch information
ecdsa committed Nov 14, 2024
1 parent fef6fc5 commit 45c7543
Show file tree
Hide file tree
Showing 2 changed files with 134 additions and 21 deletions.
23 changes: 9 additions & 14 deletions electrum/submarine_swaps.py
Original file line number Diff line number Diff line change
Expand Up @@ -464,19 +464,11 @@ async def hold_invoice_callback(self, payment_hash: bytes) -> None:
if key in self.swaps:
swap = self.swaps[key]
if swap.funding_txid is None:
password = self.wallet.get_unlocked_password()
for batch_rbf in [False]:
# FIXME: tx batching is disabled, because extra logic is needed to handle
# the case where the base tx gets mined.
tx = self.create_funding_tx(swap, None, password=password, batch_rbf=batch_rbf)
self.logger.info(f'adding funding_tx {tx.txid()}')
self.wallet.adb.add_transaction(tx)
try:
await self.broadcast_funding_tx(swap, tx)
except TxBroadcastError:
self.wallet.adb.remove_transaction(tx.txid())
continue
break
output = self.create_funding_output(swap)
self.wallet.add_batch_payment(output)
swap.funding_txid = True
else:
self.logger.info(f'key not in swaps {key}')

def create_normal_swap(self, *, lightning_amount_sat: int, payment_hash: bytes, their_pubkey: bytes = None):
""" server method """
Expand Down Expand Up @@ -773,6 +765,9 @@ async def callback(payment_hash):
await asyncio.sleep(0.1)
return swap.funding_txid

def create_funding_output(self, swap):
return PartialTxOutput.from_address_and_value(swap.lockup_address, swap.onchain_amount)

def create_funding_tx(
self,
swap: SwapData,
Expand All @@ -785,7 +780,7 @@ def create_funding_tx(
# note: rbf must not decrease payment
# this is taken care of in wallet._is_rbf_allowed_to_touch_tx_output
if tx is None:
funding_output = PartialTxOutput.from_address_and_value(swap.lockup_address, swap.onchain_amount)
funding_output = self.create_funding_output(swap)
tx = self.wallet.create_transaction(
outputs=[funding_output],
rbf=True,
Expand Down
132 changes: 125 additions & 7 deletions electrum/wallet.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@
WalletFileException, BitcoinException,
InvalidPassword, format_time, timestamp_to_datetime, Satoshis,
Fiat, bfh, TxMinedInfo, quantize_feerate, OrderedDictWithIndex)
from .util import log_exceptions
from .simple_config import SimpleConfig, FEE_RATIO_HIGH_WARNING, FEERATE_WARNING_HIGH_FEE
from .bitcoin import COIN, TYPE_ADDRESS
from .bitcoin import is_address, address_to_script, is_minikey, relayfee, dust_threshold
Expand Down Expand Up @@ -88,6 +89,7 @@
from .util import EventListener, event_listener
from . import descriptor
from .descriptor import Descriptor
from .network import TxBroadcastError

if TYPE_CHECKING:
from .network import Network
Expand Down Expand Up @@ -459,6 +461,7 @@ async def main_loop(self):
async with self.taskgroup as group:
await group.spawn(asyncio.Event().wait) # run forever (until cancel)
await group.spawn(self.do_synchronize_loop())
await group.spawn(self.manage_batch_payments())
except Exception as e:
self.logger.exception("taskgroup died.")
finally:
Expand Down Expand Up @@ -1813,21 +1816,26 @@ def make_unsigned_transaction(
self, *,
coins: Sequence[PartialTxInput],
outputs: List[PartialTxOutput],
inputs: Optional[List[PartialTxInput]] = None,
fee=None,
change_addr: str = None,
is_sweep: bool = False, # used by Wallet_2fa subclass
rbf: bool = True,
batch_rbf: Optional[bool] = None,
base_tx = None,
send_change_to_lightning: Optional[bool] = None,
) -> PartialTransaction:
"""Can raise NotEnoughFunds or NoDynamicFeeEstimates."""

assert base_tx is None or inputs is None

if not coins: # any bitcoin tx must have at least 1 input by consensus
raise NotEnoughFunds()
if any([c.already_has_some_signatures() for c in coins]):
raise Exception("Some inputs already contain signatures!")
if batch_rbf is None:
batch_rbf = self.config.WALLET_BATCH_RBF
if inputs is None:
inputs = []
if base_tx is None and self.config.WALLET_BATCH_RBF:
base_tx = self.get_unconfirmed_base_tx_for_batching(outputs, coins)
if send_change_to_lightning is None:
send_change_to_lightning = self.config.WALLET_SEND_CHANGE_TO_LIGHTNING

Expand Down Expand Up @@ -1866,7 +1874,6 @@ def make_unsigned_transaction(
# Let the coin chooser select the coins to spend
coin_chooser = coinchooser.get_coin_chooser(self.config)
# If there is an unconfirmed RBF tx, merge with it
base_tx = self.get_unconfirmed_base_tx_for_batching(outputs, coins) if batch_rbf else None
if base_tx:
# make sure we don't try to spend change from the tx-to-be-replaced:
coins = [c for c in coins if c.prevout.txid.hex() != base_tx.txid()]
Expand All @@ -1888,7 +1895,7 @@ def fee_estimator(size: Union[int, float, Decimal]) -> int:
old_change_addrs = [o.address for o in base_tx.outputs() if self.is_change(o.address)]
rbf_merge_txid = base_tx.txid()
else:
txi = []
txi = list(inputs)
txo = list(outputs)
old_change_addrs = []
# change address. if empty, coin_chooser will set it
Expand Down Expand Up @@ -3065,7 +3072,8 @@ def create_transaction(
password=None,
locktime=None,
tx_version: Optional[int] = None,
batch_rbf: Optional[bool] = None,
base_tx=None,
inputs=None,
send_change_to_lightning: Optional[bool] = None,
nonlocal_only: bool = False,
) -> PartialTransaction:
Expand All @@ -3082,10 +3090,11 @@ def create_transaction(
fee_estimator = fee
tx = self.make_unsigned_transaction(
coins=coins,
inputs=inputs,
outputs=outputs,
fee=fee_estimator,
change_addr=change_addr,
batch_rbf=batch_rbf,
base_tx=base_tx,
send_change_to_lightning=send_change_to_lightning,
rbf=rbf,
)
Expand Down Expand Up @@ -3292,6 +3301,115 @@ def unlock(self, password):
def get_unlocked_password(self):
return self._password_in_memory

def add_batch_payment(self, output: 'PartialTxOutput'):
# todo: raise InsufficientFunds if needed
self.batch_payments.append(output)

def find_confirmed_base_tx(self):
for tx in self.batch_txs:
tx_mined_status = self.adb.get_tx_height(tx.txid())
if tx_mined_status.conf > 0:
return tx

@log_exceptions
async def manage_batch_payments(self):
# batch rbf, and add it to adb before we broadcast it
# TODO: we should keep track of the transactions that have been replaced (base_tx)
# if a replaced transaction gets mined, we should ensure the payment is broadcast in a new tx
#
# output1 : tx1(o1) -----
# \
# output 2: tx1'(o1,o2) ---> tx2(tx1|o2) -----
# \ \
# output 3: tx1''(o1,o2,o3) --> tx2'(tx1|o2,o3) ---> tx3(tx2|o3)
# tx3(tx1'|o3) (if tx1'cannot be replaced)
#
# self.batch_txs = [tx1, tx1', tx1'']
#
# if tx1 gets mined:
# - use its output, batch all remaining payments: tx2(mined, o2,o3)
#
# if tx1' gets mined: tx3(mined, o3)
#
# what if we cannot RBF? -> we must add a child tx
# if cannot_rbf(tx1) -> broadcast tx2(tx1,o2) and remove first row: neww base is now tx2(tx,o2)
# if cannot_rbf(tx1') -> broadcast tx3(tx1'|o3)
#
# that's the same strategy as if it was mined
#
#
# TODO: make this reorg-safe.
#
#
# TODO: persist batch_payments and batch_txs in wallet file.
# Note that it is probably fine not to persist them, but it is dangerous
# to persist one and not the other, as it might result in a double send.
self.batch_payments = [] # list of payments we need to make
self.batch_txs = [] # list of tx that were broadcast. Each tx is a RBF replacement of the previous one. Ony one can get mined.


while True:
await asyncio.sleep(1)
password = self.get_unlocked_password()
if self.has_keystore_encryption() and not password:
continue
tx = self.find_confirmed_base_tx()
if tx:
# one of the batch_txs has been confirmed
# find which outputs still need to be paid
to_pay = [x for x in self.batch_payments if x not in tx.outputs()]
self.logger.info(f'base tx confirmed. to_pay={to_pay}')
if to_pay:
await self.create_new_base_tx(tx, to_pay, password)
else:
self.batch_txs = []
self.batch_payments = []
else:
base_tx = self.batch_txs[-1] if self.batch_txs else None
base_tx_outputs = base_tx.outputs() if base_tx else []
# check if all payments are in that tx
to_pay = [o for o in self.batch_payments if o not in base_tx_outputs]
if not to_pay:
continue
self.logger.info(f'manage_batch_payments {to_pay}')
tx = self.create_transaction(
outputs=to_pay,
rbf=True,
password=password,
base_tx=base_tx,
)
try:
#self.adb.add_transaction(tx)
await self.network.broadcast_transaction(tx)
self.batch_txs.append(tx)
except TxBroadcastError:
# base_tx is not replaceable, probabaly because it has children
#self.adb.remove_transaction(tx.txid())
await self.create_new_base_tx(base_tx, to_pay, password)

async def create_new_base_tx(self, tx, to_pay, password):
inputs = []
for o in tx.get_change_outputs():
coins = self.adb.get_addr_utxo(o.address)
inputs += list(coins.values())
self.logger.info(f'create_new_base_tx: inputs={inputs} outputs={to_pay}')
tx2 = self.create_transaction(
inputs=inputs,
outputs=to_pay,
password=password,
)
#self.adb.add_transaction(tx2)
try:
await self.network.broadcast_transaction(tx2)
except TxBroadcastError:
# we will retry later, because we have not changed batch_payments
self.logger.info(f'create_new_base_tx: failed to broadcast')
return

self.logger.info(f'create_new_base_tx: success {tx2.txid()}')
self.batch_txs = [tx2]
self.batch_payments = to_pay # this removes payments in the old tx


class Simple_Wallet(Abstract_Wallet):
# wallet with a single keystore
Expand Down

0 comments on commit 45c7543

Please sign in to comment.