From ead6ce9546b05f80ac340d67498d9c6b7cbb74c9 Mon Sep 17 00:00:00 2001 From: Matthew Zipkin Date: Wed, 21 Feb 2024 15:01:06 -0500 Subject: [PATCH 1/7] k8s: yes, we do need to forward port 18444 in the bitcoind services --- src/backends/kubernetes/kubernetes_backend.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/backends/kubernetes/kubernetes_backend.py b/src/backends/kubernetes/kubernetes_backend.py index 2e8a7c148..7ed0bf704 100644 --- a/src/backends/kubernetes/kubernetes_backend.py +++ b/src/backends/kubernetes/kubernetes_backend.py @@ -532,7 +532,7 @@ def create_bitcoind_service(self, tank) -> client.V1Service: selector={"app": self.get_pod_name(tank.index, ServiceType.BITCOIN)}, publish_not_ready_addresses=True, ports=[ - # TODO: do we need to add 18444 here too? + client.V1ServicePort(port=18444, target_port=18444, name="p2p"), client.V1ServicePort(port=tank.rpc_port, target_port=tank.rpc_port, name="rpc"), client.V1ServicePort( port=tank.zmqblockport, target_port=tank.zmqblockport, name="zmqblock" From 190a448d7087ae83bb74050c3baec3caad0f7c58 Mon Sep 17 00:00:00 2001 From: Matthew Zipkin Date: Wed, 21 Feb 2024 15:28:20 -0500 Subject: [PATCH 2/7] warnet: replace connect_edges() with bitcoind -addnode args --- src/backends/compose/compose_backend.py | 3 +++ src/backends/kubernetes/kubernetes_backend.py | 3 +++ src/warnet/server.py | 2 -- src/warnet/tank.py | 3 +++ src/warnet/warnet.py | 24 +++++++------------ 5 files changed, 18 insertions(+), 17 deletions(-) diff --git a/src/backends/compose/compose_backend.py b/src/backends/compose/compose_backend.py index 9021ec739..406731d9a 100644 --- a/src/backends/compose/compose_backend.py +++ b/src/backends/compose/compose_backend.py @@ -352,6 +352,9 @@ def default_config_args(self, tank): defaults += f" -rpcport={tank.rpc_port}" defaults += f" -zmqpubrawblock=tcp://0.0.0.0:{tank.zmqblockport}" defaults += f" -zmqpubrawtx=tcp://0.0.0.0:{tank.zmqtxport}" + # connect to initial peers as defined 
in graph file + for dst_index in tank.init_peers: + defaults += f" -addnode={self.get_container_name(dst_index, ServiceType.BITCOIN)}" return defaults def copy_configs(self, tank): diff --git a/src/backends/kubernetes/kubernetes_backend.py b/src/backends/kubernetes/kubernetes_backend.py index 7ed0bf704..d1136d150 100644 --- a/src/backends/kubernetes/kubernetes_backend.py +++ b/src/backends/kubernetes/kubernetes_backend.py @@ -306,6 +306,9 @@ def default_bitcoind_config_args(self, tank): defaults += f" -rpcport={tank.rpc_port}" defaults += f" -zmqpubrawblock=tcp://0.0.0.0:{tank.zmqblockport}" defaults += f" -zmqpubrawtx=tcp://0.0.0.0:{tank.zmqtxport}" + # connect to initial peers as defined in graph file + for dst_index in tank.init_peers: + defaults += f" -addnode={self.get_service_name(dst_index)}" return defaults def create_bitcoind_container(self, tank: Tank) -> client.V1Container: diff --git a/src/warnet/server.py b/src/warnet/server.py index b7b5fe493..602f11d0a 100644 --- a/src/warnet/server.py +++ b/src/warnet/server.py @@ -437,7 +437,6 @@ def thread_start(wn): wn = Warnet.from_network(network, self.backend) wn.apply_network_conditions() wn.wait_for_health() - wn.connect_edges() self.logger.info( f"Resumed warnet named '{network}' from config dir {wn.config_dir}" ) @@ -472,7 +471,6 @@ def thread_start(wn, lock: threading.Lock): wn.warnet_up() wn.wait_for_health() wn.apply_network_conditions() - wn.connect_edges() except Exception as e: trace = traceback.format_exc() self.logger.error(f"Unhandled exception starting warnet: {e}\n{trace}") diff --git a/src/warnet/tank.py b/src/warnet/tank.py index 7f8f4dbf1..f20a7d9c5 100644 --- a/src/warnet/tank.py +++ b/src/warnet/tank.py @@ -47,6 +47,9 @@ def __init__(self, index: int, config_dir: Path, warnet): self._suffix = None self._ipv4 = None self._exporter_name = None + # index of integers imported from graph file + # indicating which tanks to initially connect to + self.init_peers = [] def __str__(self) -> str: 
return f"Tank(index: {self.index}, version: {self.version}, conf: {self.bitcoin_config}, conf file: {self.conf_file}, netem: {self.netem}, IPv4: {self._ipv4})" diff --git a/src/warnet/warnet.py b/src/warnet/warnet.py index 5fe663788..7272fd1ac 100644 --- a/src/warnet/warnet.py +++ b/src/warnet/warnet.py @@ -161,27 +161,21 @@ def tanks_from_graph(self): raise Exception( f"Node ID in graph must be incrementing integers (got '{node_id}', expected '{len(self.tanks)}')" ) - self.tanks.append(Tank.from_graph_node(node_id, self)) + tank = Tank.from_graph_node(node_id, self) + # import edges as list of destinations to connect to + for edge in self.graph.edges(data=True): + (src, dst, data) = edge + if "channel" in data: + continue + if src == node_id: + tank.init_peers.append(int(dst)) + self.tanks.append(tank) logger.info(f"Imported {len(self.tanks)} tanks from graph") def apply_network_conditions(self): for tank in self.tanks: tank.apply_network_conditions() - def connect_edges(self): - if self.graph is None: - return - - for edge in self.graph.edges(data=True): - (src, dst, data) = edge - if "channel" in data: - continue - src_tank = self.tanks[src] - dst_ip = self.tanks[dst].ipv4 - cmd = f"bitcoin-cli -regtest -rpcuser={src_tank.rpc_user} -rpcpassword={src_tank.rpc_password} addnode {dst_ip}:18444 onetry" - logger.info(f"Using `{cmd}` to connect tanks {src} to {dst}") - src_tank.exec(cmd=cmd) - def warnet_build(self): self.container_interface.build() From 82382ddf25f09b40f99736cb6ee98bb06f24a22c Mon Sep 17 00:00:00 2001 From: Matthew Zipkin Date: Thu, 22 Feb 2024 12:33:39 -0500 Subject: [PATCH 3/7] rpc: add command "network connected" and wait for Truth in tests --- src/cli/network.py | 10 ++++++++++ src/warnet/server.py | 24 ++++++++++++++++++++++++ test/build_branch_test.py | 1 + test/graph_test.py | 1 + test/ln_test.py | 2 +- test/rpc_test.py | 1 + test/scenarios_test.py | 1 + test/test_base.py | 7 +++++++ test/v25_net_test.py | 1 + 9 files changed, 47 
insertions(+), 1 deletion(-) diff --git a/src/cli/network.py b/src/cli/network.py index a2bad716a..df5e444a2 100644 --- a/src/cli/network.py +++ b/src/cli/network.py @@ -127,6 +127,16 @@ def status(network: str): print(f"Tank: {tank['tank_index']} \tBitcoin: {tank['bitcoin_status']}{lightning_status}") +@network.command() +@click.option("--network", default="warnet", show_default=True) +@click.argument("threshold", type=int) +def connected(network: str): + """ + Indicate whether the all of the edges in the gaph file are connected in + """ + print(rpc_call("network_connected", {"network": network})) + + @network.command() @click.option("--network", default="warnet", show_default=True) def export(network): diff --git a/src/warnet/server.py b/src/warnet/server.py index 602f11d0a..dec195213 100644 --- a/src/warnet/server.py +++ b/src/warnet/server.py @@ -14,6 +14,7 @@ import traceback from datetime import datetime from io import BytesIO +from json import loads from logging import StreamHandler from logging.handlers import RotatingFileHandler from pathlib import Path @@ -174,6 +175,7 @@ def setup_rpc(self): self.jsonrpc.register(self.network_down) self.jsonrpc.register(self.network_info) self.jsonrpc.register(self.network_status) + self.jsonrpc.register(self.network_connected) self.jsonrpc.register(self.network_export) # Graph self.jsonrpc.register(self.graph_generate) @@ -568,6 +570,28 @@ def network_status(self, network: str = "warnet") -> list[dict]: self.logger.error(msg) raise ServerError(message=msg) from e + + def network_connected(self, network: str = "warnet") -> bool: + """ + Indicate whether the of graph edges is connected in + """ + try: + wn = Warnet.from_network(network, self.backend) + for tank in wn.tanks: + peerinfo = loads(wn.container_interface.get_bitcoin_cli(tank, "getpeerinfo")) + manuals = 0 + for peer in peerinfo: + if peer["connection_type"] == "manual": + manuals += 1 + # Even if more edges are specifed, bitcoind only allows + # 8 manual 
outbound connections + if min(8, len(tank.init_peers)) > manuals: + return False + return True + except Exception as e: + self.logger.error(f"{e}") + return False + def generate_deployment(self, graph_file: str, network: str = "warnet") -> str: """ Generate the deployment file for a graph file diff --git a/test/build_branch_test.py b/test/build_branch_test.py index 1bee58a34..41895c380 100755 --- a/test/build_branch_test.py +++ b/test/build_branch_test.py @@ -12,6 +12,7 @@ base.start_server() print(base.warcli(f"network start {graph_file_path}")) base.wait_for_all_tanks_status(target="running", timeout=10*60) +base.wait_for_all_edges() print("\nWait for p2p connections") diff --git a/test/graph_test.py b/test/graph_test.py index 6bd9a19dd..1d2af1212 100755 --- a/test/graph_test.py +++ b/test/graph_test.py @@ -30,6 +30,7 @@ # Test that the graph actually works print(base.warcli(f"network start {Path(tf)}")) base.wait_for_all_tanks_status(target="running") + base.wait_for_all_edges() base.warcli("rpc 0 getblockcount") base.stop_server() diff --git a/test/ln_test.py b/test/ln_test.py index 8c2b6eaf9..dec999b9c 100755 --- a/test/ln_test.py +++ b/test/ln_test.py @@ -12,7 +12,7 @@ base.start_server() print(base.warcli(f"network start {graph_file_path}")) base.wait_for_all_tanks_status(target="running") - +base.wait_for_all_edges() if base.backend != "compose": print("\nSkipping network export test, only supported with compose backend") diff --git a/test/rpc_test.py b/test/rpc_test.py index 2a0a08490..fed561759 100755 --- a/test/rpc_test.py +++ b/test/rpc_test.py @@ -11,6 +11,7 @@ base.start_server() print(base.warcli(f"network start {graph_file_path}")) base.wait_for_all_tanks_status(target="running") +base.wait_for_all_edges() # Exponential backoff will repeat this command until it succeeds. 
# That's when we are ready for commands diff --git a/test/scenarios_test.py b/test/scenarios_test.py index bb5d06ad9..836a81862 100755 --- a/test/scenarios_test.py +++ b/test/scenarios_test.py @@ -11,6 +11,7 @@ base.start_server() print(base.warcli(f"network start {graph_file_path}")) base.wait_for_all_tanks_status(target="running") +base.wait_for_all_edges() # Use rpc instead of warcli so we get raw JSON object scenarios = base.rpc("scenarios_available") diff --git a/test/test_base.py b/test/test_base.py index 5b58105cb..38d169554 100644 --- a/test/test_base.py +++ b/test/test_base.py @@ -181,6 +181,13 @@ def check_status(): self.wait_for_predicate(check_status, timeout, interval) + # Ensure all tanks have all the connections they are supposed to have + # Block until all success + def wait_for_all_edges(self, timeout=20 * 60, interval=5): + def check_status(): + return self.wait_for_rpc("network_connected", {"network": self.network_name}) + self.wait_for_predicate(check_status, timeout, interval) + def wait_for_all_scenarios(self): def check_scenarios(): scns = self.rpc("scenarios_list_running") diff --git a/test/v25_net_test.py b/test/v25_net_test.py index 65d101bf9..3cb3a21fb 100755 --- a/test/v25_net_test.py +++ b/test/v25_net_test.py @@ -12,6 +12,7 @@ base.start_server() print(base.warcli(f"network start {graph_file_path}")) base.wait_for_all_tanks_status(target="running") +base.wait_for_all_edges() onion_addr = None From d0af101f1417684a4fcf097ea0040fe18fded150 Mon Sep 17 00:00:00 2001 From: Matthew Zipkin Date: Thu, 22 Feb 2024 14:02:52 -0500 Subject: [PATCH 4/7] k8s shutdown: catch error from dynamic client instead of static client --- src/backends/kubernetes/kubernetes_backend.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/backends/kubernetes/kubernetes_backend.py b/src/backends/kubernetes/kubernetes_backend.py index d1136d150..ca4af563c 100644 --- a/src/backends/kubernetes/kubernetes_backend.py +++ 
b/src/backends/kubernetes/kubernetes_backend.py @@ -12,6 +12,7 @@ from kubernetes.client.models.v1_pod import V1Pod from kubernetes.client.rest import ApiException from kubernetes.dynamic import DynamicClient +from kubernetes.dynamic.exceptions import ResourceNotFoundError from kubernetes.stream import stream from warnet.status import RunningStatus from warnet.tank import Tank @@ -431,9 +432,8 @@ def remove_prometheus_service_monitors(self, tanks): name=f"warnet-tank-{tank.index:06d}", namespace=MAIN_NAMESPACE, ) - except ApiException as e: - if e.status != 404: - raise e + except ResourceNotFoundError: + continue def create_lnd_container(self, tank, bitcoind_service_name) -> client.V1Container: # These args are appended to the Dockerfile `ENTRYPOINT ["lnd"]` From 99406b57b5e4b32604418201bbfaf8ad63434378 Mon Sep 17 00:00:00 2001 From: Matthew Zipkin Date: Thu, 22 Feb 2024 14:06:25 -0500 Subject: [PATCH 5/7] k8s: get p2p messages by pod IP or service IP --- src/backends/kubernetes/kubernetes_backend.py | 13 ++++++++++++- 1 file changed, 12 insertions(+), 1 deletion(-) diff --git a/src/backends/kubernetes/kubernetes_backend.py b/src/backends/kubernetes/kubernetes_backend.py index ca4af563c..dddc89b47 100644 --- a/src/backends/kubernetes/kubernetes_backend.py +++ b/src/backends/kubernetes/kubernetes_backend.py @@ -10,6 +10,7 @@ from cli.image import build_image from kubernetes import client, config from kubernetes.client.models.v1_pod import V1Pod +from kubernetes.client.models.v1_service import V1Service from kubernetes.client.rest import ApiException from kubernetes.dynamic import DynamicClient from kubernetes.dynamic.exceptions import ResourceNotFoundError @@ -121,6 +122,15 @@ def get_pod(self, pod_name: str) -> V1Pod | None: if e.status == 404: return None + def get_service(self, service_name: str) -> V1Service | None: + try: + return cast( + V1Service, self.client.read_namespaced_service(name=service_name, namespace=self.namespace) + ) + except ApiException as e: 
+ if e.status == 404: + return None + # We could enhance this by checking the pod status as well # The following pod phases are available: Pending, Running, Succeeded, Failed, Unknown # For example not able to pull image will be a phase of Pending, but the container status will be ErrImagePull @@ -237,6 +247,7 @@ def get_messages( bitcoin_network: str = "regtest", ): b_pod = self.get_pod(self.get_pod_name(b_index, ServiceType.BITCOIN)) + b_service = self.get_service(self.get_service_name(b_index)) subdir = "/" if bitcoin_network == "main" else f"{bitcoin_network}/" base_dir = f"/root/.bitcoin/{subdir}message_capture" cmd = f"ls {base_dir}" @@ -251,7 +262,7 @@ def get_messages( messages = [] for dir_name in dirs: - if b_pod.status.pod_ip in dir_name: + if b_pod.status.pod_ip in dir_name or b_service.spec.cluster_ip in dir_name: for file, outbound in [["msgs_recv.dat", False], ["msgs_sent.dat", True]]: # Fetch the file contents from the container file_path = f"{base_dir}/{dir_name}/{file}" From 229bd385629c02ef433ce90742473670e0a274ae Mon Sep 17 00:00:00 2001 From: Matthew Zipkin Date: Fri, 23 Feb 2024 11:13:19 -0500 Subject: [PATCH 6/7] util: prevent reconnected two nodes in graph_generate --- src/warnet/utils.py | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/src/warnet/utils.py b/src/warnet/utils.py index 002f12f3b..cf6d48f1e 100644 --- a/src/warnet/utils.py +++ b/src/warnet/utils.py @@ -410,17 +410,17 @@ def create_cycle_graph( # Graph is a simply cycle graph with all nodes connected in a loop, including both ends. 
# Ensure each node has at least 8 outbound connections by making 7 more outbound connections - for node in graph.nodes(): - logger.debug(f"Creating additional connections for node {node}") + for src_node in graph.nodes(): + logger.debug(f"Creating additional connections for node {src_node}") for _ in range(8): # Choose a random node to connect to - # Make sure it's not the same node and they aren't already connected - potential_nodes = [ node for node in range(n) if n != node and not graph.has_edge(node, n) ] + # Make sure it's not the same node and they aren't already connected in either direction + potential_nodes = [ dst_node for dst_node in range(n) if dst_node != src_node and not graph.has_edge(dst_node, src_node) and not graph.has_edge(src_node, dst_node) ] if potential_nodes: chosen_node = random.choice(potential_nodes) - graph.add_edge(node, chosen_node) - logger.debug(f"Added edge: {node}:{chosen_node}") - logger.debug(f"Node {node} edges: {graph.edges(node)}") + graph.add_edge(src_node, chosen_node) + logger.debug(f"Added edge: {src_node}:{chosen_node}") + logger.debug(f"Node {src_node} edges: {graph.edges(src_node)}") # calculate degree degree_dict = dict(graph.degree(graph.nodes())) From b6b9de05d876ec70d4e3e160bbee1152e1015a60 Mon Sep 17 00:00:00 2001 From: Matthew Zipkin Date: Thu, 29 Feb 2024 14:04:36 -0500 Subject: [PATCH 7/7] server: move network_connected to Warnet class for scenario access --- src/cli/network.py | 1 - src/scenarios/miner_std.py | 3 +++ src/warnet/server.py | 16 ++-------------- src/warnet/warnet.py | 13 +++++++++++++ test/scenarios_test.py | 5 ----- 5 files changed, 18 insertions(+), 20 deletions(-) diff --git a/src/cli/network.py b/src/cli/network.py index df5e444a2..417072526 100644 --- a/src/cli/network.py +++ b/src/cli/network.py @@ -129,7 +129,6 @@ def status(network: str): @network.command() @click.option("--network", default="warnet", show_default=True) -@click.argument("threshold", type=int) def connected(network: str): 
""" Indicate whether the all of the edges in the gaph file are connected in diff --git a/src/scenarios/miner_std.py b/src/scenarios/miner_std.py index 35d7cc29e..5b17e1f1a 100755 --- a/src/scenarios/miner_std.py +++ b/src/scenarios/miner_std.py @@ -31,6 +31,9 @@ def add_options(self, parser): ) def run_test(self): + while not self.warnet.network_connected(): + sleep(1) + current_miner = 0 while True: diff --git a/src/warnet/server.py b/src/warnet/server.py index dec195213..eea9c440a 100644 --- a/src/warnet/server.py +++ b/src/warnet/server.py @@ -14,7 +14,6 @@ import traceback from datetime import datetime from io import BytesIO -from json import loads from logging import StreamHandler from logging.handlers import RotatingFileHandler from pathlib import Path @@ -570,24 +569,13 @@ def network_status(self, network: str = "warnet") -> list[dict]: self.logger.error(msg) raise ServerError(message=msg) from e - def network_connected(self, network: str = "warnet") -> bool: """ - Indicate whether the of graph edges is connected in + Indicate whether all of the graph edges are connected in """ try: wn = Warnet.from_network(network, self.backend) - for tank in wn.tanks: - peerinfo = loads(wn.container_interface.get_bitcoin_cli(tank, "getpeerinfo")) - manuals = 0 - for peer in peerinfo: - if peer["connection_type"] == "manual": - manuals += 1 - # Even if more edges are specifed, bitcoind only allows - # 8 manual outbound connections - if min(8, len(tank.init_peers)) > manuals: - return False - return True + return wn.network_connected() except Exception as e: self.logger.error(f"{e}") return False diff --git a/src/warnet/warnet.py b/src/warnet/warnet.py index 7272fd1ac..55f2a51c9 100644 --- a/src/warnet/warnet.py +++ b/src/warnet/warnet.py @@ -221,3 +221,16 @@ def export(self, subdir): def wait_for_health(self): self.container_interface.wait_for_healthy_tanks(self) + + def network_connected(self): + for tank in self.tanks: + peerinfo = 
json.loads(self.container_interface.get_bitcoin_cli(tank, "getpeerinfo")) + manuals = 0 + for peer in peerinfo: + if peer["connection_type"] == "manual": + manuals += 1 + # Even if more edges are specified, bitcoind only allows + # 8 manual outbound connections + if min(8, len(tank.init_peers)) > manuals: + return False + return True diff --git a/test/scenarios_test.py b/test/scenarios_test.py index 836a81862..cbb4a1964 100755 --- a/test/scenarios_test.py +++ b/test/scenarios_test.py @@ -11,16 +11,11 @@ base.start_server() print(base.warcli(f"network start {graph_file_path}")) base.wait_for_all_tanks_status(target="running") -base.wait_for_all_edges() # Use rpc instead of warcli so we get raw JSON object scenarios = base.rpc("scenarios_available") assert len(scenarios) == 4 -# Exponential backoff will repeat this command until it succeeds. -# That's when we are ready for scenarios -base.warcli("rpc 0 getblockcount") - # Start scenario base.warcli("scenarios run miner_std --allnodes --interval=1")