Skip to content

Commit

Permalink
scenarios: support signet in recon and test
Browse files Browse the repository at this point in the history
  • Loading branch information
pinheadmz committed Sep 11, 2024
1 parent fc48043 commit 161c218
Show file tree
Hide file tree
Showing 3 changed files with 54 additions and 18 deletions.
35 changes: 25 additions & 10 deletions resources/scenarios/reconnaissance.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,24 @@
from resources.scenarios.commander import Commander

# The entire Bitcoin Core test_framework directory is available as a library
from test_framework.messages import MSG_TX, CInv, msg_getdata
from test_framework.p2p import P2PInterface
from test_framework.messages import MSG_TX, CInv, msg_getdata, hash256
from test_framework.p2p import P2PInterface, MAGIC_BYTES


# This message is provided to the user when they describe the scenario
def cli_help():
return "Demonstrate network reconnaissance using a scenario and P2PInterface"


def get_signet_network_magic_from_node(node):
template = node.getblocktemplate({"rules": ["segwit", "signet"]})
challenge = template["signet_challenge"]
challenge_bytes = bytes.fromhex(challenge)
data = len(challenge_bytes).to_bytes() + challenge_bytes
digest = hash256(data)
return digest[0:4]


# The actual scenario is a class like a Bitcoin Core functional test.
# Commander is a subclass of BitcoinTestFramework instide Warnet
# that allows to operate on containerized nodes instead of local nodes.
Expand Down Expand Up @@ -46,21 +55,26 @@ def run_test(self):
# We pick a node on the network to attack
victim = peerinfo[0]

# The victim's address could be an explicit IP address with port
# OR a kubernetes hostname (with default chain p2p port)
# regtest or signet
chain = self.nodes[0].chain

# The victim's address could be an explicit IP address
# OR a kubernetes hostname (use default chain p2p port)
if ":" in victim["addr"]:
dstaddr, dstport = victim["addr"].split(":")
dstaddr = victim["addr"].split(":")[0]
else:
dstaddr = socket.gethostbyname(victim["addr"])
if self.chain == "regtest":
dstport = 18444
if self.chain == "signet":
dstport = 38333
if chain == "regtest":
dstport = 18444
if chain == "signet":
dstport = 38333
MAGIC_BYTES["signet"] = get_signet_network_magic_from_node(self.nodes[0])

# Now we will use a python-based Bitcoin p2p node to send very specific,
# unusual or non-standard messages to a "victim" node.
self.log.info(f"Attacking {dstaddr}:{dstport}")
attacker = P2PInterface()
attacker.peer_connect(dstaddr=dstaddr, dstport=dstport, net="regtest", timeout_factor=1)()
attacker.peer_connect(dstaddr=dstaddr, dstport=dstport, net=chain, timeout_factor=1)()
attacker.wait_until(lambda: attacker.is_connected, check_connected=False)

# Send a harmless network message we expect a response to and wait for it
Expand All @@ -69,6 +83,7 @@ def run_test(self):
msg.inv.append(CInv(t=MSG_TX, h=0))
attacker.send_and_ping(msg)
attacker.wait_until(lambda: attacker.message_count["notfound"] > 0)
self.log.info(f"Got notfound message from {dstaddr}:{dstport}")


if __name__ == "__main__":
Expand Down
22 changes: 15 additions & 7 deletions test/scenarios_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ def setup_network(self):
def test_scenarios(self):
self.run_and_check_miner_scenario_from_file()
self.run_and_check_scenario_from_file()
self.check_regtest_recon()

def scenario_running(self, scenario_name: str):
"""Check that we are only running a single scenario of the correct name"""
Expand All @@ -40,15 +41,9 @@ def scenario_running(self, scenario_name: str):

def run_and_check_scenario_from_file(self):
scenario_file = "test/data/scenario_p2p_interface.py"

def check_scenario_clean_exit():
active = scenarios_active()
assert len(active) == 1
return active[0]["status"] == "succeeded"

self.log.info(f"Running scenario from: {scenario_file}")
self.warnet(f"run {scenario_file}")
self.wait_for_predicate(lambda: check_scenario_clean_exit())
self.wait_for_predicate(self.check_scenario_clean_exit)

def run_and_check_miner_scenario_from_file(self):
scenario_file = "resources/scenarios/miner_std.py"
Expand All @@ -59,6 +54,19 @@ def run_and_check_miner_scenario_from_file(self):
self.wait_for_predicate(lambda: self.check_blocks(2, start=start))
self.stop_scenario()

def check_regtest_recon(self):
scenario_file = "resources/scenarios/reconnaissance.py"
self.log.info(f"Running scenario from file: {scenario_file}")
self.warnet(f"run {scenario_file}")
self.wait_for_predicate(self.check_scenario_clean_exit)

def check_scenario_clean_exit():
active = scenarios_active()
for scenario in active:
if scenario["status"] != "succeeded":
return False
return True

def check_blocks(self, target_blocks, start: int = 0):
count = int(self.warnet("bitcoin rpc tank-0000 getblockcount"))
self.log.debug(f"Current block count: {count}, target: {start + target_blocks}")
Expand Down
15 changes: 14 additions & 1 deletion test/signet_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from pathlib import Path

from test_base import TestBase

from warnet.status import _get_active_scenarios as scenarios_active

class SignetTest(TestBase):
def __init__(self):
Expand All @@ -19,6 +19,7 @@ def run_test(self):
try:
self.setup_network()
self.check_signet_miner()
self.check_signet_recon()
finally:
self.cleanup()

Expand Down Expand Up @@ -46,6 +47,18 @@ def block_one():

self.wait_for_predicate(block_one)

def check_signet_recon(self):
scenario_file = "resources/scenarios/reconnaissance.py"
self.log.info(f"Running scenario from file: {scenario_file}")
self.warnet(f"run {scenario_file}")

def check_scenario_clean_exit():
active = scenarios_active()
for scenario in active:
if scenario["status"] != "succeeded":
return False
return True
self.wait_for_predicate(check_scenario_clean_exit)

if __name__ == "__main__":
test = SignetTest()
Expand Down

0 comments on commit 161c218

Please sign in to comment.