From 8079b60b89441fda1a9ffc9c5a0bef8e8f9028a4 Mon Sep 17 00:00:00 2001 From: Rusty Russell Date: Mon, 23 Oct 2023 16:34:35 +1030 Subject: [PATCH] connectd: fix forwarding after tx_abort. If we get a WIRE_TX_ABORT then another message, we send the other message to the same subd (even though the tx abort causes it to shutdown). This means we effectively lose the next message, and timeout (see below from CI, reproduced locally). So, have connectd ignore the subd after it forwards the WIRE_TX_ABORT. The next message will, correctly, cause a fresh subdaemon to be spawned. ``` @unittest.skipIf(TEST_NETWORK != 'regtest', 'elementsd doesnt yet support PSBT features we need') @pytest.mark.openchannel('v2') def test_v2_rbf_multi(node_factory, bitcoind, chainparams): l1, l2 = node_factory.get_nodes(2, opts={'may_reconnect': True, 'dev-no-reconnect': None, 'allow_warning': True}) l1.rpc.connect(l2.info['id'], 'localhost', l2.port) amount = 2**24 chan_amount = 100000 bitcoind.rpc.sendtoaddress(l1.rpc.newaddr()['bech32'], amount / 10**8 + 0.01) bitcoind.generate_block(1) # Wait for it to arrive. wait_for(lambda: len(l1.rpc.listfunds()['outputs']) > 0) res = l1.rpc.fundchannel(l2.info['id'], chan_amount) chan_id = res['channel_id'] vins = bitcoind.rpc.decoderawtransaction(res['tx'])['vin'] assert(only_one(vins)) prev_utxos = ["{}:{}".format(vins[0]['txid'], vins[0]['vout'])] # Check that we're waiting for lockin l1.daemon.wait_for_log(' to DUALOPEND_AWAITING_LOCKIN') # Attempt to do abort, should fail since we've # already gotten an inflight with pytest.raises(RpcError): l1.rpc.openchannel_abort(chan_id) rate = int(find_next_feerate(l1, l2)[:-5]) # We 4x the feerate to beat the min-relay fee next_feerate = '{}perkw'.format(rate * 4) # Initiate an RBF startweight = 42 + 172 # base weight, funding output initpsbt = l1.rpc.utxopsbt(chan_amount, next_feerate, startweight, prev_utxos, reservedok=True, min_witness_weight=110, excess_as_change=True) # Do the bump bump = l1.rpc.openchannel_bump(chan_id, chan_amount, initpsbt['psbt'], funding_feerate=next_feerate) # Abort this open attempt! We will re-try aborted = l1.rpc.openchannel_abort(chan_id) assert not aborted['channel_canceled'] # We no longer disconnect on aborts, because magic! assert only_one(l1.rpc.listpeers()['peers'])['connected'] # Do the bump, again, same feerate > bump = l1.rpc.openchannel_bump(chan_id, chan_amount, initpsbt['psbt'], funding_feerate=next_feerate) tests/test_opening.py:668: _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ contrib/pyln-client/pyln/client/lightning.py:1206: in openchannel_bump return self.call("openchannel_bump", payload) contrib/pyln-testing/pyln/testing/utils.py:718: in call res = LightningRpc.call(self, method, payload, cmdprefix, filter) contrib/pyln-client/pyln/client/lightning.py:398: in call resp, buf = self._readobj(sock, buf) contrib/pyln-client/pyln/client/lightning.py:315: in _readobj b = sock.recv(max(1024, len(buff))) _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ self = length = 1024 def recv(self, length: int) -> bytes: if self.sock is None: raise socket.error("not connected") > return self.sock.recv(length) E Failed: Timeout >1200.0s ``` --- connectd/multiplex.c | 23 +++++++++++++++++++++-- 1 file changed, 21 insertions(+), 2 deletions(-) diff --git a/connectd/multiplex.c b/connectd/multiplex.c index 0e80eedda351..139fcdc2897a 100644 --- a/connectd/multiplex.c +++ b/connectd/multiplex.c @@ -58,6 +58,9 @@ struct subd { /* Output buffer */ struct msg_queue *outq; + + /* After we've told it to tx_abort, we don't send anything else. */ + bool rcvd_tx_abort; }; static struct subd *find_subd(struct peer *peer, @@ -66,6 +69,10 @@ static struct subd *find_subd(struct peer *peer, for (size_t i = 0; i < tal_count(peer->subds); i++) { struct subd *subd = peer->subds[i]; + /* Once we sent it tx_abort, we pretend it doesn't exist */ + if (subd->rcvd_tx_abort) + continue; + /* Once we see a message using the real channel_id, we * clear the temporary_channel_id */ if (channel_id_eq(&subd->channel_id, channel_id)) { @@ -1040,6 +1047,7 @@ static struct subd *new_subd(struct peer *peer, subd->temporary_channel_id = NULL; subd->opener_revocation_basepoint = NULL; subd->conn = NULL; + subd->rcvd_tx_abort = false; /* Connect it to the peer */ tal_arr_expand(&peer->subds, subd); @@ -1056,6 +1064,8 @@ static struct io_plan *read_body_from_peer_done(struct io_conn *peer_conn, u8 *decrypted; struct channel_id channel_id; struct subd *subd; + enum peer_wire type; + decrypted = cryptomsg_decrypt_body(tmpctx, &peer->cs, peer->peer_in); @@ -1066,6 +1076,8 @@ static struct io_plan *read_body_from_peer_done(struct io_conn *peer_conn, } tal_free(peer->peer_in); + type = fromwire_peektype(decrypted); + /* dev_disconnect can disable read */ if (!peer->dev_read_enabled) return read_hdr_from_peer(peer_conn, peer); @@ -1083,8 +1095,6 @@ static struct io_plan *read_body_from_peer_done(struct io_conn *peer_conn, /* After this we should be able to match to subd by channel_id */ if (!extract_channel_id(decrypted, &channel_id)) { - enum peer_wire type = fromwire_peektype(decrypted); - /* We won't log this anywhere else, so do it here. */ status_peer_io(LOG_IO_IN, &peer->id, decrypted); @@ -1137,6 +1147,15 @@ static struct io_plan *read_body_from_peer_done(struct io_conn *peer_conn, /* Tell them to write. */ msg_enqueue(subd->outq, take(decrypted)); + /* Is this a tx_abort? Ignore from now on, and close after sending! */ + if (type == WIRE_TX_ABORT) { + subd->rcvd_tx_abort = true; + /* In case it doesn't close by itself */ + notleak(new_reltimer(&peer->daemon->timers, subd, + time_from_sec(5), + close_subd_timeout, subd)); + } + /* Wait for them to wake us */ return io_wait(peer_conn, &peer->peer_in, read_hdr_from_peer, peer); }