diff --git a/justfile b/justfile index e7e4ebb9b..29dd01d33 100644 --- a/justfile +++ b/justfile @@ -4,6 +4,12 @@ set shell := ["bash", "-uc"] default: just --list +cluster: + kubectl apply -f src/templates/rpc/namespace.yaml + kubectl apply -f src/templates/rpc/rbac-config.yaml + kubectl apply -f src/templates/rpc/warnet-rpc-service.yaml + kubectl apply -f src/templates/rpc/warnet-rpc-statefulset.yaml + # Setup and start the RPC in dev mode with minikube start: #!/usr/bin/env bash diff --git a/src/backends/backend_interface.py b/src/backends/backend_interface.py index 30b8bef72..6ee1d9bad 100644 --- a/src/backends/backend_interface.py +++ b/src/backends/backend_interface.py @@ -110,16 +110,15 @@ def get_tank_ipv4(self, index: int) -> str: raise NotImplementedError("This method should be overridden by child class") @abstractmethod - def wait_for_healthy_tanks(self, warnet, timeout=60) -> bool: + def get_lnnode_hostname(self, index: int) -> str: """ - Wait for healthy status on all bitcoind nodes + Get the hostname assigned to a lnnode attached to a tank from the backend """ raise NotImplementedError("This method should be overridden by child class") - @abstractmethod - def service_from_json(self, json: dict) -> dict: + def wait_for_healthy_tanks(self, warnet, timeout=60) -> bool: """ - Create a single container from a JSON object of settings + Wait for healthy status on all bitcoind nodes """ raise NotImplementedError("This method should be overridden by child class") diff --git a/src/backends/compose/compose_backend.py b/src/backends/compose/compose_backend.py index a995969db..a3bb6d98e 100644 --- a/src/backends/compose/compose_backend.py +++ b/src/backends/compose/compose_backend.py @@ -266,7 +266,7 @@ def _write_docker_compose(self, warnet): # Initialize services and add them to the compose for service_name in warnet.services: if "compose" in services[service_name]["backends"]: - compose["services"][service_name] = self.service_from_json(services[service_name]) + compose["services"][service_name] = self.service_from_json(service_name) docker_compose_path = warnet.config_dir / "docker-compose.yml" try: @@ -356,9 +356,12 @@ def add_services(self, tank: Tank, compose): "networks": [tank.network_name], } + def get_lnnode_hostname(self, index: int) -> str: + return self.get_container_name(index, ServiceType.LIGHTNING) + def add_lnd_service(self, tank, compose): services = compose["services"] - ln_container_name = self.get_container_name(tank.index, ServiceType.LIGHTNING) + ln_container_name = self.get_lnnode_hostname(tank.index) ln_cb_container_name = self.get_container_name(tank.index, ServiceType.CIRCUITBREAKER) bitcoin_container_name = self.get_container_name(tank.index, ServiceType.BITCOIN) # These args are appended to the Dockerfile `ENTRYPOINT ["lnd"]` @@ -448,21 +451,37 @@ def wait_for_healthy_tanks(self, warnet, timeout=60) -> bool: return healthy - def service_from_json(self, obj: dict) -> dict: + def get_service_container_name(self, service_name: str): + return f"{self.network_name}_{services[service_name]['container_name_suffix']}" + + def service_from_json(self, service_name: str) -> object: + obj = services[service_name] volumes = obj.get("volumes", []) - volumes += [f"{self.config_dir}" + filepath for filepath in obj.get("config_files", [])] + for bind_mount in obj.get("config_files", []): + volume_name, mount_path = bind_mount.split(":") + hostpath = self.config_dir / volume_name + # If it's starting off as an empty directory, create it now so + # it has python-user permissions instead of docker-root + if volume_name[-1] == "/": + hostpath.mkdir(parents=True, exist_ok=True) + volumes += [f"{hostpath}:{mount_path}"] ports = [] if "container_port" and "warnet_port" in obj: ports = [f"{obj['warnet_port']}:{obj['container_port']}"] return { "image": obj["image"], - "container_name": f"{self.network_name}_{obj['container_name_suffix']}", + "container_name": self.get_service_container_name(service_name), "ports": ports, "volumes": volumes, "privileged": obj.get("privileged", False), "devices": obj.get("devices", []), "command": obj.get("args", []), "environment": obj.get("environment", []), + "restart": "on-failure", "networks": [self.network_name] } + + def restart_service_container(self, service_name: str): + container = self.client.containers.get(self.get_service_container_name(service_name)) + container.restart() diff --git a/src/backends/kubernetes/kubernetes_backend.py b/src/backends/kubernetes/kubernetes_backend.py index 8dda5837b..9778b302f 100644 --- a/src/backends/kubernetes/kubernetes_backend.py +++ b/src/backends/kubernetes/kubernetes_backend.py @@ -1,6 +1,7 @@ import base64 import logging import re +import subprocess import time from pathlib import Path from typing import cast @@ -15,6 +16,7 @@ from kubernetes.dynamic import DynamicClient from kubernetes.dynamic.exceptions import ResourceNotFoundError from kubernetes.stream import stream +from warnet.services import services from warnet.status import RunningStatus from warnet.tank import Tank from warnet.utils import parse_raw_messages @@ -73,6 +75,10 @@ def down(self, warnet) -> bool: self.remove_prometheus_service_monitors(warnet.tanks) + for service_name in warnet.services: + if "k8s" in services[service_name]["backends"]: + self.client.delete_namespaced_pod(f'{services[service_name]["container_name_suffix"]}-service', self.namespace) + return True def get_file(self, tank_index: int, service: ServiceType, file_path: str): @@ -436,12 +442,15 @@ def remove_prometheus_service_monitors(self, tanks): except ResourceNotFoundError: continue + def get_lnnode_hostname(self, index: int) -> str: + return f"lightning-{index}.{self.namespace}" + def create_lnd_container( self, tank, bitcoind_service_name, volume_mounts ) -> client.V1Container: # These args are appended to the Dockerfile `ENTRYPOINT ["lnd"]` bitcoind_rpc_host = f"{bitcoind_service_name}.{self.namespace}" - lightning_dns = f"lightning-{tank.index}.{self.namespace}" + lightning_dns = self.get_lnnode_hostname(tank.index) args = tank.lnnode.get_conf(lightning_dns, bitcoind_rpc_host) self.log.debug(f"Creating lightning container for tank {tank.index} using {args=:}") lightning_container = client.V1Container( @@ -668,6 +677,10 @@ def deploy_pods(self, warnet): if self.check_logging_crds_installed(): self.apply_prometheus_service_monitors(warnet.tanks) + for service_name in warnet.services: + if "k8s" in services[service_name]["backends"]: + self.service_from_json(services[service_name]) + self.log.debug("Containers and services created. Configuring IP addresses") # now that the pods have had a second to create, # get the ips and set them on the tanks @@ -700,5 +713,100 @@ def wait_for_healthy_tanks(self, warnet, timeout=30): """ pass - def service_from_json(self, obj: dict) -> dict: - pass + def service_from_json(self, obj): + env = [] + for pair in obj.get("environment", []): + name, value = pair.split("=") + env.append(client.V1EnvVar(name=name, value=value)) + volume_mounts = [] + volumes = [] + for vol in obj.get("config_files", []): + volume_name, mount_path = vol.split(":") + volume_name = volume_name.replace("/", "") + volume_mounts.append(client.V1VolumeMount(name=volume_name, mount_path=mount_path)) + volumes.append(client.V1Volume(name=volume_name, empty_dir=client.V1EmptyDirVolumeSource())) + + service_container = client.V1Container( + name=obj["container_name_suffix"], + image=obj["image"], + env=env, + security_context=client.V1SecurityContext( + privileged=True, + capabilities=client.V1Capabilities(add=["NET_ADMIN", "NET_RAW"]), + ), + volume_mounts=volume_mounts + ) + sidecar_container = client.V1Container( + name="sidecar", + image="pinheadmz/sidecar:latest", + volume_mounts=volume_mounts, + ports=[client.V1ContainerPort(container_port=22)], + ) + service_pod = client.V1Pod( + api_version="v1", + kind="Pod", + metadata=client.V1ObjectMeta( + name=obj["container_name_suffix"], + namespace=self.namespace, + labels={ + "app": obj["container_name_suffix"], + "network": self.network_name, + }, + ), + spec=client.V1PodSpec( + restart_policy="OnFailure", + containers=[service_container, sidecar_container], + volumes=volumes, + ), + ) + + # Do not ever change this variable name. xoxo, --Zip + service_service = client.V1Service( + api_version="v1", + kind="Service", + metadata=client.V1ObjectMeta( + name=f'{obj["container_name_suffix"]}-service', + labels={ + "app": obj["container_name_suffix"], + "network": self.network_name, + }, + ), + spec=client.V1ServiceSpec( + selector={"app": obj["container_name_suffix"]}, + publish_not_ready_addresses=True, + ports=[ + client.V1ServicePort(name="ssh", port=22, target_port=22), + ] + ) + ) + + self.client.create_namespaced_pod(namespace=self.namespace, body=service_pod) + self.client.create_namespaced_service(namespace=self.namespace, body=service_service) + + def write_service_config(self, source_path: str, service_name: str, destination_path: str): + obj = services[service_name] + name = obj["container_name_suffix"] + container_name = "sidecar" + # Copy the archive from our local drive (Warnet RPC container/pod) + # to the destination service's sidecar container via ssh + self.log.info(f"Copying local {source_path} to remote {destination_path} for {service_name}") + subprocess.run([ + "scp", + "-o", "StrictHostKeyChecking=accept-new", + source_path, + f"root@{name}-service.{self.namespace}:/arbitrary_filename.tar"]) + self.log.info(f"Finished copying tarball for {service_name}, unpacking...") + # Unpack the archive + stream( + self.client.connect_get_namespaced_pod_exec, + name, + self.namespace, + container=container_name, + command=["/bin/sh", "-c", f"tar -xf /arbitrary_filename.tar -C {destination_path}"], + stderr=True, + stdin=False, + stdout=True, + tty=False, + _preload_content=False + ) + self.log.info(f"Finished unpacking config data for {service_name} to {destination_path}") diff --git a/src/cli/network.py b/src/cli/network.py index b05880987..61538d9ce 100644 --- a/src/cli/network.py +++ b/src/cli/network.py @@ -143,8 +143,11 @@ def connected(network: str): @network.command() @click.option("--network", default="warnet", show_default=True) -def export(network): +@click.option("--activity", type=str) +def export(network: str, activity: str): """ - Export all [network] data for sim-ln to subdirectory + Export all [network] data for a "simln" service running in a container + on the network. Optionally add JSON string [activity] to simln config. + Returns True on success. """ - print(rpc_call("network_export", {"network": network})) + print(rpc_call("network_export", {"network": network, "activity": activity})) diff --git a/src/templates/Dockerfile_sidecar b/src/templates/Dockerfile_sidecar new file mode 100644 index 000000000..ec7f6a787 --- /dev/null +++ b/src/templates/Dockerfile_sidecar @@ -0,0 +1,12 @@ +FROM alpine:latest + +RUN apk add openssh + +RUN echo "root:" | chpasswd + +RUN ssh-keygen -A + +CMD ["/usr/sbin/sshd", "-D", \ + "-o", "PasswordAuthentication=yes", \ + "-o", "PermitEmptyPasswords=yes", \ + "-o", "PermitRootLogin=yes"] \ No newline at end of file diff --git a/src/templates/rpc/Dockerfile_rpc b/src/templates/rpc/Dockerfile_rpc index 51e881793..82da4a185 100644 --- a/src/templates/rpc/Dockerfile_rpc +++ b/src/templates/rpc/Dockerfile_rpc @@ -3,7 +3,7 @@ FROM python:3.11-slim # Install procps, which includes pgrep RUN apt-get update && \ - apt-get install -y procps && \ + apt-get install -y procps openssh-client && \ rm -rf /var/lib/apt/lists/* # Set the working directory in the container diff --git a/src/templates/rpc/Dockerfile_rpc_dev b/src/templates/rpc/Dockerfile_rpc_dev index 7a20891bc..f200a0727 100644 --- a/src/templates/rpc/Dockerfile_rpc_dev +++ b/src/templates/rpc/Dockerfile_rpc_dev @@ -3,7 +3,7 @@ FROM python:3.11-slim # Install procps, which includes pgrep RUN apt-get update && \ - apt-get install -y procps && \ + apt-get install -y procps openssh-client && \ rm -rf /var/lib/apt/lists/* # Set the working directory in the container diff --git a/src/warnet/lnnode.py b/src/warnet/lnnode.py index 8ccd29441..db93c72be 100644 --- a/src/warnet/lnnode.py +++ b/src/warnet/lnnode.py @@ -1,4 +1,5 @@ -import os +import io +import tarfile from backends import BackendInterface, ServiceType from warnet.utils import exponential_backoff, generate_ipv4_addr, handle_json @@ -13,7 +14,8 @@ "--bitcoin.active", "--bitcoin.regtest", "--bitcoin.node=bitcoind", - "--maxpendingchannels=64" + "--maxpendingchannels=64", + "--trickledelay=1" ]) class LNNode: @@ -114,30 +116,38 @@ def generate_cli_command(self, command: list[str]): raise Exception(f"Unsupported LN implementation: {self.impl}") return cmd - def export(self, config, subdir): - container_name = self.backend.get_container_name(self.tank.index, ServiceType.LIGHTNING) - macaroon_filename = f"{container_name}_admin.macaroon" - cert_filename = f"{container_name}_tls.cert" - macaroon_path = os.path.join(subdir, macaroon_filename) - cert_path = os.path.join(subdir, cert_filename) + def export(self, config: object, tar_file): + # Retrieve the credentials macaroon = self.backend.get_file( self.tank.index, ServiceType.LIGHTNING, "/root/.lnd/data/chain/bitcoin/regtest/admin.macaroon", ) - cert = self.backend.get_file(self.tank.index, ServiceType.LIGHTNING, "/root/.lnd/tls.cert") - - with open(macaroon_path, "wb") as f: - f.write(macaroon) - - with open(cert_path, "wb") as f: - f.write(cert) + cert = self.backend.get_file( + self.tank.index, + ServiceType.LIGHTNING, + "/root/.lnd/tls.cert" + ) + name = f"ln-{self.tank.index}" + macaroon_filename = f"{name}_admin.macaroon" + cert_filename = f"{name}_tls.cert" + host = self.backend.get_lnnode_hostname(self.tank.index) + + # Add the files to the in-memory tar archive + tarinfo1 = tarfile.TarInfo(name=macaroon_filename) + tarinfo1.size = len(macaroon) + fileobj1 = io.BytesIO(macaroon) + tar_file.addfile(tarinfo=tarinfo1, fileobj=fileobj1) + tarinfo2 = tarfile.TarInfo(name=cert_filename) + tarinfo2.size = len(cert) + fileobj2 = io.BytesIO(cert) + tar_file.addfile(tarinfo=tarinfo2, fileobj=fileobj2) config["nodes"].append( { - "id": container_name, - "address": f"https://{self.ipv4}:{self.rpc_port}", - "macaroon": macaroon_path, - "cert": cert_path, + "id": name, + "address": f"https://{host}:{self.rpc_port}", + "macaroon": f"/simln/{macaroon_filename}", + "cert": f"/simln/{cert_filename}", } ) diff --git a/src/warnet/server.py b/src/warnet/server.py index 90c7bc4b9..94dfb4089 100644 --- a/src/warnet/server.py +++ b/src/warnet/server.py @@ -1,5 +1,6 @@ import argparse import base64 +import io import json import logging import logging.config @@ -10,6 +11,7 @@ import signal import subprocess import sys +import tarfile import tempfile import threading import time @@ -268,20 +270,47 @@ def tank_messages(self, network: str, node_a: int, node_b: int) -> str: self.logger.error(msg) raise ServerError(message=msg) from e - def network_export(self, network: str) -> str: + def network_export(self, network: str, activity: str | None) -> bool: """ - Export all data for sim-ln to subdirectory + Export all data for a simln container running on the network """ - try: - wn = self.get_warnet(network) - subdir = os.path.join(wn.config_dir, "simln") - os.makedirs(subdir, exist_ok=True) - wn.export(subdir) - return subdir - except Exception as e: - msg = f"Error exporting network: {e}" - self.logger.error(msg) - raise ServerError(message=msg) from e + wn = self.get_warnet(network) + if "simln" not in wn.services: + raise Exception("No simln service in network") + + # JSON object that will eventually be written to simln config file + config = {"nodes": []} + if activity: + config["activity"] = json.loads(activity) + # In-memory file to build tar archive + tar_buffer = io.BytesIO() + with tarfile.open(fileobj=tar_buffer, mode="w") as tar_file: + # tank LN nodes add their credentials to tar archive + wn.export(config, tar_file) + # write config file + config_bytes = json.dumps(config).encode('utf-8') + config_stream = io.BytesIO(config_bytes) + tarinfo = tarfile.TarInfo(name="sim.json") + tarinfo.size = len(config_bytes) + tar_file.addfile(tarinfo=tarinfo, fileobj=config_stream) + + # Write the archive to the RPC server's config directory + source_file = wn.config_dir / "simln.tar" + with open(source_file, "wb") as output: + tar_buffer.seek(0) + output.write(tar_buffer.read()) + + if self.backend == "compose": + # Extract the archive into a subdirectory that is already + # shared with the simln container as a volume + subprocess.run(["tar", "-xf", source_file, "-C", wn.config_dir / 'simln']) + # Force quick restart of the container instead of waiting + # for the exponential backoff to come around + wn.container_interface.restart_service_container("simln") + if self.backend == "k8s": + # Copy the archive to the "emptydir" volume in the simln pod + wn.container_interface.write_service_config(source_file, "simln", "/simln/") + return True def scenarios_available(self) -> list[tuple]: """ diff --git a/src/warnet/services.py b/src/warnet/services.py index 570e48778..95009a6f9 100644 --- a/src/warnet/services.py +++ b/src/warnet/services.py @@ -31,7 +31,7 @@ "container_name_suffix": "fork-observer", "warnet_port": "23001", "container_port": "2323", - "config_files": [f"/{FO_CONF_NAME}:/app/config.toml"], + "config_files": [f"{FO_CONF_NAME}:/app/config.toml"], }, "grafana": { "backends": ["compose"], @@ -43,8 +43,8 @@ "grafana-storage:/var/lib/grafana" ], "config_files": [ - f"/{GRAFANA_PROVISIONING}/datasources:/etc/grafana/provisioning/datasources", - f"/{GRAFANA_PROVISIONING}/dashboards:/etc/grafana/provisioning/dashboards", + f"{GRAFANA_PROVISIONING}/datasources:/etc/grafana/provisioning/datasources", + f"{GRAFANA_PROVISIONING}/dashboards:/etc/grafana/provisioning/dashboards", ], "environment": [ "GF_LOG_LEVEL=debug", @@ -76,7 +76,17 @@ "container_name_suffix": "prometheus", "warnet_port": "23004", "container_port": "9090", - "config_files": [f"/{PROM_CONF_NAME}:/etc/prometheus/prometheus.yml"], + "config_files": [f"{PROM_CONF_NAME}:/etc/prometheus/prometheus.yml"], "args": ["--config.file=/etc/prometheus/prometheus.yml"] - } + }, + "simln": { + "backends": ["compose", "k8s"], + "image": "bitcoindevproject/simln:0.2.0", + "container_name_suffix": "simln", + "environment": [ + "LOG_LEVEL=debug", + "SIMFILE_PATH=/simln/sim.json" + ], + "config_files": ["simln/:/simln"] + }, } diff --git a/src/warnet/tank.py b/src/warnet/tank.py index 975444924..d3627bf73 100644 --- a/src/warnet/tank.py +++ b/src/warnet/tank.py @@ -169,6 +169,6 @@ def apply_network_conditions(self): f"Error applying network conditions to tank {self.index}: `{self.netem}` ({e})" ) - def export(self, config, subdir): + def export(self, config: object, tar_file): if self.lnnode is not None: - self.lnnode.export(config, subdir) + self.lnnode.export(config, tar_file) diff --git a/src/warnet/warnet.py b/src/warnet/warnet.py index db46186a6..e2cce88fd 100644 --- a/src/warnet/warnet.py +++ b/src/warnet/warnet.py @@ -5,7 +5,6 @@ import base64 import json import logging -import os import shutil from pathlib import Path @@ -136,6 +135,8 @@ def from_network(cls, network_name, backend="compose"): self.graph = networkx.read_graphml(Path(self.config_dir / self.graph_name), node_type=int, force_multigraph=True) validate_graph_schema(self.graph) self.tanks_from_graph() + if "services" in self.graph.graph: + self.services = self.graph.graph["services"].split() for tank in self.tanks: tank._ipv4 = self.container_interface.get_tank_ipv4(tank.index) return self @@ -237,15 +238,9 @@ def write_prometheus_config(self): except Exception as e: logger.error(f"An error occurred while writing to {prometheus_path}: {e}") - def export(self, subdir): - if self.backend != "compose": - raise NotImplementedError("Export is only supported for compose backend") - config = {"nodes": []} + def export(self, config: object, tar_file): for tank in self.tanks: - tank.export(config, subdir) - config_path = os.path.join(subdir, "sim.json") - with open(config_path, "a") as f: - json.dump(config, f) + tank.export(config, tar_file) def wait_for_health(self): self.container_interface.wait_for_healthy_tanks(self) diff --git a/test/data/ln.graphml b/test/data/ln.graphml index b923de8a5..331f64f92 100644 --- a/test/data/ln.graphml +++ b/test/data/ln.graphml @@ -1,5 +1,6 @@ + @@ -14,6 +15,7 @@ + simln 26.0 -uacomment=w0 diff --git a/test/data/services.graphml b/test/data/services.graphml index 6ed30c498..41d53bd80 100644 --- a/test/data/services.graphml +++ b/test/data/services.graphml @@ -20,6 +20,7 @@ 26.0 -uacomment=w0 -debug=validation true + lnd \ No newline at end of file diff --git a/test/ln_test.py b/test/ln_test.py index 60efbe867..9be69f129 100755 --- a/test/ln_test.py +++ b/test/ln_test.py @@ -25,20 +25,6 @@ def get_cb_forwards(index): base.wait_for_all_tanks_status(target="running") base.wait_for_all_edges() -if base.backend != "compose": - print("\nSkipping network export test, only supported with compose backend") -else: - print("\nTesting warcli network export") - path = Path(base.warcli("network export")) / "sim.json" - with open(path) as file: - data = json.load(file) - print(json.dumps(data, indent=4)) - assert len(data["nodes"]) == 3 - for node in data["nodes"]: - assert os.path.exists(node["macaroon"]) - assert os.path.exists(node["cert"]) - - print("\nRunning LN Init scenario") base.warcli("rpc 0 getblockcount") base.warcli("scenarios run ln_init") @@ -86,17 +72,27 @@ def check_invoices(): print(base.warcli(f"lncli 2 payinvoice -f {inv}")) print("Waiting for payment success") -def check_invoices(): - invs = json.loads(base.warcli("lncli 0 listinvoices"))["invoices"] - if len(invs) > 0 and invs[0]["state"] == "SETTLED": - print("\nSettled!") - return True - else: - return False -base.wait_for_predicate(check_invoices) +def check_invoices(index): + invs = json.loads(base.warcli(f"lncli {index} listinvoices"))["invoices"] + settled = 0 + for inv in invs: + if inv["state"] == "SETTLED": + settled += 1 + return settled +base.wait_for_predicate(lambda: check_invoices(0) == 1) print("\nEnsuring channel-level channel policy settings: target") payment = json.loads(base.warcli("lncli 2 listpayments"))["payments"][0] assert payment["fee_msat"] == "2213" +print("\nEngaging simln") +activity = [{ + "source": "ln-0", + "destination": chan["node1_pub"], + "interval_secs": 1, + "amount_msat": 2000 +}] +base.warcli(f"network export --activity={json.dumps(activity).replace(' ', '')}") +base.wait_for_predicate(lambda: check_invoices(0) > 1 or check_invoices(1) > 1 or check_invoices(2) > 1) + base.stop_server()