Skip to content

Commit

Permalink
scenarios: add recon scenario (#577)
Browse files Browse the repository at this point in the history
* scenarios: add recon scenario

* scenario: support signet in recon

* scenarios: support signet in recon and test

* lint
  • Loading branch information
pinheadmz authored Sep 11, 2024
1 parent 20d514c commit 6ac8f9f
Show file tree
Hide file tree
Showing 3 changed files with 116 additions and 7 deletions.
90 changes: 90 additions & 0 deletions resources/scenarios/reconnaissance.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
#!/usr/bin/env python3

import socket

# The base class exists inside the commander container when deployed,
# but requires a relative path inside the python source code for other functions.
try:
from commander import Commander
except ImportError:
from resources.scenarios.commander import Commander

# The entire Bitcoin Core test_framework directory is available as a library
from test_framework.messages import MSG_TX, CInv, hash256, msg_getdata
from test_framework.p2p import MAGIC_BYTES, P2PInterface


# This message is provided to the user when they describe the scenario
def cli_help():
return "Demonstrate network reconnaissance using a scenario and P2PInterface"


def get_signet_network_magic_from_node(node):
template = node.getblocktemplate({"rules": ["segwit", "signet"]})
challenge = template["signet_challenge"]
challenge_bytes = bytes.fromhex(challenge)
data = len(challenge_bytes).to_bytes() + challenge_bytes
digest = hash256(data)
return digest[0:4]


# The actual scenario is a class like a Bitcoin Core functional test.
# Commander is a subclass of BitcoinTestFramework instide Warnet
# that allows to operate on containerized nodes instead of local nodes.
class Reconnaissance(Commander):
def set_test_params(self):
# This setting is ignored but still required as
# a sub-class of BitcoinTestFramework
self.num_nodes = 1

# Scenario entrypoint
def run_test(self):
self.log.info("Getting peer info")

# Just like a typical Bitcoin Core functional test, this executes an
# RPC on a node in the network. The actual node at self.nodes[0] may
# be different depending on the user deploying the scenario. Users in
# Warnet may have different namepsace access but everyone should always
# have access to at least one node.
peerinfo = self.nodes[0].getpeerinfo()
for peer in peerinfo:
# You can print out the the scenario logs with `warnet logs`
# which have a list of all this node's peers' addresses and version
self.log.info(f"{peer['addr']} {peer['subver']}")

# We pick a node on the network to attack
victim = peerinfo[0]

# regtest or signet
chain = self.nodes[0].chain

# The victim's address could be an explicit IP address
# OR a kubernetes hostname (use default chain p2p port)
if ":" in victim["addr"]:
dstaddr = victim["addr"].split(":")[0]
else:
dstaddr = socket.gethostbyname(victim["addr"])
if chain == "regtest":
dstport = 18444
if chain == "signet":
dstport = 38333
MAGIC_BYTES["signet"] = get_signet_network_magic_from_node(self.nodes[0])

# Now we will use a python-based Bitcoin p2p node to send very specific,
# unusual or non-standard messages to a "victim" node.
self.log.info(f"Attacking {dstaddr}:{dstport}")
attacker = P2PInterface()
attacker.peer_connect(dstaddr=dstaddr, dstport=dstport, net=chain, timeout_factor=1)()
attacker.wait_until(lambda: attacker.is_connected, check_connected=False)

# Send a harmless network message we expect a response to and wait for it
# Ask for TX with a 0 hash
msg = msg_getdata()
msg.inv.append(CInv(t=MSG_TX, h=0))
attacker.send_and_ping(msg)
attacker.wait_until(lambda: attacker.message_count["notfound"] > 0)
self.log.info(f"Got notfound message from {dstaddr}:{dstport}")


if __name__ == "__main__":
Reconnaissance().main()
19 changes: 12 additions & 7 deletions test/scenarios_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ def setup_network(self):
def test_scenarios(self):
self.run_and_check_miner_scenario_from_file()
self.run_and_check_scenario_from_file()
self.check_regtest_recon()

def scenario_running(self, scenario_name: str):
"""Check that we are only running a single scenario of the correct name"""
Expand All @@ -40,15 +41,9 @@ def scenario_running(self, scenario_name: str):

def run_and_check_scenario_from_file(self):
scenario_file = "test/data/scenario_p2p_interface.py"

def check_scenario_clean_exit():
active = scenarios_active()
assert len(active) == 1
return active[0]["status"] == "succeeded"

self.log.info(f"Running scenario from: {scenario_file}")
self.warnet(f"run {scenario_file}")
self.wait_for_predicate(lambda: check_scenario_clean_exit())
self.wait_for_predicate(self.check_scenario_clean_exit)

def run_and_check_miner_scenario_from_file(self):
scenario_file = "resources/scenarios/miner_std.py"
Expand All @@ -59,6 +54,16 @@ def run_and_check_miner_scenario_from_file(self):
self.wait_for_predicate(lambda: self.check_blocks(2, start=start))
self.stop_scenario()

def check_regtest_recon(self):
scenario_file = "resources/scenarios/reconnaissance.py"
self.log.info(f"Running scenario from file: {scenario_file}")
self.warnet(f"run {scenario_file}")
self.wait_for_predicate(self.check_scenario_clean_exit)

def check_scenario_clean_exit(self):
active = scenarios_active()
return all(scenario["status"] == "succeeded" for scenario in active)

def check_blocks(self, target_blocks, start: int = 0):
count = int(self.warnet("bitcoin rpc tank-0000 getblockcount"))
self.log.debug(f"Current block count: {count}, target: {start + target_blocks}")
Expand Down
14 changes: 14 additions & 0 deletions test/signet_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@

from test_base import TestBase

from warnet.status import _get_active_scenarios as scenarios_active


class SignetTest(TestBase):
def __init__(self):
Expand All @@ -19,6 +21,7 @@ def run_test(self):
try:
self.setup_network()
self.check_signet_miner()
self.check_signet_recon()
finally:
self.cleanup()

Expand Down Expand Up @@ -46,6 +49,17 @@ def block_one():

self.wait_for_predicate(block_one)

def check_signet_recon(self):
scenario_file = "resources/scenarios/reconnaissance.py"
self.log.info(f"Running scenario from file: {scenario_file}")
self.warnet(f"run {scenario_file}")

def check_scenario_clean_exit():
active = scenarios_active()
return all(scenario["status"] == "succeeded" for scenario in active)

self.wait_for_predicate(check_scenario_clean_exit)


if __name__ == "__main__":
test = SignetTest()
Expand Down

0 comments on commit 6ac8f9f

Please sign in to comment.