Skip to content

Commit

Permalink
Merge pull request #649 from willcl-ark/speedy-startup
Browse files Browse the repository at this point in the history
speedy startup
  • Loading branch information
pinheadmz authored Dec 6, 2024
2 parents 0287fdf + 09b7230 commit 82306a8
Show file tree
Hide file tree
Showing 3 changed files with 156 additions and 66 deletions.
6 changes: 6 additions & 0 deletions src/warnet/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,12 @@
},
}

# Helm commands for the logging CRDs, listed in the order they appear below:
# add the prometheus-community chart repository, refresh the local repo index,
# then install (or upgrade) the prometheus-operator-crds release.
LOGGING_CRD_COMMANDS = [
"helm repo add prometheus-community https://prometheus-community.github.io/helm-charts",
"helm repo update",
"helm upgrade --install prometheus-operator-crds prometheus-community/prometheus-operator-crds",
]

# Helm commands for logging setup
# TODO: there is also a lot of hardcoded stuff in these helm commands; this will need fixing when moving to helm charts
LOGGING_HELM_COMMANDS = [
Expand Down
17 changes: 13 additions & 4 deletions src/warnet/control.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import time
import zipapp
from concurrent.futures import ThreadPoolExecutor, as_completed
from multiprocessing import Pool
from pathlib import Path
from typing import Optional

Expand Down Expand Up @@ -112,10 +113,18 @@ def stop_scenario(scenario_name):


def stop_all_scenarios(scenarios):
    """Stop all active scenarios concurrently.

    Each scenario in ``scenarios`` is passed to ``stop_scenario`` from a
    worker thread while a console status spinner is displayed. Once every
    scenario has been handled, one confirmation line per scenario is printed,
    followed by a final summary line.

    Threads are used instead of ``multiprocessing.Pool`` because the worker
    is a nested function, and ``Pool.map`` must pickle its callable; nested
    functions cannot be pickled, so the pool-based version fails at runtime.

    Args:
        scenarios: Iterable of scenario names to stop.
    """

    def stop_single(scenario):
        stop_scenario(scenario)
        return f"Stopped scenario: {scenario}"

    # NOTE(review): assumes stop_scenario is safe to call from several threads
    # at once - confirm against its implementation.
    with console.status("[bold yellow]Stopping all scenarios...[/bold yellow]"):
        with ThreadPoolExecutor() as executor:
            # list() drains the iterator so any worker exception surfaces here.
            results = list(executor.map(stop_single, scenarios))

    for result in results:
        console.print(f"[bold green]{result}[/bold green]")

    console.print("[bold green]All scenarios have been stopped.[/bold green]")


Expand Down
199 changes: 137 additions & 62 deletions src/warnet/deploy.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import subprocess
import sys
import tempfile
from multiprocessing import Process
from pathlib import Path
from typing import Optional

Expand All @@ -15,6 +16,7 @@
FORK_OBSERVER_CHART,
HELM_COMMAND,
INGRESS_HELM_COMMANDS,
LOGGING_CRD_COMMANDS,
LOGGING_HELM_COMMANDS,
LOGGING_NAMESPACE,
NAMESPACES_CHART_LOCATION,
Expand Down Expand Up @@ -75,17 +77,47 @@ def _deploy(directory, debug, namespace, to_all_users):

if to_all_users:
namespaces = get_namespaces_by_type(WARGAMES_NAMESPACE_PREFIX)
processes = []
for namespace in namespaces:
deploy(directory, debug, namespace.metadata.name, False)
p = Process(target=deploy, args=(directory, debug, namespace.metadata.name, False))
p.start()
processes.append(p)
for p in processes:
p.join()
return

if (directory / NETWORK_FILE).exists():
dl = deploy_logging_stack(directory, debug)
deploy_network(directory, debug, namespace=namespace)
df = deploy_fork_observer(directory, debug)
if dl | df:
deploy_ingress(debug)
deploy_caddy(directory, debug)
processes = []
# Deploy logging CRD first to avoid synchronisation issues
deploy_logging_crd(directory, debug)

logging_process = Process(target=deploy_logging_stack, args=(directory, debug))
logging_process.start()
processes.append(logging_process)

network_process = Process(target=deploy_network, args=(directory, debug, namespace))
network_process.start()

ingress_process = Process(target=deploy_ingress, args=(directory, debug))
ingress_process.start()
processes.append(ingress_process)

caddy_process = Process(target=deploy_caddy, args=(directory, debug))
caddy_process.start()
processes.append(caddy_process)

# Wait for the network process to complete
network_process.join()

# Start the fork observer process immediately after network process completes
fork_observer_process = Process(target=deploy_fork_observer, args=(directory, debug))
fork_observer_process.start()
processes.append(fork_observer_process)

# Wait for all other processes to complete
for p in processes:
p.join()

elif (directory / NAMESPACES_FILE).exists():
deploy_namespaces(directory)
else:
Expand Down Expand Up @@ -118,11 +150,30 @@ def check_logging_required(directory: Path):
return False


def deploy_logging_crd(directory: Path, debug: bool) -> bool:
    """Install the logging stack CRDs ahead of the rest of the logging stack.

    This function exists so that the rest of the logging stack installation
    can be parallelised: the CRD commands are run here, one after another,
    before anything else is started.

    Args:
        directory: Network directory, passed to ``check_logging_required``.
        debug: Accepted for signature parity with the other deploy helpers;
            not used by this function.

    Returns:
        ``True`` if every command in ``LOGGING_CRD_COMMANDS`` succeeded.
        ``False`` if logging is not required for this network or if any
        command failed (remaining commands are then skipped).
    """
    if not check_logging_required(directory):
        return False

    click.echo(
        "Found collectLogs or metricsExport in network definition, Deploying logging stack CRD"
    )

    for command in LOGGING_CRD_COMMANDS:
        if not stream_command(command):
            # Use click.echo like the rest of this function's user output.
            click.echo(f"Failed to run Helm command: {command}")
            return False
    return True


def deploy_logging_stack(directory: Path, debug: bool) -> bool:
if not check_logging_required(directory):
return False

click.echo("Found collectLogs or metricsExport in network definition, Deploying logging stack")
click.echo("Deploying logging stack")

for command in LOGGING_HELM_COMMANDS:
if not stream_command(command):
Expand All @@ -144,7 +195,7 @@ def deploy_caddy(directory: Path, debug: bool):
if not network_file.get(name, {}).get("enabled", False):
return

cmd = f"{HELM_COMMAND} {name} {CADDY_CHART} --namespace {namespace}"
cmd = f"{HELM_COMMAND} {name} {CADDY_CHART} --namespace {namespace} --create-namespace"
if debug:
cmd += " --debug"

Expand All @@ -156,7 +207,15 @@ def deploy_caddy(directory: Path, debug: bool):
click.echo("\nTo access the warnet dashboard run:\n warnet dashboard")


def deploy_ingress(debug: bool):
def deploy_ingress(directory: Path, debug: bool):
# Deploy ingress if either logging or fork observer is enabled
network_file_path = directory / NETWORK_FILE
with network_file_path.open() as f:
network_file = yaml.safe_load(f)
fo_enabled = network_file.get("fork_observer", {}).get("enabled", False)
logging_enabled = check_logging_required(directory)
if not (fo_enabled or logging_enabled):
return
click.echo("Deploying ingress controller")

for command in INGRESS_HELM_COMMANDS:
Expand Down Expand Up @@ -231,41 +290,49 @@ def deploy_fork_observer(directory: Path, debug: bool) -> bool:

def deploy_network(directory: Path, debug: bool = False, namespace: Optional[str] = None):
    """Deploy every node listed in the network file, one process per node.

    Loads ``NETWORK_FILE`` from ``directory``, resolves the target namespace
    with ``get_default_namespace_or`` and starts a separate ``Process``
    running ``deploy_single_node`` for each entry under the ``"nodes"`` key.
    The call returns once all of those processes have finished.

    Args:
        directory: Directory containing the network definition file.
        debug: Forwarded to ``deploy_single_node``.
        namespace: Namespace to deploy into; ``None`` is resolved by
            ``get_default_namespace_or``.
    """
    network_file_path = directory / NETWORK_FILE

    namespace = get_default_namespace_or(namespace)

    with network_file_path.open() as f:
        network_file = yaml.safe_load(f)

    processes = []
    for node in network_file["nodes"]:
        p = Process(target=deploy_single_node, args=(node, directory, debug, namespace))
        p.start()
        processes.append(p)

    # Block until every per-node deployment has finished.
    for p in processes:
        p.join()


def deploy_single_node(node, directory: Path, debug: bool, namespace: str):
    """Deploy a single node as a Helm release.

    The Helm command is built from ``HELM_COMMAND``, the node's name,
    ``BITCOIN_CHART_LOCATION``, the namespace and the defaults file found in
    ``directory``. Every key of ``node`` other than ``"name"`` is written to
    a temporary YAML file that is passed as an additional ``-f`` values file.
    The temporary file is removed afterwards whether or not the command
    succeeded. Failures are reported with ``click.echo`` rather than raised.

    Args:
        node: Mapping describing the node; ``"name"`` is the release name.
        directory: Directory containing ``DEFAULTS_FILE``.
        debug: When true, ``--debug`` is appended to the Helm command.
        namespace: Namespace passed to Helm via ``--namespace``.
    """
    click.echo(f"Deploying node: {node.get('name')}")
    temp_override_file_path = None
    try:
        node_name = node.get("name")
        node_config_override = {k: v for k, v in node.items() if k != "name"}

        defaults_file_path = directory / DEFAULTS_FILE
        cmd = f"{HELM_COMMAND} {node_name} {BITCOIN_CHART_LOCATION} --namespace {namespace} -f {defaults_file_path}"
        if debug:
            cmd += " --debug"

        if node_config_override:
            with tempfile.NamedTemporaryFile(mode="w", suffix=".yaml", delete=False) as temp_file:
                # Record the path before writing so the cleanup below still
                # runs if yaml.dump raises (the file is created with
                # delete=False and would otherwise be left behind).
                temp_override_file_path = Path(temp_file.name)
                yaml.dump(node_config_override, temp_file)
            cmd = f"{cmd} -f {temp_override_file_path}"

        if not stream_command(cmd):
            click.echo(f"Failed to run Helm command: {cmd}")
            return
    except Exception as e:
        click.echo(f"Error: {e}")
        return
    finally:
        if temp_override_file_path is not None:
            temp_override_file_path.unlink()


def deploy_namespaces(directory: Path):
Expand All @@ -284,32 +351,40 @@ def deploy_namespaces(directory: Path):
)
return

processes = []
for namespace in namespaces_file["namespaces"]:
click.echo(f"Deploying namespace: {namespace.get('name')}")
try:
temp_override_file_path = ""
namespace_name = namespace.get("name")
namespace_config_override = {k: v for k, v in namespace.items() if k != "name"}

cmd = f"{HELM_COMMAND} {namespace_name} {NAMESPACES_CHART_LOCATION} -f {defaults_file_path}"

if namespace_config_override:
with tempfile.NamedTemporaryFile(
mode="w", suffix=".yaml", delete=False
) as temp_file:
yaml.dump(namespace_config_override, temp_file)
temp_override_file_path = Path(temp_file.name)
cmd = f"{cmd} -f {temp_override_file_path}"

if not stream_command(cmd):
click.echo(f"Failed to run Helm command: {cmd}")
return
except Exception as e:
click.echo(f"Error: {e}")
p = Process(target=deploy_single_namespace, args=(namespace, defaults_file_path))
p.start()
processes.append(p)

for p in processes:
p.join()


def deploy_single_namespace(namespace, defaults_file_path: Path):
    """Deploy a single namespace as a Helm release.

    The Helm command is built from ``HELM_COMMAND``, the namespace's name,
    ``NAMESPACES_CHART_LOCATION`` and ``defaults_file_path``. Every key of
    ``namespace`` other than ``"name"`` is written to a temporary YAML file
    that is passed as an additional ``-f`` values file. The temporary file is
    removed afterwards whether or not the command succeeded. Failures are
    reported with ``click.echo`` rather than raised.

    Args:
        namespace: Mapping describing the namespace; ``"name"`` is the
            release name.
        defaults_file_path: Path to the defaults values file passed with
            ``-f``.
    """
    click.echo(f"Deploying namespace: {namespace.get('name')}")
    temp_override_file_path = None
    try:
        namespace_name = namespace.get("name")
        namespace_config_override = {k: v for k, v in namespace.items() if k != "name"}

        cmd = f"{HELM_COMMAND} {namespace_name} {NAMESPACES_CHART_LOCATION} -f {defaults_file_path}"

        if namespace_config_override:
            with tempfile.NamedTemporaryFile(mode="w", suffix=".yaml", delete=False) as temp_file:
                # Record the path before writing so the cleanup below still
                # runs if yaml.dump raises (the file is created with
                # delete=False and would otherwise be left behind).
                temp_override_file_path = Path(temp_file.name)
                yaml.dump(namespace_config_override, temp_file)
            cmd = f"{cmd} -f {temp_override_file_path}"

        if not stream_command(cmd):
            click.echo(f"Failed to run Helm command: {cmd}")
            return
    except Exception as e:
        click.echo(f"Error: {e}")
        return
    finally:
        if temp_override_file_path is not None:
            temp_override_file_path.unlink()


def is_windows():
Expand Down

0 comments on commit 82306a8

Please sign in to comment.