Skip to content

Commit

Permalink
Merge pull request #285 from pinheadmz/addnode-arg
Browse files Browse the repository at this point in the history
  • Loading branch information
willcl-ark authored Mar 4, 2024
2 parents 892553a + b6b9de0 commit 4b03d49
Show file tree
Hide file tree
Showing 15 changed files with 90 additions and 34 deletions.
3 changes: 3 additions & 0 deletions src/backends/compose/compose_backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -356,6 +356,9 @@ def default_config_args(self, tank):
defaults += f" -rpcport={tank.rpc_port}"
defaults += f" -zmqpubrawblock=tcp://0.0.0.0:{tank.zmqblockport}"
defaults += f" -zmqpubrawtx=tcp://0.0.0.0:{tank.zmqtxport}"
# connect to initial peers as defined in graph file
for dst_index in tank.init_peers:
defaults += f" -addnode={self.get_container_name(dst_index, ServiceType.BITCOIN)}"
return defaults

def copy_configs(self, tank):
Expand Down
24 changes: 19 additions & 5 deletions src/backends/kubernetes/kubernetes_backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,10 @@
from cli.image import build_image
from kubernetes import client, config
from kubernetes.client.models.v1_pod import V1Pod
from kubernetes.client.models.v1_service import V1Service
from kubernetes.client.rest import ApiException
from kubernetes.dynamic import DynamicClient
from kubernetes.dynamic.exceptions import ResourceNotFoundError
from kubernetes.stream import stream
from warnet.status import RunningStatus
from warnet.tank import Tank
Expand Down Expand Up @@ -122,6 +124,15 @@ def get_pod(self, pod_name: str) -> V1Pod | None:
if e.status == 404:
return None

def get_service(self, service_name: str) -> V1Service | None:
try:
return cast(
V1Service, self.client.read_namespaced_service(name=service_name, namespace=self.namespace)
)
except ApiException as e:
if e.status == 404:
return None

# We could enhance this by checking the pod status as well
# The following pod phases are available: Pending, Running, Succeeded, Failed, Unknown
# For example not able to pull image will be a phase of Pending, but the container status will be ErrImagePull
Expand Down Expand Up @@ -239,6 +250,7 @@ def get_messages(
bitcoin_network: str = "regtest",
):
b_pod = self.get_pod(self.get_pod_name(b_index, ServiceType.BITCOIN))
b_service = self.get_service(self.get_service_name(b_index))
subdir = "/" if bitcoin_network == "main" else f"{bitcoin_network}/"
base_dir = f"/root/.bitcoin/{subdir}message_capture"
cmd = f"ls {base_dir}"
Expand All @@ -253,7 +265,7 @@ def get_messages(
messages = []

for dir_name in dirs:
if b_pod.status.pod_ip in dir_name:
if b_pod.status.pod_ip in dir_name or b_service.spec.cluster_ip in dir_name:
for file, outbound in [["msgs_recv.dat", False], ["msgs_sent.dat", True]]:
# Fetch the file contents from the container
file_path = f"{base_dir}/{dir_name}/{file}"
Expand Down Expand Up @@ -309,6 +321,9 @@ def default_bitcoind_config_args(self, tank):
defaults += f" -rpcport={tank.rpc_port}"
defaults += f" -zmqpubrawblock=tcp://0.0.0.0:{tank.zmqblockport}"
defaults += f" -zmqpubrawtx=tcp://0.0.0.0:{tank.zmqtxport}"
# connect to initial peers as defined in graph file
for dst_index in tank.init_peers:
defaults += f" -addnode={self.get_service_name(dst_index)}"
return defaults

def create_bitcoind_container(self, tank: Tank) -> client.V1Container:
Expand Down Expand Up @@ -431,9 +446,8 @@ def remove_prometheus_service_monitors(self, tanks):
name=f"warnet-tank-{tank.index:06d}",
namespace=MAIN_NAMESPACE,
)
except ApiException as e:
if e.status != 404:
raise e
except ResourceNotFoundError:
continue

def create_lnd_container(self, tank, bitcoind_service_name, volume_mounts) -> client.V1Container:
# These args are appended to the Dockerfile `ENTRYPOINT ["lnd"]`
Expand Down Expand Up @@ -557,7 +571,7 @@ def create_bitcoind_service(self, tank) -> client.V1Service:
selector={"app": self.get_pod_name(tank.index, ServiceType.BITCOIN)},
publish_not_ready_addresses=True,
ports=[
# TODO: do we need to add 18444 here too?
client.V1ServicePort(port=18444, target_port=18444, name="p2p"),
client.V1ServicePort(port=tank.rpc_port, target_port=tank.rpc_port, name="rpc"),
client.V1ServicePort(
port=tank.zmqblockport, target_port=tank.zmqblockport, name="zmqblock"
Expand Down
9 changes: 9 additions & 0 deletions src/cli/network.py
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,15 @@ def status(network: str):
print(f"Tank: {tank['tank_index']} \tBitcoin: {tank['bitcoin_status']}{lightning_status}{circuitbreaker_status}")


@network.command()
@click.option("--network", default="warnet", show_default=True)
def connected(network: str):
"""
Indicate whether the all of the edges in the gaph file are connected in <network>
"""
print(rpc_call("network_connected", {"network": network}))


@network.command()
@click.option("--network", default="warnet", show_default=True)
def export(network):
Expand Down
3 changes: 3 additions & 0 deletions src/scenarios/miner_std.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,9 @@ def add_options(self, parser):
)

def run_test(self):
while not self.warnet.network_connected():
sleep(1)

current_miner = 0

while True:
Expand Down
14 changes: 12 additions & 2 deletions src/warnet/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,7 @@ def setup_rpc(self):
self.jsonrpc.register(self.network_down)
self.jsonrpc.register(self.network_info)
self.jsonrpc.register(self.network_status)
self.jsonrpc.register(self.network_connected)
self.jsonrpc.register(self.network_export)
# Graph
self.jsonrpc.register(self.graph_generate)
Expand Down Expand Up @@ -439,7 +440,6 @@ def thread_start(wn):
wn = Warnet.from_network(network, self.backend)
wn.apply_network_conditions()
wn.wait_for_health()
wn.connect_edges()
self.logger.info(
f"Resumed warnet named '{network}' from config dir {wn.config_dir}"
)
Expand Down Expand Up @@ -474,7 +474,6 @@ def thread_start(wn, lock: threading.Lock):
wn.warnet_up()
wn.wait_for_health()
wn.apply_network_conditions()
wn.connect_edges()
except Exception as e:
trace = traceback.format_exc()
self.logger.error(f"Unhandled exception starting warnet: {e}\n{trace}")
Expand Down Expand Up @@ -574,6 +573,17 @@ def network_status(self, network: str = "warnet") -> list[dict]:
self.logger.error(msg)
raise ServerError(message=msg) from e

def network_connected(self, network: str = "warnet") -> bool:
"""
Indicate whether all of the graph edges are connected in <network>
"""
try:
wn = Warnet.from_network(network, self.backend)
return wn.network_connected()
except Exception as e:
self.logger.error(f"{e}")
return False

def generate_deployment(self, graph_file: str, network: str = "warnet") -> str:
"""
Generate the deployment file for a graph file
Expand Down
3 changes: 3 additions & 0 deletions src/warnet/tank.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,9 @@ def __init__(self, index: int, config_dir: Path, warnet):
self._suffix = None
self._ipv4 = None
self._exporter_name = None
# index of integers imported from graph file
# indicating which tanks to initially connect to
self.init_peers = []

def __str__(self) -> str:
return f"Tank(index: {self.index}, version: {self.version}, conf: {self.bitcoin_config}, conf file: {self.conf_file}, netem: {self.netem}, IPv4: {self._ipv4})"
Expand Down
14 changes: 7 additions & 7 deletions src/warnet/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -410,17 +410,17 @@ def create_cycle_graph(

# Graph is a simply cycle graph with all nodes connected in a loop, including both ends.
# Ensure each node has at least 8 outbound connections by making 7 more outbound connections
for node in graph.nodes():
logger.debug(f"Creating additional connections for node {node}")
for src_node in graph.nodes():
logger.debug(f"Creating additional connections for node {src_node}")
for _ in range(8):
# Choose a random node to connect to
# Make sure it's not the same node and they aren't already connected
potential_nodes = [ node for node in range(n) if n != node and not graph.has_edge(node, n) ]
# Make sure it's not the same node and they aren't already connected in either direction
potential_nodes = [ dst_node for dst_node in range(n) if dst_node != src_node and not graph.has_edge(dst_node, src_node) and not graph.has_edge(src_node, dst_node) ]
if potential_nodes:
chosen_node = random.choice(potential_nodes)
graph.add_edge(node, chosen_node)
logger.debug(f"Added edge: {node}:{chosen_node}")
logger.debug(f"Node {node} edges: {graph.edges(node)}")
graph.add_edge(src_node, chosen_node)
logger.debug(f"Added edge: {src_node}:{chosen_node}")
logger.debug(f"Node {src_node} edges: {graph.edges(src_node)}")

# calculate degree
degree_dict = dict(graph.degree(graph.nodes()))
Expand Down
37 changes: 22 additions & 15 deletions src/warnet/warnet.py
Original file line number Diff line number Diff line change
Expand Up @@ -161,27 +161,21 @@ def tanks_from_graph(self):
raise Exception(
f"Node ID in graph must be incrementing integers (got '{node_id}', expected '{len(self.tanks)}')"
)
self.tanks.append(Tank.from_graph_node(node_id, self))
tank = Tank.from_graph_node(node_id, self)
# import edges as list of destinations to connect to
for edge in self.graph.edges(data=True):
(src, dst, data) = edge
if "channel" in data:
continue
if src == node_id:
tank.init_peers.append(int(dst))
self.tanks.append(tank)
logger.info(f"Imported {len(self.tanks)} tanks from graph")

def apply_network_conditions(self):
for tank in self.tanks:
tank.apply_network_conditions()

def connect_edges(self):
if self.graph is None:
return

for edge in self.graph.edges(data=True):
(src, dst, data) = edge
if "channel" in data:
continue
src_tank = self.tanks[src]
dst_ip = self.tanks[dst].ipv4
cmd = f"bitcoin-cli -regtest -rpcuser={src_tank.rpc_user} -rpcpassword={src_tank.rpc_password} addnode {dst_ip}:18444 onetry"
logger.info(f"Using `{cmd}` to connect tanks {src} to {dst}")
src_tank.exec(cmd=cmd)

def warnet_build(self):
self.container_interface.build()

Expand Down Expand Up @@ -227,3 +221,16 @@ def export(self, subdir):

def wait_for_health(self):
self.container_interface.wait_for_healthy_tanks(self)

def network_connected(self):
for tank in self.tanks:
peerinfo = json.loads(self.container_interface.get_bitcoin_cli(tank, "getpeerinfo"))
manuals = 0
for peer in peerinfo:
if peer["connection_type"] == "manual":
manuals += 1
# Even if more edges are specifed, bitcoind only allows
# 8 manual outbound connections
if min(8, len(tank.init_peers)) > manuals:
return False
return True
1 change: 1 addition & 0 deletions test/build_branch_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
base.start_server()
print(base.warcli(f"network start {graph_file_path}"))
base.wait_for_all_tanks_status(target="running", timeout=10*60)
base.wait_for_all_edges()

print("\nWait for p2p connections")

Expand Down
1 change: 1 addition & 0 deletions test/graph_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
# Test that the graph actually works
print(base.warcli(f"network start {Path(tf)}"))
base.wait_for_all_tanks_status(target="running")
base.wait_for_all_edges()
base.warcli("rpc 0 getblockcount")

base.stop_server()
2 changes: 1 addition & 1 deletion test/ln_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ def get_cb_forwards(index):

print(base.warcli(f"network start {graph_file_path}"))
base.wait_for_all_tanks_status(target="running")

base.wait_for_all_edges()

if base.backend != "compose":
print("\nSkipping network export test, only supported with compose backend")
Expand Down
1 change: 1 addition & 0 deletions test/rpc_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
base.start_server()
print(base.warcli(f"network start {graph_file_path}"))
base.wait_for_all_tanks_status(target="running")
base.wait_for_all_edges()

# Exponential backoff will repeat this command until it succeeds.
# That's when we are ready for commands
Expand Down
4 changes: 0 additions & 4 deletions test/scenarios_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,6 @@
scenarios = base.rpc("scenarios_available")
assert len(scenarios) == 4

# Exponential backoff will repeat this command until it succeeds.
# That's when we are ready for scenarios
base.warcli("rpc 0 getblockcount")

# Start scenario
base.warcli("scenarios run miner_std --allnodes --interval=1")

Expand Down
7 changes: 7 additions & 0 deletions test/test_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,13 @@ def check_status():

self.wait_for_predicate(check_status, timeout, interval)

# Ensure all tanks have all the connections they are supposed to have
# Block until all success
def wait_for_all_edges(self, timeout=20 * 60, interval=5):
def check_status():
return self.wait_for_rpc("network_connected", {"network": self.network_name})
self.wait_for_predicate(check_status, timeout, interval)

def wait_for_all_scenarios(self):
def check_scenarios():
scns = self.rpc("scenarios_list_running")
Expand Down
1 change: 1 addition & 0 deletions test/v25_net_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
base.start_server()
print(base.warcli(f"network start {graph_file_path}"))
base.wait_for_all_tanks_status(target="running")
base.wait_for_all_edges()

onion_addr = None

Expand Down

0 comments on commit 4b03d49

Please sign in to comment.