diff --git a/resources/charts/bitcoincore/charts/lnd/values.yaml b/resources/charts/bitcoincore/charts/lnd/values.yaml
index e09cc37f6..d56e65bf4 100644
--- a/resources/charts/bitcoincore/charts/lnd/values.yaml
+++ b/resources/charts/bitcoincore/charts/lnd/values.yaml
@@ -83,9 +83,9 @@ readinessProbe:
   timeoutSeconds: 1
 startupProbe:
   failureThreshold: 10
-  periodSeconds: 10
+  periodSeconds: 30
   successThreshold: 1
-  timeoutSeconds: 10
+  timeoutSeconds: 60
   exec:
     command:
       - /bin/sh
diff --git a/resources/scenarios/commander.py b/resources/scenarios/commander.py
index ca7c16800..438a385e1 100644
--- a/resources/scenarios/commander.py
+++ b/resources/scenarios/commander.py
@@ -11,6 +11,7 @@ import ssl
 import sys
 import tempfile
+import time
 from typing import Dict
 
 from kubernetes import client, config
@@ -55,6 +56,7 @@
             "rpc_port": int(pod.metadata.labels["RPCPort"]),
             "rpc_user": "user",
             "rpc_password": pod.metadata.labels["rpcpassword"],
+            "init_peers": pod.metadata.annotations["init_peers"]
         }
     )
@@ -82,41 +84,106 @@ def auth_proxy_request(self, method, path, postdata):
 
 class LND:
     def __init__(self, pod_name):
+        self.name = pod_name
         self.conn = http.client.HTTPSConnection(
            host=pod_name, port=8080, timeout=5, context=INSECURE_CONTEXT
         )
 
     def get(self, uri):
-        self.conn.request(
-            method="GET", url=uri, headers={"Grpc-Metadata-macaroon": ADMIN_MACAROON_HEX}
-        )
-        return self.conn.getresponse().read().decode("utf8")
+        while True:
+            try:
+                self.conn.request(
+                    method="GET",
+                    url=uri,
+                    headers={
+                        "Grpc-Metadata-macaroon": ADMIN_MACAROON_HEX,
+                        "Connection": "close"
+                    }
+                )
+                return self.conn.getresponse().read().decode("utf8")
+            except Exception:
+                time.sleep(1)
 
     def post(self, uri, data):
         body = json.dumps(data)
-        self.conn.request(
-            method="POST",
-            url=uri,
-            body=body,
-            headers={
-                "Content-Type": "application/json",
-                "Content-Length": str(len(body)),
-                "Grpc-Metadata-macaroon": ADMIN_MACAROON_HEX,
-            },
-        )
-        # Stream output, otherwise we get a timeout error
-        res = self.conn.getresponse()
-        stream = ""
+        attempt = 0
         while True:
+            attempt += 1
             try:
-                data = res.read(1)
-                if len(data) == 0:
-                    break
-                else:
-                    stream += data.decode("utf8")
+                self.conn.request(
+                    method="POST",
+                    url=uri,
+                    body=body,
+                    headers={
+                        "Content-Type": "application/json",
+                        "Content-Length": str(len(body)),
+                        "Grpc-Metadata-macaroon": ADMIN_MACAROON_HEX,
+                        "Connection": "close"
+                    },
+                )
+                # Stream output, otherwise we get a timeout error
+                res = self.conn.getresponse()
+                stream = ""
+                while True:
+                    try:
+                        data = res.read(1)
+                        if len(data) == 0:
+                            break
+                        else:
+                            stream += data.decode("utf8")
+                    except Exception:
+                        break
+                return stream
             except Exception:
-                break
-        return stream
+                time.sleep(1)
+
+    def newaddress(self):
+        res = self.get(
+            "/v1/newaddress"
+        )
+        return json.loads(res)
+
+    def walletbalance(self):
+        res = self.get(
+            "/v1/balance/blockchain"
+        )
+        return int(json.loads(res)["confirmed_balance"])
+
+    def uri(self):
+        res = self.get(
+            "/v1/getinfo"
+        )
+        info = json.loads(res)
+        if "uris" not in info or len(info["uris"]) == 0:
+            return None
+        return info["uris"][0]
+
+    def connect(self, target_uri):
+        pk, host = target_uri.split("@")
+        res = self.post(
+            "/v1/peers",
+            data={
+                "addr": {
+                    "pubkey": pk,
+                    "host": host
+                }
+            }
+        )
+        return json.loads(res)
+
+    def channel(self, pk, local_amt, push_amt, fee_rate):
+        res = self.post(
+            "/v1/channels/stream",
+            data={
+                "local_funding_amount": local_amt,
+                "push_sat": push_amt,
+                "node_pubkey": pk,
+                "sat_per_vbyte": fee_rate
+            }
+        )
+        return json.loads(res)
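+
+    # Illustrative usage of the REST helpers above (pod name and pubkey below are
+    # placeholders, not part of this patch):
+    #   ln = LND("tank-0001-ln")
+    #   ln.newaddress()      # GET  /v1/newaddress
+    #   ln.walletbalance()   # GET  /v1/balance/blockchain
+    #   ln.uri()             # GET  /v1/getinfo -> "pubkey@host:port" or None
+    #   ln.connect("02ab...@tank-0002-ln:9735")  # POST /v1/peers
+    #   ln.channel(pk_b64, 100000, 50000, 10)    # POST /v1/channels/stream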
+
+
 class Commander(BitcoinTestFramework):
@@ -139,6 +206,13 @@ def ensure_miner(node):
     def hex_to_b64(hex):
         return base64.b64encode(bytes.fromhex(hex)).decode()
 
+    @staticmethod
+    def b64_to_hex(b64, reverse=False):
+        if reverse:
+            return base64.b64decode(b64)[::-1].hex()
+        else:
+            return base64.b64decode(b64).hex()
+
     def handle_sigterm(self, signum, frame):
         print("SIGTERM received, stopping...")
         self.shutdown()
@@ -193,6 +267,7 @@ def setup(self):
                 coveragedir=self.options.coveragedir,
             )
             node.rpc_connected = True
+            node.init_peers = int(tank["init_peers"])
             self.nodes.append(node)
             self.tanks[tank["tank"]] = node
diff --git a/resources/scenarios/ln_init.py b/resources/scenarios/ln_init.py
index 82745a123..0e7c8dd7a 100644
--- a/resources/scenarios/ln_init.py
+++ b/resources/scenarios/ln_init.py
@@ -1,7 +1,7 @@
 #!/usr/bin/env python3
 
+import threading
 from time import sleep
-
 from commander import Commander
@@ -14,171 +14,322 @@ def add_options(self, parser):
         parser.usage = "warnet run /path/to/ln_init.py"
 
     def run_test(self):
-        self.log.info("Lock out of IBD")
-        miner = self.ensure_miner(self.nodes[0])
-        miner_addr = miner.getnewaddress()
-        self.generatetoaddress(self.nodes[0], 1, miner_addr, sync_fun=self.no_op)
+        ##
+        # L1 P2P
+        ##
+        self.log.info("Waiting for L1 p2p network connections...")
+
+        def tank_connected(self, tank):
+            while True:
+                peers = tank.getpeerinfo()
+                count = sum(1 for peer in peers if peer.get("connection_type") == "manual" or peer.get("addnode") is True)
+                self.log.info(f"Tank {tank.tank} connected to {count}/{tank.init_peers} peers")
+                if count >= tank.init_peers:
+                    break
+                else:
+                    sleep(1)
+
+        conn_threads = [threading.Thread(target=tank_connected, args=(self, tank)) for tank in self.nodes]
+        for thread in conn_threads:
+            thread.start()
+
+        all(thread.join() is None for thread in conn_threads)
+        self.log.info("Network connected")
+
-        self.log.info("Get LN nodes and wallet addresses")
-        ln_nodes = []
-        recv_addrs = []
-        for tank in self.warnet.tanks:
-            if tank.lnnode is not None:
-                recv_addrs.append(tank.lnnode.getnewaddress())
-                ln_nodes.append(tank.index)
-        self.log.info("Fund LN wallets")
+        ##
+        # MINER
+        ##
+        self.log.info("Setting up miner...")
         miner = self.ensure_miner(self.nodes[0])
         miner_addr = miner.getnewaddress()
-        # 298 block base
-        self.generatetoaddress(self.nodes[0], 297, miner_addr, sync_fun=self.no_op)
-        # divvy up the goods
-        split = (miner.getbalance() - 1) // len(recv_addrs)
+
+        def gen(n):
+            return self.generatetoaddress(self.nodes[0], n, miner_addr, sync_fun=self.no_op)
+
+        self.log.info("Locking out of IBD...")
+        gen(1)
+
+
+
+        ##
+        # WALLET ADDRESSES
+        ##
+        self.log.info("Getting LN wallet addresses...")
+        ln_addrs = []
+
+        def get_ln_addr(self, name, ln):
+            while True:
+                res = ln.newaddress()
+                if "address" in res:
+                    addr = res["address"]
+                    ln_addrs.append(addr)
+                    self.log.info(f"Got wallet address {addr} from {name}")
+                    break
+                else:
+                    self.log.info(f"Couldn't get wallet address from {name}:\n  {res}\n  waiting to retry...")
+                    sleep(1)
+
+        addr_threads = [threading.Thread(target=get_ln_addr, args=(self, name, ln)) for name, ln in self.lns.items()]
+        for thread in addr_threads:
+            thread.start()
+
+        all(thread.join() is None for thread in addr_threads)
+        self.log.info(f"Got {len(ln_addrs)} addresses from {len(self.lns)} LN nodes")
+
+
+
+        ##
+        # FUNDS
+        ##
+        self.log.info("Funding LN wallets...")
+        # 298 block base for miner wallet
+        gen(297)
+        # divvy up the goods, leaving 1 BTC for fees
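+        # (illustrative math: split = (miner balance - 1) // number of LN nodes // 10,
+        #  sent in 10 rounds of sendmany so each node ends up with 10 UTXOs of `split` BTC)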
+        # 10 UTXOs per node means 10 channel opens per node per block
+        split = (miner.getbalance() - 1) // len(ln_addrs) // 10
         sends = {}
-        for addr in recv_addrs:
-            sends[addr] = split
-        miner.sendmany("", sends)
+        for _ in range(10):
+            for addr in ln_addrs:
+                sends[addr] = split
+            miner.sendmany("", sends)
         # confirm funds in block 299
-        self.generatetoaddress(self.nodes[0], 1, miner_addr, sync_fun=self.no_op)
+        gen(1)
         self.log.info(
-            f"Waiting for funds to be spendable: {split} BTC each for {len(recv_addrs)} LN nodes"
+            f"Waiting for funds to be spendable: 10x{split} BTC UTXOs each for {len(ln_addrs)} LN nodes"
         )
 
-        def funded_lnnodes():
-            for tank in self.warnet.tanks:
-                if tank.lnnode is None:
-                    continue
-                if int(tank.lnnode.get_wallet_balance()) < (split * 100000000):
-                    return False
-            return True
-
-        self.wait_until(funded_lnnodes, timeout=5 * 60)
-
-        ln_nodes_uri = ln_nodes.copy()
-        while len(ln_nodes_uri) > 0:
-            self.log.info(
-                f"Waiting for all LN nodes to have URI, LN nodes remaining: {ln_nodes_uri}"
-            )
-            for index in ln_nodes_uri:
-                lnnode = self.warnet.tanks[index].lnnode
-                if lnnode.getURI():
-                    ln_nodes_uri.remove(index)
-            sleep(5)
-
-        self.log.info("Adding p2p connections to LN nodes")
-        for edge in self.warnet.graph.edges(data=True):
-            (src, dst, data) = edge
-            # Copy the L1 p2p topology (where applicable) to L2
-            # so we get a more robust p2p graph for lightning
-            if (
-                "channel_open" not in data
-                and self.warnet.tanks[src].lnnode
-                and self.warnet.tanks[dst].lnnode
-            ):
-                self.warnet.tanks[src].lnnode.connect_to_tank(dst)
-
-        # Start confirming channel opens in block 300
-        self.log.info("Opening channels, one per block")
-        chan_opens = []
-        edges = self.warnet.graph.edges(data=True, keys=True)
-        edges = sorted(edges, key=lambda edge: edge[2])
-        for edge in edges:
-            (src, dst, key, data) = edge
-            if "channel_open" in data:
-                src_node = self.warnet.get_ln_node_from_tank(src)
-                assert src_node is not None
-                assert self.warnet.get_ln_node_from_tank(dst) is not None
-                self.log.info(f"opening channel {src}->{dst}")
-                chan_pt = src_node.open_channel_to_tank(dst, data["channel_open"])
-                # We can guarantee deterministic short channel IDs as long as
-                # the change output is greater than the channel funding output,
-                # which will then be output 0
-                assert chan_pt[64:] == ":0"
-                chan_opens.append((edge, chan_pt))
-                self.log.info(f"  pending channel point: {chan_pt}")
-                self.wait_until(
-                    lambda chan_pt=chan_pt: chan_pt[:64] in self.nodes[0].getrawmempool()
-                )
-                self.generatetoaddress(self.nodes[0], 1, miner_addr)
-                assert chan_pt[:64] not in self.nodes[0].getrawmempool()
-                height = self.nodes[0].getblockcount()
-                self.log.info(f"  confirmed in block {height}")
-                self.log.info(
-                    f"  channel_id should be: {int.from_bytes(height.to_bytes(3, 'big') + (1).to_bytes(3, 'big') + (0).to_bytes(2, 'big'), 'big')}"
-                )
+        def confirm_ln_balance(self, name, ln):
+            bal = 0
+            while True:
+                bal = ln.walletbalance()
+                if bal >= (split * 100000000):
+                    self.log.info(f"LN node {name} confirmed funds")
+                    break
+                sleep(1)
+
+        fund_threads = [threading.Thread(target=confirm_ln_balance, args=(self, name, ln)) for name, ln in self.lns.items()]
+        for thread in fund_threads:
+            thread.start()
+
+        all(thread.join() is None for thread in fund_threads)
+        self.log.info("All LN nodes are funded")
+
+
+
+        ##
+        # URIs
+        ##
+        self.log.info("Getting URIs for all LN nodes...")
+        ln_uris = {}
+
+        def get_ln_uri(self, name, ln):
+            uri = None
+            while True:
+                uri = ln.uri()
+                if uri:
+                    ln_uris[name] = uri
+                    self.log.info(f"LN node {name} has URI {uri}")
+                    break
+                sleep(1)
+
+        uri_threads = [threading.Thread(target=get_ln_uri, args=(self, name, ln)) for name, ln in self.lns.items()]
+        for thread in uri_threads:
+            thread.start()
 
-        # Ensure all channel opens are sufficiently confirmed
-        self.generatetoaddress(self.nodes[0], 10, miner_addr, sync_fun=self.no_op)
-        ln_nodes_gossip = ln_nodes.copy()
-        while len(ln_nodes_gossip) > 0:
-            self.log.info(f"Waiting for graph gossip sync, LN nodes remaining: {ln_nodes_gossip}")
-            for index in ln_nodes_gossip:
-                lnnode = self.warnet.tanks[index].lnnode
-                count_channels = len(lnnode.get_graph_channels())
-                count_graph_nodes = len(lnnode.get_graph_nodes())
-                if count_channels == len(chan_opens) and count_graph_nodes == len(ln_nodes):
-                    ln_nodes_gossip.remove(index)
+        all(thread.join() is None for thread in uri_threads)
+        self.log.info("Got URIs from all LN nodes")
+
+
+
+        ##
+        # P2P CONNECTIONS
+        ##
+        self.log.info("Adding p2p connections to LN nodes...")
+        # (source, target) pairs of LND instances to connect
+        connections = []
+        # Cycle graph through all LN nodes
+        nodes = list(self.lns.values())
+        prev_node = nodes[-1]
+        for node in nodes:
+            connections.append((node, prev_node))
+            prev_node = node
+        # Explicit connections between every pair of channel partners
+        for ch in self.channels:
+            src = self.lns[ch["source"]]
+            tgt = self.lns[ch["target"]]
+            # Avoid duplicates and reciprocals
+            if (src, tgt) not in connections and (tgt, src) not in connections:
+                connections.append((src, tgt))
+
+        def connect_ln(self, pair):
+            while True:
+                res = pair[0].connect(ln_uris[pair[1].name])
+                if res == {}:
+                    self.log.info(f"Connected LN nodes {pair[0].name} -> {pair[1].name}")
+                    break
+                if "message" in res:
+                    if "already connected" in res["message"]:
+                        self.log.info(f"Already connected LN nodes {pair[0].name} -> {pair[1].name}")
+                        break
+                    if "process of starting" in res["message"]:
+                        self.log.info(f"{pair[0].name} not ready for connections yet, waiting to retry...")
+                        sleep(1)
+                    else:
+                        self.log.info(f"Unexpected response attempting to connect {pair[0].name} -> {pair[1].name}:\n  {res}\n  ABORTING")
+                        break
+
+        p2p_threads = [threading.Thread(target=connect_ln, args=(self, pair)) for pair in connections]
+        for thread in p2p_threads:
+            thread.start()
+
+        all(thread.join() is None for thread in p2p_threads)
+        self.log.info("Established all LN p2p connections")
+
+
+
+        ##
+        # CHANNELS
+        ##
+        self.log.info("Opening lightning channels...")
+        # Sort the channels by assigned block and index
+        # so their channel ids are deterministic
+        ch_by_block = {}
+        for ch in self.channels:
+            # TODO: if "id" not in ch ...
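+            # Each ch["id"] is {"block": B, "index": I}: the channel's funding tx is expected
+            # to land as the I-th tx of block B (funding output at index 0), which pins its
+            # short channel ID to B:I:0; the assertions after each block is mined check this.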
+            block = ch["id"]["block"]
+            if block not in ch_by_block:
+                ch_by_block[block] = [ch]
+            else:
+                ch_by_block[block].append(ch)
+        blocks = list(ch_by_block.keys())
+        blocks = sorted(blocks)
+
+        for target_block in blocks:
+            # First make sure the target block is the next block
+            current_height = self.nodes[0].getblockcount()
+            need = target_block - current_height
+            if need < 1:
+                raise Exception("Blockchain too long for deterministic channel ID")
+            if need > 1:
+                gen(need - 1)
+
+            def open_channel(self, ch, fee_rate):
+                src = self.lns[ch["source"]]
+                tgt_uri = ln_uris[ch["target"]]
+                tgt_pk, _ = tgt_uri.split("@")
+                self.log.info(f"Sending channel open from {ch['source']} -> {ch['target']} with fee_rate={fee_rate}")
+                res = src.channel(
+                    pk=self.hex_to_b64(tgt_pk),
+                    local_amt=ch["local_amt"],
+                    push_amt=ch["push_amt"],
+                    fee_rate=fee_rate
+                )
+                if "result" not in res:
+                    self.log.info(
+                        f"Unexpected channel open response:\n "
+                        + f"From {ch['source']} -> {ch['target']} fee_rate={fee_rate}\n "
+                        + f"{res}"
+                    )
                 else:
+                    txid = self.b64_to_hex(res['result']['chan_pending']['txid'], reverse=True)
+                    ch["txid"] = txid
                     self.log.info(
-                        f"  node {index} not synced (channels: {count_channels}/{len(chan_opens)}, nodes: {count_graph_nodes}/{len(ln_nodes)})"
+                        f"Channel open {ch['source']} -> {ch['target']}\n "
+                        + f"outpoint={txid}:{res['result']['chan_pending']['output_index']}\n "
+                        + f"expected channel id: {ch['id']}"
                     )
-            sleep(5)
-
-        self.log.info("Updating channel policies")
-        for edge, chan_pt in chan_opens:
-            (src, dst, key, data) = edge
-            if "target_policy" in data:
-                target_node = self.warnet.get_ln_node_from_tank(dst)
-                target_node.update_channel_policy(chan_pt, data["target_policy"])
-            if "source_policy" in data:
-                source_node = self.warnet.get_ln_node_from_tank(src)
-                source_node.update_channel_policy(chan_pt, data["source_policy"])
-
-        while True:
-            self.log.info("Waiting for all channel policies to match")
-            score = 0
-            for tank_index, me in enumerate(ln_nodes):
-                you = (tank_index + 1) % len(ln_nodes)
-                my_channels = self.warnet.tanks[me].lnnode.get_graph_channels()
-                your_channels = self.warnet.tanks[you].lnnode.get_graph_channels()
-                match = True
-                for _chan_index, my_chan in enumerate(my_channels):
-                    your_chan = [
-                        chan
-                        for chan in your_channels
-                        if chan.short_chan_id == my_chan.short_chan_id
-                    ][0]
-                    if not your_chan:
-                        print(f"Channel policy missing for channel: {my_chan.short_chan_id}")
-                        match = False
-                        break
-                    try:
-                        if not my_chan.channel_match(your_chan):
-                            print(
-                                f"Channel policy doesn't match between tanks {me} & {you}: {my_chan.short_chan_id}"
-                            )
-                            match = False
-                            break
-                    except Exception as e:
-                        print(f"Error comparing channel policies: {e}")
-                        print(
-                            f"Channel policy doesn't match between tanks {me} & {you}: {my_chan.short_chan_id}"
-                        )
-                        match = False
-                        break
-                if match:
-                    print(f"All channel policies match between tanks {me} & {you}")
-                    score += 1
-            print(f"Score: {score} / {len(ln_nodes)}")
-            if score == len(ln_nodes):
-                break
-            sleep(5)
+            channels = sorted(ch_by_block[target_block], key=lambda ch: ch["id"]["index"])
+            index = 0
+            fee_rate = 5006  # sat/vB; decreases by 5 per tx, for up to 1000 txs per block
+            ch_threads = []
+            for ch in channels:
+                index += 1
+                fee_rate -= 5
+                assert index == ch["id"]["index"], "Channel ID indexes are not consecutive"
+                assert fee_rate >= 1, "Too many TXs in block, out of fee range"
+                t = threading.Thread(target=open_channel, args=(self, ch, fee_rate))
+                t.start()
+                ch_threads.append(t)
 
-        self.log.info(
-            f"Warnet LN ready with {len(recv_addrs)} nodes and {len(chan_opens)} channels."
-        )
+            all(thread.join() is None for thread in ch_threads)
+            self.log.info(f"Waiting for {len(channels)} channel opens in mempool...")
+            self.wait_until(lambda: self.nodes[0].getmempoolinfo()["size"] >= len(channels), timeout=500)
+            block_hash = gen(1)[0]
+            self.log.info(f"Confirmed {len(channels)} channel opens in block {target_block}")
+            self.log.info("Checking deterministic channel IDs in block...")
+            block = self.nodes[0].getblock(block_hash)
+            block_txs = block["tx"]
+            block_height = block["height"]
+            for ch in channels:
+                assert ch["id"]["block"] == block_height
+                assert block_txs[ch["id"]["index"]] == ch["txid"]
+            self.log.info("👍")
+
+        gen(5)
+        self.log.info(f"Confirmed {len(self.channels)} total channel opens")
+
+        # self.log.info("Updating channel policies")
+        # for edge, chan_pt in chan_opens:
+        #     (src, dst, key, data) = edge
+        #     if "target_policy" in data:
+        #         target_node = self.warnet.get_ln_node_from_tank(dst)
+        #         target_node.update_channel_policy(chan_pt, data["target_policy"])
+        #     if "source_policy" in data:
+        #         source_node = self.warnet.get_ln_node_from_tank(src)
+        #         source_node.update_channel_policy(chan_pt, data["source_policy"])
+
+        # while True:
+        #     self.log.info("Waiting for all channel policies to match")
+        #     score = 0
+        #     for tank_index, me in enumerate(ln_nodes):
+        #         you = (tank_index + 1) % len(ln_nodes)
+        #         my_channels = self.warnet.tanks[me].lnnode.get_graph_channels()
+        #         your_channels = self.warnet.tanks[you].lnnode.get_graph_channels()
+        #         match = True
+        #         for _chan_index, my_chan in enumerate(my_channels):
+        #             your_chan = [
+        #                 chan
+        #                 for chan in your_channels
+        #                 if chan.short_chan_id == my_chan.short_chan_id
+        #             ][0]
+        #             if not your_chan:
+        #                 print(f"Channel policy missing for channel: {my_chan.short_chan_id}")
+        #                 match = False
+        #                 break
+
+        #             try:
+        #                 if not my_chan.channel_match(your_chan):
+        #                     print(
+        #                         f"Channel policy doesn't match between tanks {me} & {you}: {my_chan.short_chan_id}"
+        #                     )
+        #                     match = False
+        #                     break
+        #             except Exception as e:
+        #                 print(f"Error comparing channel policies: {e}")
+        #                 print(
+        #                     f"Channel policy doesn't match between tanks {me} & {you}: {my_chan.short_chan_id}"
+        #                 )
+        #                 match = False
+        #                 break
+        #         if match:
+        #             print(f"All channel policies match between tanks {me} & {you}")
+        #             score += 1
+        #     print(f"Score: {score} / {len(ln_nodes)}")
+        #     if score == len(ln_nodes):
+        #         break
+        #     sleep(5)
+
+        # self.log.info(
+        #     f"Warnet LN ready with {len(recv_addrs)} nodes and {len(chan_opens)} channels."
+        # )
 
 
 def main():
diff --git a/src/warnet/graph.py b/src/warnet/graph.py
index 193b6d9f4..e10caff36 100644
--- a/src/warnet/graph.py
+++ b/src/warnet/graph.py
@@ -281,12 +281,11 @@ def import_policy(json_policy):
     count = 0
     for edge in sorted_edges:
         source = pk_to_tank[edge["node1_pub"]]
-        amt = int(edge["capacity"]) // 2
         channel = {
             "id": {"block": block, "index": index},
             "target": pk_to_tank[edge["node2_pub"]] + "-ln",
-            "local_amt": amt,
-            "push_amt": amt - 1,
+            "local_amt": int(edge["capacity"]),
+            "push_amt": int(edge["capacity"]) // 2,
             "source_policy": import_policy(edge["node1_policy"]),
             "target_policy": import_policy(edge["node2_policy"]),
         }
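
Reviewer note (not part of the patch): the deterministic channel IDs above rely on LND's short channel ID encoding of (block height, tx index, output index), the same packing used by the logging line removed from ln_init.py. A minimal sketch, assuming that 3-byte/3-byte/2-byte encoding:

    def expected_scid(block: int, tx_index: int, output_index: int = 0) -> int:
        # 3-byte block height || 3-byte tx index || 2-byte output index
        return int.from_bytes(
            block.to_bytes(3, "big") + tx_index.to_bytes(3, "big") + output_index.to_bytes(2, "big"),
            "big",
        )

    # Example: the channel funded by the tx at index 1 of block 300, output 0:
    # expected_scid(300, 1) == (300 << 40) | (1 << 16) == 329853488398336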