From 6ac8f9f7f42c8505e5549310671e9211eca1c78f Mon Sep 17 00:00:00 2001 From: Matthew Zipkin Date: Wed, 11 Sep 2024 11:58:45 -0400 Subject: [PATCH] scenarios: add recon scenario (#577) * scenarios: add recon scenario * scenario: support signet in recon * scenarios: support signet in recon and test * lint --- resources/scenarios/reconnaissance.py | 90 +++++++++++++++++++++++++++ test/scenarios_test.py | 19 +++--- test/signet_test.py | 14 +++++ 3 files changed, 116 insertions(+), 7 deletions(-) create mode 100755 resources/scenarios/reconnaissance.py diff --git a/resources/scenarios/reconnaissance.py b/resources/scenarios/reconnaissance.py new file mode 100755 index 000000000..1440b119b --- /dev/null +++ b/resources/scenarios/reconnaissance.py @@ -0,0 +1,90 @@ +#!/usr/bin/env python3 + +import socket + +# The base class exists inside the commander container when deployed, +# but requires a relative path inside the python source code for other functions. +try: + from commander import Commander +except ImportError: + from resources.scenarios.commander import Commander + +# The entire Bitcoin Core test_framework directory is available as a library +from test_framework.messages import MSG_TX, CInv, hash256, msg_getdata +from test_framework.p2p import MAGIC_BYTES, P2PInterface + + +# This message is provided to the user when they describe the scenario +def cli_help(): + return "Demonstrate network reconnaissance using a scenario and P2PInterface" + + +def get_signet_network_magic_from_node(node): + template = node.getblocktemplate({"rules": ["segwit", "signet"]}) + challenge = template["signet_challenge"] + challenge_bytes = bytes.fromhex(challenge) + data = len(challenge_bytes).to_bytes() + challenge_bytes + digest = hash256(data) + return digest[0:4] + + +# The actual scenario is a class like a Bitcoin Core functional test. +# Commander is a subclass of BitcoinTestFramework instide Warnet +# that allows to operate on containerized nodes instead of local nodes. +class Reconnaissance(Commander): + def set_test_params(self): + # This setting is ignored but still required as + # a sub-class of BitcoinTestFramework + self.num_nodes = 1 + + # Scenario entrypoint + def run_test(self): + self.log.info("Getting peer info") + + # Just like a typical Bitcoin Core functional test, this executes an + # RPC on a node in the network. The actual node at self.nodes[0] may + # be different depending on the user deploying the scenario. Users in + # Warnet may have different namepsace access but everyone should always + # have access to at least one node. + peerinfo = self.nodes[0].getpeerinfo() + for peer in peerinfo: + # You can print out the the scenario logs with `warnet logs` + # which have a list of all this node's peers' addresses and version + self.log.info(f"{peer['addr']} {peer['subver']}") + + # We pick a node on the network to attack + victim = peerinfo[0] + + # regtest or signet + chain = self.nodes[0].chain + + # The victim's address could be an explicit IP address + # OR a kubernetes hostname (use default chain p2p port) + if ":" in victim["addr"]: + dstaddr = victim["addr"].split(":")[0] + else: + dstaddr = socket.gethostbyname(victim["addr"]) + if chain == "regtest": + dstport = 18444 + if chain == "signet": + dstport = 38333 + MAGIC_BYTES["signet"] = get_signet_network_magic_from_node(self.nodes[0]) + + # Now we will use a python-based Bitcoin p2p node to send very specific, + # unusual or non-standard messages to a "victim" node. + self.log.info(f"Attacking {dstaddr}:{dstport}") + attacker = P2PInterface() + attacker.peer_connect(dstaddr=dstaddr, dstport=dstport, net=chain, timeout_factor=1)() + attacker.wait_until(lambda: attacker.is_connected, check_connected=False) + + # Send a harmless network message we expect a response to and wait for it + # Ask for TX with a 0 hash + msg = msg_getdata() + msg.inv.append(CInv(t=MSG_TX, h=0)) + attacker.send_and_ping(msg) + attacker.wait_until(lambda: attacker.message_count["notfound"] > 0) + self.log.info(f"Got notfound message from {dstaddr}:{dstport}") + + +if __name__ == "__main__": + Reconnaissance().main() diff --git a/test/scenarios_test.py b/test/scenarios_test.py index 8be7f4a14..0765c3ebf 100755 --- a/test/scenarios_test.py +++ b/test/scenarios_test.py @@ -31,6 +31,7 @@ def setup_network(self): def test_scenarios(self): self.run_and_check_miner_scenario_from_file() self.run_and_check_scenario_from_file() + self.check_regtest_recon() def scenario_running(self, scenario_name: str): """Check that we are only running a single scenario of the correct name""" @@ -40,15 +41,9 @@ def scenario_running(self, scenario_name: str): def run_and_check_scenario_from_file(self): scenario_file = "test/data/scenario_p2p_interface.py" - - def check_scenario_clean_exit(): - active = scenarios_active() - assert len(active) == 1 - return active[0]["status"] == "succeeded" - self.log.info(f"Running scenario from: {scenario_file}") self.warnet(f"run {scenario_file}") - self.wait_for_predicate(lambda: check_scenario_clean_exit()) + self.wait_for_predicate(self.check_scenario_clean_exit) def run_and_check_miner_scenario_from_file(self): scenario_file = "resources/scenarios/miner_std.py" @@ -59,6 +54,16 @@ def run_and_check_miner_scenario_from_file(self): self.wait_for_predicate(lambda: self.check_blocks(2, start=start)) self.stop_scenario() + def check_regtest_recon(self): + scenario_file = "resources/scenarios/reconnaissance.py" + self.log.info(f"Running scenario from file: {scenario_file}") + self.warnet(f"run {scenario_file}") + self.wait_for_predicate(self.check_scenario_clean_exit) + + def check_scenario_clean_exit(self): + active = scenarios_active() + return all(scenario["status"] == "succeeded" for scenario in active) + def check_blocks(self, target_blocks, start: int = 0): count = int(self.warnet("bitcoin rpc tank-0000 getblockcount")) self.log.debug(f"Current block count: {count}, target: {start + target_blocks}") diff --git a/test/signet_test.py b/test/signet_test.py index 68ec78713..f7ac74ee1 100755 --- a/test/signet_test.py +++ b/test/signet_test.py @@ -6,6 +6,8 @@ from test_base import TestBase +from warnet.status import _get_active_scenarios as scenarios_active + class SignetTest(TestBase): def __init__(self): @@ -19,6 +21,7 @@ def run_test(self): try: self.setup_network() self.check_signet_miner() + self.check_signet_recon() finally: self.cleanup() @@ -46,6 +49,17 @@ def block_one(): self.wait_for_predicate(block_one) + def check_signet_recon(self): + scenario_file = "resources/scenarios/reconnaissance.py" + self.log.info(f"Running scenario from file: {scenario_file}") + self.warnet(f"run {scenario_file}") + + def check_scenario_clean_exit(): + active = scenarios_active() + return all(scenario["status"] == "succeeded" for scenario in active) + + self.wait_for_predicate(check_scenario_clean_exit) + if __name__ == "__main__": test = SignetTest()