From 3dd63d220cea875d77a9ddc995abd9b0c0e5546b Mon Sep 17 00:00:00 2001
From: Stephan-Mai-pymc*
Date: Tue, 28 Jan 2025 23:50:26 +0100
Subject: [PATCH 1/8] update to sky 0.7.0; pymcs start/stop/exec (unfinished);
 auto stop/down & idle_timeout from yaml

---
 requirements.txt                       |   4 +-
 src/pymc_server/cli.py                 |  84 ++---
 src/pymc_server/cli_factory.py         | 141 ++++++++
 src/pymc_server/commands/down_cli.py   |  44 ++-
 src/pymc_server/commands/exec_cli.py   | 135 ++++++++
 src/pymc_server/commands/launch_cli.py | 457 ++-----------------------
 src/pymc_server/commands/status_cli.py | 323 +++++++++++++++++
 src/pymc_server/utils/launch.py        | 233 +++++++++++++
 src/pymc_server/utils/names.py         |   5 +-
 src/pymc_server/utils/yaml.py          | 149 +++++++-
 10 files changed, 1108 insertions(+), 467 deletions(-)
 create mode 100644 src/pymc_server/commands/exec_cli.py
 create mode 100644 src/pymc_server/commands/status_cli.py
 create mode 100644 src/pymc_server/utils/launch.py

diff --git a/requirements.txt b/requirements.txt
index 4fad1c4..5947f17 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -2,6 +2,6 @@ nutpie
 lifetimes
 numba
 ray[default]
-skypilot
-skypilot[gcp]
+skypilot==0.7.0
+skypilot[gcp]==0.7.0
 hiyapyco
diff --git a/src/pymc_server/cli.py b/src/pymc_server/cli.py
index 11749df..62d3318 100644
--- a/src/pymc_server/cli.py
+++ b/src/pymc_server/cli.py
@@ -3,18 +3,21 @@ from typing import Any, Dict, List, Optional, Tuple, Union
 from pymc_server.utils.yaml import merge_yaml
 #from pymc_server.commands.launch_cli import launch as cli_launch,cli_launch_
-from pymc_server.commands.launch_cli import launch_2
-from pymc_server.commands.down_cli import down as down_cli
-from pymc_server.cli_factory import setup_launch_factory, setup_status_factory
+from pymc_server.commands.down_cli import down as down_cli, setup_down_factory
+from pymc_server.commands.launch_cli import launch as launch_cli
+from pymc_server.commands.exec_cli import exec as exec_cli
+from pymc_server.commands.status_cli import status as status_cli
+
+from pymc_server.cli_factory import setup_launch_factory, setup_status_factory, setup_exec_factory, setup_start_factory, setup_stop_factory
 from sky.usage import usage_lib
 from sky.cli import _get_shell_complete_args, _get_click_major_version, _complete_cluster_name, _NaturalOrderGroup, _DocumentedCodeCommand
-
-
 from sky.cli import (
     status as sky_status,
     launch as sky_launch,
-    check as sky_check
+    check as sky_check,
+    start as sky_start,
+    stop as sky_stop
 )

 # TODO: remove, check pyproject.py for a reference to this function
@@ -28,13 +31,23 @@ def status(*args, **kwargs):
     """Calls the pymcs status command by passing the click context."""
     ctx = click.get_current_context()
     #ctx.invoke(_status_test, *args, **kwargs)
-    print("*args",*args)
-    data = ctx.invoke(sky_status, *args, **kwargs)
+    data = ctx.invoke(status_cli, *args, **kwargs)
     #ctx.invoke(sky_status, *args, **kwargs)

+@setup_exec_factory
+@usage_lib.entrypoint
+def exec(*args, **kwargs):
+    """Calls the pymcs exec command by passing the click context."""
+    ctx = click.get_current_context()
+    ctx.invoke(exec_cli, *args, **kwargs)
+
+
 @setup_launch_factory
 @usage_lib.entrypoint
 def launch(*args, **kwargs):
@@ -48,8 +61,7 @@
     """
     # cli_launch(*args, **kwargs)
     ctx = click.get_current_context()
-    ctx.invoke(launch_2, *args, **kwargs)
-    #ctx.invoke(sky_launch, *args, **kwargs)
+    ctx.invoke(launch_cli, *args, **kwargs)
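Every pymcs command in this series follows the same delegation shape: a
setup_*_factory decorator stacks the shared click options onto a thin wrapper,
and the wrapper forwards to the real implementation through the click context.
A minimal, self-contained sketch of that pattern (setup_example_factory and
example_impl are illustrative names, not part of this patch):

    import click

    def example_impl(yes):
        """The real implementation, analogous to launch_cli/status_cli."""
        click.echo(f'running with yes={yes}')

    def setup_example_factory(func):
        """Stacks shared click options onto `func`, like setup_*_factory."""
        options = [
            click.command(),
            click.option('--yes', '-y', is_flag=True, default=False,
                         help='Skip confirmation prompt.'),
        ]
        # Apply in reverse so click.command() wraps last and the options
        # stack in declaration order.
        for option in reversed(options):
            func = option(func)
        return func

    @setup_example_factory
    def example(yes):
        # Forward through the click context, as the wrappers above do.
        ctx = click.get_current_context()
        ctx.invoke(example_impl, yes=yes)

    if __name__ == '__main__':
        example()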
 @setup_status_factory
 @usage_lib.entrypoint
@@ -60,34 +72,25 @@ def check(*args, **kwargs):
     ctx.invoke(sky_check, *args, **kwargs)
 
 
-@cli.command(cls=_DocumentedCodeCommand)
-@click.argument('clusters',
-                nargs=-1,
-                required=False,
-                **_get_shell_complete_args(_complete_cluster_name))
-@click.option('--all',
-              '-a',
-              default=None,
-              is_flag=True,
-              help='Tear down all existing clusters.')
-@click.option('--yes',
-              '-y',
-              is_flag=True,
-              default=False,
-              required=False,
-              help='Skip confirmation prompt.')
-@click.option(
-    '--purge',
-    '-p',
-    is_flag=True,
-    default=False,
-    required=False,
-    help=('(Advanced) Forcefully remove the cluster(s) from '
-          'SkyPilot\'s cluster table, even if the actual cluster termination '
-          'failed on the cloud. WARNING: This flag should only be set sparingly'
-          ' in certain manual troubleshooting scenarios; with it set, it is the'
-          ' user\'s responsibility to ensure there are no leaked instances and '
-          'related resources.'))
+
+
+@setup_start_factory
+@usage_lib.entrypoint
+def start(*args, **kwargs):
+    """Restarts stopped cluster(s)."""
+    ctx = click.get_current_context()
+    ctx.invoke(sky_start, *args, **kwargs)
+
+@setup_stop_factory
+@usage_lib.entrypoint
+def stop(*args, **kwargs):
+    """Stops cluster(s)."""
+    ctx = click.get_current_context()
+    ctx.invoke(sky_stop, *args, **kwargs)
+
+@setup_down_factory
 @usage_lib.entrypoint
 def down(*args, **kwargs):
     ctx = click.get_current_context()
@@ -96,10 +99,13 @@ def down(*args, **kwargs):
     """Deletes a local cluster."""
 
-
 cli.add_command(status)
 cli.add_command(launch)
 cli.add_command(check)
+cli.add_command(start)
+cli.add_command(stop)
+cli.add_command(down)
+cli.add_command(exec)
 
 if __name__ == '__main__':
     cli()
diff --git a/src/pymc_server/cli_factory.py b/src/pymc_server/cli_factory.py
index 1966b99..faaf8e4 100644
--- a/src/pymc_server/cli_factory.py
+++ b/src/pymc_server/cli_factory.py
@@ -188,3 +188,144 @@ def setup_launch_factory(func):
     for option in reversed(options):
         func = option(func)
     return func
+
+def setup_exec_factory(func):
+    options = [
+        #cli.command(cls=_DocumentedCodeCommand),
+        click.command(cls=_DocumentedCodeCommand),
+        click.argument('cluster',
+                       required=False,
+                       type=str,
+                       **_get_shell_complete_args(_complete_cluster_name)),
+        click.option(
+            '--cluster',
+            '-c',
+            'cluster_option',
+            hidden=True,
+            type=str,
+            help='This is the same as the positional argument, just for consistency.',
+            **_get_shell_complete_args(_complete_cluster_name)),
+        click.argument('entrypoint',
+                       required=False,
+                       type=str,
+                       nargs=-1,
+                       **_get_shell_complete_args(_complete_file_name)),
+        click.option(
+            '--detach-run',
+            '-d',
+            default=False,
+            is_flag=True,
+            help=('If True, as soon as a job is submitted, return from this call '
+                  'and do not stream execution logs.')),
+        _add_click_options(_TASK_OPTIONS_WITH_NAME + _EXTRA_RESOURCES_OPTIONS),
+        usage_lib.entrypoint
+    ]
+    # Apply the collected decorators in reverse so they stack in declaration
+    # order, and return the decorated command.
+    for option in reversed(options):
+        func = option(func)
+    return func
+
+
+def setup_start_factory(func):
+    '''
+    clusters: List[str],
+    all: bool,
+    yes: bool,
+    idle_minutes_to_autostop: Optional[int],
+    down: bool,  # pylint: disable=redefined-outer-name
+    retry_until_up: bool,
+    force: bool):
+    '''
+
+    options = [
+        click.command(cls=_DocumentedCodeCommand),
+        click.argument('clusters',
+                       nargs=-1,
+                       required=False,
+                       **_get_shell_complete_args(_complete_cluster_name)),
+        click.option('--all',
+                     '-a',
+                     default=False,
+                     is_flag=True,
+                     required=False,
+                     help='Start all existing clusters.'),
+        click.option('--yes',
+                     '-y',
+                     is_flag=True,
default=False, + required=False, + help='Skip confirmation prompt.'), + click.option( + '--idle-minutes-to-autostop', + '-i', + default=None, + type=int, + required=False, + help=('Automatically stop the cluster after this many minutes ' + 'of idleness, i.e., no running or pending jobs in the cluster\'s job ' + 'queue. Idleness gets reset whenever setting-up/running/pending jobs ' + 'are found in the job queue. ' + 'Setting this flag is equivalent to ' + 'running ``sky launch -d ...`` and then ``sky autostop -i ``' + '. If not set, the cluster will not be autostopped.')), + click.option( + '--down', + default=False, + is_flag=True, + required=False, + help= + ('Autodown the cluster: tear down the cluster after specified minutes of ' + 'idle time after all jobs finish (successfully or abnormally). Requires ' + '--idle-minutes-to-autostop to be set.'), + ), + click.option( + '--retry-until-up', + '-r', + default=False, + is_flag=True, + required=False, + # Disabling quote check here, as there seems to be a bug in pylint, + # which incorrectly recognizes the help string as a docstring. + # pylint: disable=bad-docstring-quotes + help=('Retry provisioning infinitely until the cluster is up, ' + 'if we fail to start the cluster due to unavailability errors.'), + ), + click.option( + '--force', + '-f', + default=False, + is_flag=True, + required=False, + help=('Force start the cluster even if it is already UP. Useful for ' + 'upgrading the SkyPilot runtime on the cluster.')), + + usage_lib.entrypoint + ] + for option in reversed(options): + func = option(func) + print("func",func) + return func + + +def setup_stop_factory(func): + options = [ + + click.command(cls=_DocumentedCodeCommand), + click.argument('clusters', + nargs=-1, + required=False, + **_get_shell_complete_args(_complete_cluster_name)), + click.option('--all', + '-a', + default=None, + is_flag=True, + help='Stop all existing clusters.'), + click.option('--yes', + '-y', + is_flag=True, + default=False, + required=False, + help='Skip confirmation prompt.') + + ] + for option in reversed(options): + func = option(func) + print("func",func) + return func \ No newline at end of file diff --git a/src/pymc_server/commands/down_cli.py b/src/pymc_server/commands/down_cli.py index 718ac59..8218533 100644 --- a/src/pymc_server/commands/down_cli.py +++ b/src/pymc_server/commands/down_cli.py @@ -7,9 +7,51 @@ from sky.usage import usage_lib from sky.utils import controller_utils,subprocess_utils from rich import progress as rich_progress +from sky.cli import _get_glob_clusters + +import click +from sky.cli import ( + _DocumentedCodeCommand, + _get_shell_complete_args, + _complete_cluster_name, +) + +def setup_down_factory(func): + options = [ + click.command(cls=_DocumentedCodeCommand), + click.argument('clusters', + nargs=-1, + required=False, + **_get_shell_complete_args(_complete_cluster_name)), + click.option('--all', + '-a', + default=None, + is_flag=True, + help='Tear down all existing clusters.'), + click.option('--yes', + '-y', + is_flag=True, + default=False, + required=False, + help='Skip confirmation prompt.'), + click.option( + '--purge', + '-p', + is_flag=True, + default=False, + required=False, + help=('(Advanced) Forcefully remove the cluster(s) from ' + 'SkyPilot\'s cluster table, even if the actual cluster termination ' + 'failed on the cloud. 
WARNING: This flag should only be set sparingly'
+                  ' in certain manual troubleshooting scenarios; with it set, it is the'
+                  ' user\'s responsibility to ensure there are no leaked instances and '
+                  'related resources.')),
+    ]
+    for option in reversed(options):
+        func = option(func)
+    return func
 
-from sky.cli import _get_glob_clusters
 
 prefix = "pymcs"
 
 def local_down():
diff --git a/src/pymc_server/commands/exec_cli.py b/src/pymc_server/commands/exec_cli.py
new file mode 100644
index 0000000..af13476
--- /dev/null
+++ b/src/pymc_server/commands/exec_cli.py
@@ -0,0 +1,135 @@
+import click
+import sky
+import pymc_server
+import colorama
+from typing import Dict, List, Optional, Tuple, Union
+
+# sky-internal helpers used in the body below (names as of sky 0.7.0).
+from sky import global_user_state
+from sky.backends import backend_utils
+from sky.utils import controller_utils
+from sky.cli import (_merge_env_vars,
+                     _make_task_or_dag_from_entrypoint_with_overrides)
+
+def exec(
+    cluster: Optional[str],
+    cluster_option: Optional[str],
+    entrypoint: Tuple[str, ...],
+    detach_run: bool,
+    name: Optional[str],
+    cloud: Optional[str],
+    region: Optional[str],
+    zone: Optional[str],
+    workdir: Optional[str],
+    gpus: Optional[str],
+    ports: Tuple[str],
+    instance_type: Optional[str],
+    num_nodes: Optional[int],
+    use_spot: Optional[bool],
+    image_id: Optional[str],
+    env_file: Optional[Dict[str, str]],
+    env: List[Tuple[str, str]],
+    cpus: Optional[str],
+    memory: Optional[str],
+    disk_size: Optional[int],
+    disk_tier: Optional[str],
+):
+    # NOTE(dev): Keep the docstring consistent between the Python API and CLI.
+    """Execute a task or command on an existing cluster.
+
+    If ENTRYPOINT points to a valid YAML file, it is read in as the task
+    specification. Otherwise, it is interpreted as a bash command.
+
+    Actions performed by ``sky exec``:
+
+    1. Workdir syncing, if:
+
+       - ENTRYPOINT is a YAML with the ``workdir`` field specified; or
+
+       - Flag ``--workdir=<local path>`` is set.
+
+    2. Executing the specified task's ``run`` commands / the bash command.
+
+    ``sky exec`` is thus typically faster than ``sky launch``, provided a
+    cluster already exists.
+
+    All setup steps (provisioning, setup commands, file mounts syncing) are
+    skipped. If any of those specifications changed, this command will not
+    reflect those changes. To ensure a cluster's setup is up to date, use
+    ``sky launch`` instead.
+
+    Execution and scheduling behavior:
+
+    - The task/command will undergo job queue scheduling, respecting any
+      specified resource requirement. It can be executed on any node of the
+      cluster with enough resources.
+
+    - The task/command is run under the workdir (if specified).
+
+    - The task/command is run non-interactively (without a pseudo-terminal or
+      pty), so interactive commands such as ``htop`` do not work. Use ``ssh
+      my_cluster`` instead.
+
+    Typical workflow:
+
+    .. code-block:: bash
+
+      # First command: set up the cluster once.
+      sky launch -c mycluster app.yaml
+      \b
+      # For iterative development, simply execute the task on the launched
+      # cluster.
+      sky exec mycluster app.yaml
+      \b
+      # Do "sky launch" again if anything other than Task.run is modified:
+      sky launch -c mycluster app.yaml
+      \b
+      # Pass in commands for execution.
+      sky exec mycluster python train_cpu.py
+      sky exec mycluster --gpus=V100:1 python train_gpu.py
+      \b
+      # Pass environment variables to the task.
+ sky exec mycluster --env WANDB_API_KEY python train_gpu.py + + """ + if cluster_option is None and cluster is None: + raise click.UsageError('Missing argument \'[CLUSTER]\' and ' + '\'[ENTRYPOINT]...\'') + if cluster_option is not None: + if cluster is not None: + entrypoint = (cluster,) + entrypoint + cluster = cluster_option + if not entrypoint: + raise click.UsageError('Missing argument \'[ENTRYPOINT]...\'') + assert cluster is not None, (cluster, cluster_option, entrypoint) + + env = _merge_env_vars(env_file, env) + controller_utils.check_cluster_name_not_controller( + cluster, operation_str='Executing task on it') + handle = global_user_state.get_handle_from_cluster_name(cluster) + if handle is None: + raise click.BadParameter(f'Cluster {cluster!r} not found. ' + 'Use `sky launch` to provision first.') + backend = backend_utils.get_backend_from_handle(handle) + + task_or_dag = _make_task_or_dag_from_entrypoint_with_overrides( + entrypoint=entrypoint, + name=name, + workdir=workdir, + cloud=cloud, + region=region, + zone=zone, + gpus=gpus, + cpus=cpus, + memory=memory, + instance_type=instance_type, + use_spot=use_spot, + image_id=image_id, + num_nodes=num_nodes, + env=env, + disk_size=disk_size, + disk_tier=disk_tier, + ports=ports, + field_to_ignore=['cpus', 'memory', 'disk_size', 'disk_tier', 'ports'], + ) + + if isinstance(task_or_dag, sky.Dag): + raise click.UsageError('YAML specifies a DAG, while `sky exec` ' + 'supports a single task only.') + task = task_or_dag + + click.secho(f'Executing task on cluster {cluster}...', fg='yellow') + sky.exec(task, backend=backend, cluster_name=cluster, detach_run=detach_run) diff --git a/src/pymc_server/commands/launch_cli.py b/src/pymc_server/commands/launch_cli.py index 9f6a71e..4d7b38d 100644 --- a/src/pymc_server/commands/launch_cli.py +++ b/src/pymc_server/commands/launch_cli.py @@ -11,16 +11,20 @@ from sky import backends from sky import serve as serve_lib from sky import jobs as managed_jobs -from sky.cli import _parse_override_params,_merge_env_vars +from sky.cli import _parse_override_params, _merge_env_vars from sky.utils import dag_utils,ux_utils from sky.utils import common_utils from sky.utils import controller_utils from sky.usage import usage_lib -from sky.cli import _launch_with_confirm - +#from sky import core from pymc_server.utils.names import generate_cluster_name -from pymc_server.utils.yaml import get_config_from_yaml,load_chain_dag_from_yaml +from pymc_server.utils.launch import launch_with_confirm as _launch_with_confirm +from pymc_server.utils.yaml import ( + get_config_from_yaml, load_chain_dag_from_yaml, + _make_task_or_dag_from_entrypoint_with_overrides, + get_auto_stop +) from sky.cli import ( status as sky_status, launch as sky_launch, @@ -28,295 +32,6 @@ ) def launch( - entrypoint: Tuple[str, ...], - module_name:Optional[str], - cluster: Optional[str], - dryrun: bool, - detach_setup: bool, - detach_run: bool, - backend_name: Optional[str], - name: Optional[str], - workdir: Optional[str], - cloud: Optional[str], - region: Optional[str], - zone: Optional[str], - gpus: Optional[str], - cpus: Optional[str], - memory: Optional[str], - instance_type: Optional[str], - num_nodes: Optional[int], - use_spot: Optional[bool], - image_id: Optional[str], - env_file: Optional[Dict[str, str]], - env: List[Tuple[str, str]], - disk_size: Optional[int], - disk_tier: Optional[str], - ports: Tuple[str], - idle_minutes_to_autostop: Optional[int], - retry_until_up: bool, - yes: bool, - no_setup: bool, - clone_disk_from: 
Optional[str], -): - - configs, is_yaml = get_config_from_yaml(entrypoint,module_name) - - entrypoint_name = 'Task', - if is_yaml: - # Treat entrypoint as a yaml. - click.secho(f'{entrypoint_name} from YAML spec: ', - fg='yellow', - nl=False) - click.secho(configs, bold=True) - - env: List[Tuple[str, str]] = [] - - if is_yaml: - assert configs is not None - - #remove_key(configs[0],'pymc_yaml') - usage_lib.messages.usage.update_user_task_yaml(configs[0]) - dag = load_chain_dag_from_yaml(configs = configs) - task = dag.tasks[0] - - if len(dag.tasks) > 1: - # When the dag has more than 1 task. It is unclear how to - # override the params for the dag. So we just ignore the - # override params. - if override_params: - click.secho( - f'WARNING: override params {override_params} are ignored, ' - 'since the yaml file contains multiple tasks.', - fg='yellow') - return dag - - assert len(dag.tasks) == 1, ( - f'If you see this, please file an issue; tasks: {dag.tasks}') - - - else: - - task = sky.Task(name='sky-cmd', run=configs) - task.set_resources({sky.Resources()}) - # env update has been done for DAG in load_chain_dag_from_yaml for YAML. - task.update_envs(env) - # Override. - #workdir = None - #job_recovery = None - #num_nodes = None - #name = None - if workdir is not None: - task.workdir = workdir - - # job launch specific. - #if job_recovery is not None: - # override_params['job_recovery'] = job_recovery - - - - if num_nodes is not None: - task.num_nodes = num_nodes - if name is not None: - task.name = name - - - if isinstance(task, sky.Dag): - raise click.UsageError( - _DAG_NOT_SUPPORTED_MESSAGE.format(command=not_supported_cmd)) - #if task.service is None: - # with ux_utils.print_exception_no_traceback(): - # raise ValueError('Service section not found in the YAML file. ' - # 'To fix, add a valid `service` field.') - #print(task) - service_port: Optional[int] = None - for requested_resources in list(task.resources): - """ - if requested_resources.ports is None or len( - requested_resources.ports) != 1: - with ux_utils.print_exception_no_traceback(): - raise ValueError( - 'Must only specify one port in resources. Each replica ' - 'will use the port specified as application ingress port.') - service_port_str = requested_resources.ports[0] - if not service_port_str.isdigit(): - # For the case when the user specified a port range like 10000-10010 - with ux_utils.print_exception_no_traceback(): - raise ValueError(f'Port {service_port_str!r} is not a valid ' - 'port number. Please specify a single port ' - f'instead. Got: {service_port_str!r}') - # We request all the replicas using the same port for now, but it - # should be fine to allow different replicas to use different ports - # in the future. - resource_port = int(service_port_str) - if service_port is None: - service_port = resource_port - if service_port != resource_port: - with ux_utils.print_exception_no_traceback(): - raise ValueError(f'Got multiple ports: {service_port} and ' - f'{resource_port} in different resources. ' - 'Please specify single port instead.') - - """ - click.secho('Service Spec:', fg='cyan') - click.echo(task.service) - - click.secho('New replica will use the following resources (estimated):', - fg='cyan') - - with sky.Dag() as dag: - dag.add(task) - sky.optimize(dag) - - - click.secho(f"service_name, {service_name}:", fg='cyan') - - if service_name is None: - service_name = serve_lib.generate_service_name() - - - if not yes: - click.confirm(f'Updating service {service_name!r}. 
Proceed?', - default=True, - abort=True, - show_default=True) - print("manage jobs now!") - managed_jobs.launch(dag, - name, - detach_run=detach_run, - retry_until_up=retry_until_up) - - #serve_lib.update(task, service_name, mode=serve_lib.UpdateMode(mode)) - - return task - - -def cli_launch_( - entrypoint: Tuple[str, ...], - module_name:Optional[str], - cluster: Optional[str], - dryrun: bool, - detach_setup: bool, - detach_run: bool, - backend_name: Optional[str], - name: Optional[str], - workdir: Optional[str], - cloud: Optional[str], - region: Optional[str], - zone: Optional[str], - gpus: Optional[str], - cpus: Optional[str], - memory: Optional[str], - instance_type: Optional[str], - num_nodes: Optional[int], - use_spot: Optional[bool], - image_id: Optional[str], - env_file: Optional[Dict[str, str]], - env: List[Tuple[str, str]], - disk_size: Optional[int], - disk_tier: Optional[str], - ports: Tuple[str], - idle_minutes_to_autostop: Optional[int], - retry_until_up: bool, - yes: bool, - no_setup: bool, - clone_disk_from: Optional[str], - # job launch specific - job_recovery: Optional[str] = None, -): - """Launch a managed job from a YAML or a command. - - If ENTRYPOINT points to a valid YAML file, it is read in as the task - specification. Otherwise, it is interpreted as a bash command. - - Examples: - - .. code-block:: bash - - # You can use normal task YAMLs. - sky jobs launch task.yaml - - sky jobs launch 'echo hello!' - """ - if cluster is not None: - if name is not None and name != cluster: - raise click.UsageError('Cannot specify both --name and --cluster. ' - 'Use one of the flags as they are alias.') - name = cluster - env = _merge_env_vars(env_file, env) - task_or_dag = _make_task_or_dag_from_entrypoint_with_overrides( - entrypoint, - module_name=module_name, - name=name, - workdir=workdir, - cloud=cloud, - region=region, - zone=zone, - gpus=gpus, - cpus=cpus, - memory=memory, - instance_type=instance_type, - num_nodes=num_nodes, - use_spot=use_spot, - image_id=image_id, - env=env, - disk_size=disk_size, - disk_tier=disk_tier, - ports=ports, - job_recovery=job_recovery, - ) - - # Deprecation. We set the default behavior to be retry until up, and the - # flag `--retry-until-up` is deprecated. We can remove the flag in 0.8.0. - if retry_until_up is not None: - flag_str = '--retry-until-up' - if not retry_until_up: - flag_str = '--no-retry-until-up' - click.secho( - f'Flag {flag_str} is deprecated and will be removed in a ' - 'future release (managed jobs will always be retried). ' - 'Please file an issue if this does not work for you.', - fg='yellow') - else: - retry_until_up = True - - - - if not isinstance(task_or_dag, sky.Dag): - assert isinstance(task_or_dag, sky.Task), task_or_dag - with sky.Dag() as dag: - dag.add(task_or_dag) - dag.name = task_or_dag.name - else: - dag = task_or_dag - if name is not None: - dag.name = name - if dag.name is None : - dag.name = generate_cluster_name() - - dag.name = generate_cluster_name() - click.secho(f'Managed job {dag.name!r} will be launched on (estimated):', - fg='yellow') - dag_utils.maybe_infer_and_fill_dag_and_task_names(dag) - dag_utils.fill_default_config_in_dag_for_job_launch(dag) - - - click.secho(f'Managed job {dag.name!r} will be launched on (estimated):', - fg='yellow') - dag = sky.optimize(dag) - - if not yes: - prompt = f'Launching a managed job {dag.name!r}. Proceed?' 
- if prompt is not None: - click.confirm(prompt, default=True, abort=True, show_default=True) - - common_utils.check_cluster_name_is_valid(name) - - managed_jobs.launch(dag, - name, - detach_run=detach_run, - retry_until_up=retry_until_up) - -def launch_2( entrypoint: Tuple[str, ...], module_name:Optional[str], base_config_folder:Optional[str], @@ -382,6 +97,13 @@ def launch_2( disk_tier=disk_tier, ports=ports, ) + """ set auto stop/down from config """ + + + + + auto_idle_minutes,auto_down = get_auto_stop(entrypoint,module_name,base_config_folder) + if isinstance(task_or_dag, sky.Dag): raise click.UsageError( @@ -405,140 +127,33 @@ def launch_2( f'{colorama.Style.RESET_ALL}{colorama.Style.BRIGHT}pymcs serve up' f'{colorama.Style.RESET_ALL}') - cluster_name = generate_cluster_name() - - _launch_with_confirm(task, - backend, - cluster=cluster_name, - dryrun=dryrun, - detach_setup=detach_setup, - detach_run=detach_run, - no_confirm=yes, - idle_minutes_to_autostop=idle_minutes_to_autostop, - down=down, - retry_until_up=retry_until_up, - no_setup=no_setup, - clone_disk_from=clone_disk_from) + cluster_name = generate_cluster_name(cluster) + print(f'{colorama.Fore.YELLOW}new clustername: {cluster_name}') + + _launch_with_confirm( + task, + backend, + cluster=cluster_name, + dryrun=dryrun, + detach_setup=detach_setup, + detach_run=detach_run, + no_confirm=yes, + idle_minutes_to_autostop=idle_minutes_to_autostop, + down=down, + retry_until_up=retry_until_up, + no_setup=no_setup, + clone_disk_from=clone_disk_from + ) click.secho(f'{colorama.Fore.YELLOW}new cluster: ' f'{colorama.Style.RESET_ALL}{cluster_name}') + #cluster_records = core.status(cluster_names='cluster_name',refresh=False) + #handle = cluster_records[0]['handle'] + #head_ip = handle.external_ips()[0] + return cluster_name -def _make_task_or_dag_from_entrypoint_with_overrides( - entrypoint: Tuple[str, ...], - module_name:Optional[str], - base_config_folder:Optional[str], - *, - entrypoint_name: str = 'Task', - name: Optional[str] = None, - workdir: Optional[str] = None, - cloud: Optional[str] = None, - region: Optional[str] = None, - zone: Optional[str] = None, - gpus: Optional[str] = None, - cpus: Optional[str] = None, - memory: Optional[str] = None, - instance_type: Optional[str] = None, - num_nodes: Optional[int] = None, - use_spot: Optional[bool] = None, - image_id: Optional[str] = None, - disk_size: Optional[int] = None, - disk_tier: Optional[str] = None, - ports: Optional[Tuple[str]] = None, - env: Optional[List[Tuple[str, str]]] = None, - field_to_ignore: Optional[List[str]] = None, - # job launch specific - job_recovery: Optional[str] = None, -) -> Union[sky.Task, sky.Dag]: - """Creates a task or a dag from an entrypoint with overrides. - - Returns: - A dag iff the entrypoint is YAML and contains more than 1 task. - Otherwise, a task. - """ - #entrypoint = ' '.join(entrypoint) - - configs, is_yaml,module_config_path = get_config_from_yaml(entrypoint,module_name,base_config_folder) - click.secho(f'{colorama.Fore.GREEN}used base_config_folder:' - f'{colorama.Style.RESET_ALL}{base_config_folder}') - click.secho(f'{colorama.Fore.GREEN}used entrypoint:' - f'{colorama.Style.RESET_ALL}{entrypoint}') - - #is_yaml, _ = _check_yaml(entrypoint) - entrypoint: Optional[str] - if is_yaml: - # Treat entrypoint as a yaml. - click.secho(f'{entrypoint_name} from YAML spec: ', - fg='yellow', - nl=False) - click.secho(entrypoint, bold=True) - else: - if not entrypoint: - entrypoint = None - else: - # Treat entrypoint as a bash command. 
-            click.secho(f'{entrypoint_name} from command: ',
-                        fg='yellow',
-                        nl=False)
-            click.secho(entrypoint, bold=True)
-    override_params = _parse_override_params(cloud=cloud,
-                                             region=region,
-                                             zone=zone,
-                                             gpus=gpus,
-                                             cpus=cpus,
-                                             memory=memory,
-                                             instance_type=instance_type,
-                                             use_spot=use_spot,
-                                             image_id=image_id,
-                                             disk_size=disk_size,
-                                             disk_tier=disk_tier,
-                                             ports=ports)
-    if field_to_ignore is not None:
-        _pop_and_ignore_fields_in_override_params(override_params,
-                                                  field_to_ignore)
-
-    if is_yaml:
-        assert entrypoint is not None
-        usage_lib.messages.usage.update_user_task_yaml(configs[0])
-        dag = load_chain_dag_from_yaml(configs = configs)
-        #task = dag.tasks[0]
-        #usage_lib.messages.usage.update_user_task_yaml(entrypoint)
-
-        if len(dag.tasks) > 1:
-            # When the dag has more than 1 task. It is unclear how to
-            # override the params for the dag. So we just ignore the
-            # override params.
-            if override_params:
-                click.secho(
-                    f'WARNING: override params {override_params} are ignored, '
-                    'since the yaml file contains multiple tasks.',
-                    fg='yellow')
-            return dag
-        assert len(dag.tasks) == 1, (
-            f'If you see this, please file an issue; tasks: {dag.tasks}')
-        task = dag.tasks[0]
-    else:
-        task = sky.Task(name='pymc-cmd', run=entrypoint)
-        task.set_resources({sky.Resources()})
-    # env update has been done for DAG in load_chain_dag_from_yaml for YAML.
-    task.update_envs(env)
-
-    # Override.
-    if workdir is not None:
-        task.workdir = workdir
-
-    # job launch specific.
-    if job_recovery is not None:
-        override_params['job_recovery'] = job_recovery
-
-    task.set_resources_override(override_params)
-
-    if num_nodes is not None:
-        task.num_nodes = num_nodes
-    if name is not None:
-        task.name = name
-    return task
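The commented-out block at the end of the new launch() above hints at
resolving the cluster's head IP once it is up. Filled out as a sketch with
sky's public API (a list argument instead of the literal string; assumed
sky 0.7.0 semantics, untested here):

    from sky import core

    def get_head_ip(cluster_name: str) -> str:
        # core.status returns one record per matching cluster.
        records = core.status(cluster_names=[cluster_name], refresh=False)
        if not records:
            raise ValueError(f'cluster {cluster_name!r} not found')
        handle = records[0]['handle']
        return handle.external_ips()[0]  # first IP belongs to the head node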
diff --git a/src/pymc_server/commands/status_cli.py b/src/pymc_server/commands/status_cli.py
new file mode 100644
index 0000000..ded1d5f
--- /dev/null
+++ b/src/pymc_server/commands/status_cli.py
@@ -0,0 +1,323 @@
+import click
+import pymc_server
+import uuid
+import colorama
+import multiprocessing
+from typing import Any, Dict, List, Optional, Tuple, Union
+import sky
+from sky import core
+from sky.cli import _get_managed_jobs, _get_services
+# Used below; assumed available in sky 0.7.0's cli module.
+from sky.cli import _NUM_MANAGED_JOBS_TO_SHOW_IN_STATUS
+from sky.utils import controller_utils, rich_utils, ux_utils
+
+from sky.backends import backend as backend_lib
+from sky import backends
+from sky.backends import backend_utils
+from sky.utils.cli_utils import status_utils
+from sky.cli import _get_glob_clusters
+from sky import status_lib
+
+prefix = 'pymcs'
+_STATUS_PROPERTY_CLUSTER_NUM_ERROR_MESSAGE = (
+    '{cluster_num} cluster{plural} {verb}. Please specify {cause} '
+    'cluster to show its {property}.\nUsage: `{prefix} status --{flag} <cluster>`')
+
+def status(all: bool, refresh: bool, ip: bool, endpoints: bool,
+           endpoint: Optional[int], show_managed_jobs: bool,
+           show_services: bool, clusters: List[str]):
+    # NOTE(dev): Keep the docstring consistent between the Python API and CLI.
+    """Show clusters.
+
+    If CLUSTERS is given, show those clusters. Otherwise, show all clusters.
+
+    If --ip is specified, show the IP address of the head node of the cluster.
+    Only available when CLUSTERS contains exactly one cluster, e.g.
+    ``pymcs status --ip mycluster``.
+
+    If --endpoints is specified, show all exposed endpoints in the cluster.
+    Only available when CLUSTERS contains exactly one cluster, e.g.
+    ``pymcs status --endpoints mycluster``. To query a single endpoint, you
+    can use ``pymcs status mycluster --endpoint 8888``.
+
+    The following fields for each cluster are recorded: cluster name, time
+    since last launch, resources, region, zone, hourly price, status, autostop,
+    command.
+
+    Display all fields using ``pymcs status -a``.
+
+    Each cluster can have one of the following statuses:
+
+    - ``INIT``: The cluster may be live or down. It can happen in the following
+      cases:
+
+      - Ongoing provisioning or runtime setup. (A ``pymcs launch`` has started
+        but has not completed.)
+
+      - Or, the cluster is in an abnormal state, e.g., some cluster nodes are
+        down, or the SkyPilot runtime is unhealthy. (To recover the cluster,
+        try ``pymcs launch`` again on it.)
+
+    - ``UP``: Provisioning and runtime setup have succeeded and the cluster is
+      live. (The most recent ``pymcs launch`` has completed successfully.)
+
+    - ``STOPPED``: The cluster is stopped and the storage is persisted. Use
+      ``pymcs start`` to restart the cluster.
+
+    Autostop column:
+
+    - Indicates after how many minutes of idleness (no in-progress jobs) the
+      cluster will be autostopped. '-' means disabled.
+
+    - If the time is followed by '(down)', e.g., '1m (down)', the cluster will
+      be autodowned, rather than autostopped.
+
+    Getting up-to-date cluster statuses:
+
+    - In normal cases where clusters are entirely managed by SkyPilot (i.e., no
+      manual operations in cloud consoles) and no autostopping is used, the
+      table returned by this command will accurately reflect the cluster
+      statuses.
+
+    - In cases where clusters are changed outside of SkyPilot (e.g., manual
+      operations in cloud consoles; unmanaged spot clusters getting preempted)
+      or for autostop-enabled clusters, use ``--refresh`` to query the latest
+      cluster statuses from the cloud providers.
+    """
+    # Using a pool with 2 workers to run the managed job query and pymcs serve
+    # service query in parallel to speed up. The pool provides an AsyncResult
+    # object that can be used as a future.
+    with multiprocessing.Pool(2) as pool:
+        # Do not show job queue if user specifies clusters, and if user
+        # specifies --ip or --endpoint(s).
+        show_managed_jobs = show_managed_jobs and not any(
+            [clusters, ip, endpoints])
+        show_endpoints = endpoints or endpoint is not None
+        show_single_endpoint = endpoint is not None
+        if show_managed_jobs:
+            # Run managed job query in parallel to speed up the status query.
+            managed_jobs_future = pool.apply_async(
+                _get_managed_jobs,
+                kwds=dict(refresh=False,
+                          skip_finished=True,
+                          show_all=False,
+                          limit_num_jobs_to_show=not all,
+                          is_called_by_user=False))
+
+        show_services = show_services and not clusters and not ip
+        if show_services:
+            # Run the sky serve service query in parallel to speed up the
+            # status query.
+            services_future = pool.apply_async(_get_services,
+                                               kwds=dict(
+                                                   service_names=None,
+                                                   show_all=False,
+                                                   show_endpoint=False,
+                                                   is_called_by_user=False))
+        if ip or show_endpoints:
+            if refresh:
+                raise click.UsageError(
+                    'Using --ip or --endpoint(s) with --refresh is not '
To fix, refresh first, ' + 'then query the IP or endpoint.') + + if ip and show_endpoints: + with ux_utils.print_exception_no_traceback(): + raise ValueError( + 'Cannot specify both --ip and --endpoint(s) ' + 'at the same time.') + + if endpoint is not None and endpoints: + with ux_utils.print_exception_no_traceback(): + raise ValueError( + 'Cannot specify both --endpoint and --endpoints ' + 'at the same time.') + + if len(clusters) != 1: + with ux_utils.print_exception_no_traceback(): + plural = 's' if len(clusters) > 1 else '' + cluster_num = (str(len(clusters)) + if len(clusters) > 0 else 'No') + cause = 'a single' if len(clusters) > 1 else 'an existing' + raise ValueError( + _STATUS_PROPERTY_CLUSTER_NUM_ERROR_MESSAGE.format( + prefix=prefix, + cluster_num=cluster_num, + plural=plural, + verb='specified', + cause=cause, + property='IP address' if ip else 'endpoint(s)', + flag='ip' if ip else + ('endpoint port' + if show_single_endpoint else 'endpoints'))) + else: + click.echo(f'{colorama.Fore.CYAN}{colorama.Style.BRIGHT}Clusters' + f'{colorama.Style.RESET_ALL}') + query_clusters: Optional[List[str]] = None + if clusters: + query_clusters = _get_glob_clusters(clusters, silent=ip) + cluster_records = core.status(cluster_names=query_clusters, + refresh=refresh) + if ip or show_endpoints: + if len(cluster_records) != 1: + with ux_utils.print_exception_no_traceback(): + plural = 's' if len(cluster_records) > 1 else '' + cluster_num = (str(len(cluster_records)) + if len(cluster_records) > 0 else + f'{clusters[0]!r}') + verb = 'found' if len(cluster_records) > 0 else 'not found' + cause = 'a single' if len(clusters) > 1 else 'an existing' + raise ValueError( + _STATUS_PROPERTY_CLUSTER_NUM_ERROR_MESSAGE.format( + cluster_num=cluster_num, + plural=plural, + verb=verb, + cause=cause, + property='IP address' if ip else 'endpoint(s)', + flag='ip' if ip else + ('endpoint port' + if show_single_endpoint else 'endpoints'))) + + cluster_record = cluster_records[0] + if cluster_record['status'] != status_lib.ClusterStatus.UP: + with ux_utils.print_exception_no_traceback(): + raise RuntimeError(f'Cluster {cluster_record["name"]!r} ' + 'is not in UP status.') + handle = cluster_record['handle'] + if not isinstance(handle, backends.CloudVmRayResourceHandle): + with ux_utils.print_exception_no_traceback(): + raise ValueError('Querying IP address is not supported ' + 'for local clusters.') + + head_ip = handle.external_ips()[0] + if show_endpoints: + if endpoint: + cluster_endpoint = core.endpoints(cluster_record['name'], + endpoint).get( + endpoint, None) + if not cluster_endpoint: + raise click.Abort( + f'Endpoint {endpoint} not found for cluster ' + f'{cluster_record["name"]!r}.') + click.echo(cluster_endpoint) + else: + cluster_endpoints = core.endpoints(cluster_record['name']) + assert isinstance(cluster_endpoints, dict) + if not cluster_endpoints: + raise click.Abort(f'No endpoint found for cluster ' + f'{cluster_record["name"]!r}.') + for port, port_endpoint in cluster_endpoints.items(): + click.echo( + f'{colorama.Fore.BLUE}{colorama.Style.BRIGHT}{port}' + f'{colorama.Style.RESET_ALL}: ' + f'{colorama.Fore.CYAN}{colorama.Style.BRIGHT}' + f'{port_endpoint}{colorama.Style.RESET_ALL}') + return + click.echo(head_ip) + return + hints = [] + normal_clusters = [] + controllers = [] + for cluster_record in cluster_records: + cluster_name = cluster_record['name'] + controller = controller_utils.Controllers.from_name(cluster_name) + if controller is not None: + controllers.append(cluster_record) + else: + 
normal_clusters.append(cluster_record) + + num_pending_autostop = 0 + num_pending_autostop += status_utils.show_status_table( + normal_clusters + controllers, all) + + def _try_get_future_result(future) -> Tuple[bool, Any]: + result = None + interrupted = False + try: + result = future.get() + except KeyboardInterrupt: + pool.terminate() + interrupted = True + return interrupted, result + + managed_jobs_query_interrupted = False + if show_managed_jobs: + click.echo(f'\n{colorama.Fore.CYAN}{colorama.Style.BRIGHT}' + f'Managed jobs{colorama.Style.RESET_ALL}') + with rich_utils.safe_status('[cyan]Checking managed jobs[/]'): + managed_jobs_query_interrupted, result = _try_get_future_result( + managed_jobs_future) + if managed_jobs_query_interrupted: + # Set to -1, so that the controller is not considered + # down, and the hint for showing sky jobs queue + # will still be shown. + num_in_progress_jobs = -1 + msg = 'KeyboardInterrupt' + else: + num_in_progress_jobs, msg = result + + click.echo(msg) + if num_in_progress_jobs is not None: + # jobs controller is UP. + job_info = '' + if num_in_progress_jobs > 0: + plural_and_verb = ' is' + if num_in_progress_jobs > 1: + plural_and_verb = 's are' + job_info = ( + f'{num_in_progress_jobs} managed job{plural_and_verb} ' + 'in progress') + if (num_in_progress_jobs > + _NUM_MANAGED_JOBS_TO_SHOW_IN_STATUS): + job_info += ( + f' ({_NUM_MANAGED_JOBS_TO_SHOW_IN_STATUS} latest ' + 'ones shown)') + job_info += '. ' + hints.append( + controller_utils.Controllers.JOBS_CONTROLLER.value. + in_progress_hint.format(job_info=job_info)) + + if show_services: + click.echo(f'\n{colorama.Fore.CYAN}{colorama.Style.BRIGHT}' + f'Services{colorama.Style.RESET_ALL}') + num_services = None + if managed_jobs_query_interrupted: + # The pool is terminated, so we cannot run the service query. + msg = 'KeyboardInterrupt' + else: + with rich_utils.safe_status('[cyan]Checking services[/]'): + interrupted, result = _try_get_future_result( + services_future) + if interrupted: + num_services = -1 + msg = 'KeyboardInterrupt' + else: + num_services, msg = result + click.echo(msg) + if num_services is not None: + hints.append(controller_utils.Controllers.SKY_SERVE_CONTROLLER. + value.in_progress_hint) + + if show_managed_jobs or show_services: + try: + pool.close() + pool.join() + except SystemExit as e: + # This is to avoid a "Exception ignored" problem caused by + # ray worker setting the sigterm handler to sys.exit(15) + # (see ray/_private/worker.py). + # TODO (zhwu): Remove any importing of ray in SkyPilot. + if e.code != 15: + raise + + if num_pending_autostop > 0 and not refresh: + # Don't print this hint if there's no pending autostop or user has + # already passed --refresh. + plural_and_verb = ' has' + if num_pending_autostop > 1: + plural_and_verb = 's have' + hints.append(f'* {num_pending_autostop} cluster{plural_and_verb} ' + 'auto{stop,down} scheduled. 
Refresh statuses with: '
+                     f'{colorama.Style.BRIGHT}sky status --refresh'
+                     f'{colorama.Style.RESET_ALL}')
+    if hints:
+        click.echo('\n' + '\n'.join(hints))
+
diff --git a/src/pymc_server/utils/launch.py b/src/pymc_server/utils/launch.py
new file mode 100644
index 0000000..c8faf3f
--- /dev/null
+++ b/src/pymc_server/utils/launch.py
@@ -0,0 +1,233 @@
+from sky import Task
+import sky
+import click
+import colorama
+from sky.backends import backend as backend_lib
+from sky.backends import backend_utils
+from sky.utils import controller_utils
+
+from sky import backends
+from sky.utils import timeline
+from sky.usage import usage_lib
+from sky import optimizer
+from typing import Dict, List, Optional, Tuple, Union
+from sky import exceptions
+from sky import check as sky_check
+from sky.execution import _execute
+
+# Needed below: status_lib for cluster states, ux_utils for error formatting.
+from sky import status_lib
+from sky.utils import ux_utils
+
+
+def launch_with_confirm(
+    task: Task,
+    backend: backends.Backend,
+    cluster: Optional[str],
+    *,
+    dryrun: bool,
+    detach_run: bool,
+    detach_setup: bool = False,
+    no_confirm: bool = False,
+    idle_minutes_to_autostop: Optional[int] = None,
+    down: bool = False,  # pylint: disable=redefined-outer-name
+    retry_until_up: bool = False,
+    no_setup: bool = False,
+    clone_disk_from: Optional[str] = None,
+):
+    """Launch a cluster with a Task."""
+    if cluster is None:
+        cluster = backend_utils.generate_cluster_name()
+
+    clone_source_str = ''
+    if clone_disk_from is not None:
+        clone_source_str = f' from the disk of {clone_disk_from!r}'
+        task, _ = backend_utils.check_can_clone_disk_and_override_task(
+            clone_disk_from, cluster, task)
+
+    with sky.Dag() as dag:
+        dag.add(task)
+
+    maybe_status, handle = backend_utils.refresh_cluster_status_handle(cluster)
+    if maybe_status is None:
+        # Show the optimize log before the prompt if the cluster does not exist.
+        try:
+            sky_check.get_cached_enabled_clouds_or_refresh(
+                raise_if_no_cloud_access=True)
+        except exceptions.NoCloudAccessError as e:
+            # Catch the exception where the public cloud is not enabled, and
+            # make it yellow for better visibility.
+            with ux_utils.print_exception_no_traceback():
+                raise RuntimeError(f'{colorama.Fore.YELLOW}{e}'
+                                   f'{colorama.Style.RESET_ALL}') from e
+        dag = sky.optimize(dag)
+        task = dag.tasks[0]
+
+    if handle is not None:
+        backend.check_resources_fit_cluster(handle, task)
+
+    confirm_shown = False
+    if not no_confirm:
+        # Prompt if (1) --cluster is None, or (2) cluster doesn't exist, or (3)
+        # it exists but is STOPPED.
+        prompt = None
+        if maybe_status is None:
+            cluster_str = '' if cluster is None else f' {cluster!r}'
+            prompt = (
+                f'Launching a new cluster{cluster_str}{clone_source_str}. '
+                'Proceed?')
+        elif maybe_status == status_lib.ClusterStatus.STOPPED:
+            prompt = f'Restarting the stopped cluster {cluster!r}. Proceed?'
+ if prompt is not None: + confirm_shown = True + click.confirm(prompt, default=True, abort=True, show_default=True) + + if not confirm_shown: + click.secho(f'Running task on cluster {cluster}...', fg='yellow') + + launch( + dag, + dryrun=dryrun, + stream_logs=True, + cluster_name=cluster, + detach_setup=detach_setup, + detach_run=detach_run, + backend=backend, + idle_minutes_to_autostop=idle_minutes_to_autostop, + down=down, + retry_until_up=retry_until_up, + no_setup=no_setup, + clone_disk_from=clone_disk_from, + ) + +@timeline.event +@usage_lib.entrypoint +def launch( + task: Union['pymcs.Task', 'pymcs.Dag'], + cluster_name: Optional[str] = None, + retry_until_up: bool = False, + idle_minutes_to_autostop: Optional[int] = None, + dryrun: bool = False, + down: bool = False, + stream_logs: bool = True, + backend: Optional[backends.Backend] = None, + optimize_target: optimizer.OptimizeTarget = optimizer.OptimizeTarget.COST, + detach_setup: bool = False, + detach_run: bool = False, + no_setup: bool = False, + clone_disk_from: Optional[str] = None, + # Internal only: + # pylint: disable=invalid-name + _is_launched_by_jobs_controller: bool = False, + _is_launched_by_sky_serve_controller: bool = False, + _disable_controller_check: bool = False, +) -> Tuple[Optional[int], Optional[backends.ResourceHandle]]: + # NOTE(dev): Keep the docstring consistent between the Python API and CLI. + """Launch a cluster or task. + + The task's setup and run commands are executed under the task's workdir + (when specified, it is synced to remote cluster). The task undergoes job + queue scheduling on the cluster. + + Currently, the first argument must be a sky.Task, or (EXPERIMENTAL advanced + usage) a sky.Dag. In the latter case, currently it must contain a single + task; support for pipelines/general DAGs are in experimental branches. + + Args: + task: sky.Task, or sky.Dag (experimental; 1-task only) to launch. + cluster_name: name of the cluster to create/reuse. If None, + auto-generate a name. + retry_until_up: whether to retry launching the cluster until it is + up. + idle_minutes_to_autostop: automatically stop the cluster after this + many minute of idleness, i.e., no running or pending jobs in the + cluster's job queue. Idleness gets reset whenever setting-up/ + running/pending jobs are found in the job queue. Setting this + flag is equivalent to running + ``sky.launch(..., detach_run=True, ...)`` and then + ``sky.autostop(idle_minutes=)``. If not set, the cluster + will not be autostopped. + down: Tear down the cluster after all jobs finish (successfully or + abnormally). If --idle-minutes-to-autostop is also set, the + cluster will be torn down after the specified idle time. + Note that if errors occur during provisioning/data syncing/setting + up, the cluster will not be torn down for debugging purposes. + dryrun: if True, do not actually launch the cluster. + stream_logs: if True, show the logs in the terminal. + backend: backend to use. If None, use the default backend + (CloudVMRayBackend). + optimize_target: target to optimize for. Choices: OptimizeTarget.COST, + OptimizeTarget.TIME. + detach_setup: If True, run setup in non-interactive mode as part of the + job itself. You can safely ctrl-c to detach from logging, and it + will not interrupt the setup process. To see the logs again after + detaching, use `sky logs`. To cancel setup, cancel the job via + `sky cancel`. Useful for long-running setup + commands. 
+ detach_run: If True, as soon as a job is submitted, return from this + function and do not stream execution logs. + no_setup: if True, do not re-run setup commands. + clone_disk_from: [Experimental] if set, clone the disk from the + specified cluster. This is useful to migrate the cluster to a + different availability zone or region. + + Example: + .. code-block:: python + + import sky + task = sky.Task(run='echo hello SkyPilot') + task.set_resources( + sky.Resources(cloud=sky.AWS(), accelerators='V100:4')) + sky.launch(task, cluster_name='my-cluster') + + Raises: + exceptions.ClusterOwnerIdentityMismatchError: if the cluster is + owned by another user. + exceptions.InvalidClusterNameError: if the cluster name is invalid. + exceptions.ResourcesMismatchError: if the requested resources + do not match the existing cluster. + exceptions.NotSupportedError: if required features are not supported + by the backend/cloud/cluster. + exceptions.ResourcesUnavailableError: if the requested resources + cannot be satisfied. The failover_history of the exception + will be set as: + 1. Empty: iff the first-ever sky.optimize() fails to + find a feasible resource; no pre-check or actual launch is + attempted. + 2. Non-empty: iff at least 1 exception from either + our pre-checks (e.g., cluster name invalid) or a region/zone + throwing resource unavailability. + exceptions.CommandError: any ssh command error. + exceptions.NoCloudAccessError: if all clouds are disabled. + Other exceptions may be raised depending on the backend. + + Returns: + job_id: Optional[int]; the job ID of the submitted job. None if the + backend is not CloudVmRayBackend, or no job is submitted to + the cluster. + handle: Optional[backends.ResourceHandle]; the handle to the cluster. None + if dryrun. 
+    """
+    entrypoint = task
+    if not _disable_controller_check:
+        controller_utils.check_cluster_name_not_controller(
+            cluster_name, operation_str='pymcs.launch')
+
+    return _execute(
+        entrypoint=entrypoint,
+        dryrun=dryrun,
+        down=down,
+        stream_logs=stream_logs,
+        handle=None,
+        backend=backend,
+        retry_until_up=retry_until_up,
+        optimize_target=optimize_target,
+        cluster_name=cluster_name,
+        detach_setup=detach_setup,
+        detach_run=detach_run,
+        idle_minutes_to_autostop=idle_minutes_to_autostop,
+        no_setup=no_setup,
+        clone_disk_from=clone_disk_from,
+        _is_launched_by_jobs_controller=_is_launched_by_jobs_controller,
+        _is_launched_by_sky_serve_controller=
+        _is_launched_by_sky_serve_controller,
+    )
\ No newline at end of file
diff --git a/src/pymc_server/utils/names.py b/src/pymc_server/utils/names.py
index 25d3301..0981c43 100644
--- a/src/pymc_server/utils/names.py
+++ b/src/pymc_server/utils/names.py
@@ -1,7 +1,8 @@
 import uuid
 from sky.utils import common_utils
 
+
 def generate_service_name(prefix = "pymcs"):
     return f'{prefix}-service-{uuid.uuid4().hex[:4]}'
 
-def generate_cluster_name(prefix = "pymcs"):
-    return f'{prefix}-{uuid.uuid4().hex[:4]}-{common_utils.get_cleaned_username()}'
\ No newline at end of file
+def generate_cluster_name(cluster=None, prefix="pymcs"):
+    # Resolve the default at call time (not at import time), and avoid
+    # producing a literal 'None' suffix when the caller passes None through.
+    if cluster is None:
+        cluster = common_utils.get_cleaned_username()
+    return f'{prefix}-{uuid.uuid4().hex[:4]}-{cluster}'
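With the call-time default above, generated names keep the
pymcs-<hex4>-<suffix> shape, where the suffix is either the cleaned username
or a caller-supplied value. A quick sketch (hex digits illustrative):

    generate_cluster_name()              # -> 'pymcs-1a2b-alice'
    generate_cluster_name('my-cluster')  # -> 'pymcs-9f3c-my-cluster'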
""" + dag_name = None if set(configs[0].keys()) == {'name'}: dag_name = configs[0]['name'] @@ -75,7 +81,6 @@ def load_chain_dag_from_yaml( continue task = task_lib.Task.from_yaml_config(task_config, env_overrides) if current_task is not None: - print("DAG afterCheck::::") current_task >> task # pylint: disable=pointless-statement current_task = task dag.name = dag_name @@ -143,7 +148,6 @@ def set_config(config): def exists(path): exists_ = os.path.exists(path) - print(f"exists_ :{exists_}") if exists_ is False: raise Exception(f'{colorama.Fore.RED}File Not Found: ' f'{colorama.Fore.YELLOW}{path}{colorama.Style.RESET_ALL}') @@ -183,3 +187,144 @@ def get_config_from_yaml(entrypoint: Tuple[str, ...],module_name:Optional[str],b if is_yaml: configs = [set_config(config) for config in configs] return configs, is_yaml, module_config_path + + + +def _make_task_or_dag_from_entrypoint_with_overrides( + entrypoint: Tuple[str, ...], + module_name:Optional[str], + base_config_folder:Optional[str], + *, + entrypoint_name: str = 'Task', + name: Optional[str] = None, + workdir: Optional[str] = None, + cloud: Optional[str] = None, + region: Optional[str] = None, + zone: Optional[str] = None, + gpus: Optional[str] = None, + cpus: Optional[str] = None, + memory: Optional[str] = None, + instance_type: Optional[str] = None, + num_nodes: Optional[int] = None, + use_spot: Optional[bool] = None, + image_id: Optional[str] = None, + disk_size: Optional[int] = None, + disk_tier: Optional[str] = None, + ports: Optional[Tuple[str]] = None, + env: Optional[List[Tuple[str, str]]] = None, + field_to_ignore: Optional[List[str]] = None, + # job launch specific + job_recovery: Optional[str] = None, +) -> Union[sky.Task, sky.Dag]: + """Creates a task or a dag from an entrypoint with overrides. + + Returns: + A dag iff the entrypoint is YAML and contains more than 1 task. + Otherwise, a task. + """ + #entrypoint = ' '.join(entrypoint) + + configs, is_yaml,module_config_path = get_config_from_yaml(entrypoint,module_name,base_config_folder) + + clean_up_config(configs) + + print(f'{colorama.Fore.YELLOW} ERRRRROOOOORRR HERE') + + click.secho(f'{colorama.Fore.GREEN}used base_config_folder:' + f'{colorama.Style.RESET_ALL}{base_config_folder}') + click.secho(f'{colorama.Fore.GREEN}used entrypoint:' + f'{colorama.Style.RESET_ALL}{entrypoint}') + + #is_yaml, _ = _check_yaml(entrypoint) + entrypoint: Optional[str] + if is_yaml: + # Treat entrypoint as a yaml. + click.secho(f'{entrypoint_name} from YAML spec: ', + fg='yellow', + nl=False) + click.secho(entrypoint, bold=True) + else: + if not entrypoint: + entrypoint = None + else: + # Treat entrypoint as a bash command. + click.secho(f'{entrypoint_name} from command: ', + fg='yellow', + nl=False) + click.secho(entrypoint, bold=True) + + override_params = _parse_override_params(cloud=cloud, + region=region, + zone=zone, + gpus=gpus, + cpus=cpus, + memory=memory, + instance_type=instance_type, + use_spot=use_spot, + image_id=image_id, + disk_size=disk_size, + disk_tier=disk_tier, + ports=ports) + if field_to_ignore is not None: + _pop_and_ignore_fields_in_override_params(override_params, + field_to_ignore) + + if is_yaml: + assert entrypoint is not None + usage_lib.messages.usage.update_user_task_yaml(configs[0]) + dag = load_chain_dag_from_yaml(configs = configs) + #task = dag.tasks[0] + #usage_lib.messages.usage.update_user_task_yaml(entrypoint) + + if len(dag.tasks) > 1: + # When the dag has more than 1 task. It is unclear how to + # override the params for the dag. 
+            # override params.
+            if override_params:
+                click.secho(
+                    f'WARNING: override params {override_params} are ignored, '
+                    'since the yaml file contains multiple tasks.',
+                    fg='yellow')
+            return dag
+        assert len(dag.tasks) == 1, (
+            f'If you see this, please file an issue; tasks: {dag.tasks}')
+        task = dag.tasks[0]
+
+    else:
+        task = sky.Task(name='pymc-cmd', run=entrypoint)
+        task.set_resources({sky.Resources()})
+    # env update has been done for DAG in load_chain_dag_from_yaml for YAML.
+    task.update_envs(env)
+
+    # Override.
+    if workdir is not None:
+        task.workdir = workdir
+
+    # job launch specific.
+    if job_recovery is not None:
+        override_params['job_recovery'] = job_recovery
+
+    task.set_resources_override(override_params)
+
+    if num_nodes is not None:
+        task.num_nodes = num_nodes
+    if name is not None:
+        task.name = name
+
+    return task
+
+def get_auto_stop(entrypoint: Tuple[str, ...], module_name: Optional[str], base_config_path: Optional[str]):
+    configs, is_yaml, module_config_path = get_config_from_yaml(
+        entrypoint, module_name, base_config_path)
+    try:
+        autostop = configs[0]["resources"].get('autostop')
+        idle_minutes = autostop['idle_minutes'] if 'idle_minutes' in autostop else None
+        down = autostop['down'] if 'down' in autostop else None
+        return idle_minutes, down
+    except (KeyError, TypeError, IndexError):
+        # No resources/autostop section in the config.
+        return None, None
+
+
+def clean_up_config(config):
+    try:
+        del config[0]["resources"]['autostop']
+    except (KeyError, TypeError, IndexError):
+        pass
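get_auto_stop above reads an optional autostop block from the first task's
resources section. Inferred from its accessors (this schema is this patch's
own convention, not sky's; both keys optional), the YAML it expects looks
like:

    resources:
      autostop:
        idle_minutes: 30   # stop after 30 idle minutes
        down: true         # tear down instead of stopping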
From 11c60d756e74b4197dd1652835929266b1ab4154 Mon Sep 17 00:00:00 2001
From: Stephan-Mai-pymc*
Date: Wed, 29 Jan 2025 00:11:05 +0100
Subject: [PATCH 2/8] remove some prints

---
 src/pymc_server/cli_factory.py | 2 --
 1 file changed, 2 deletions(-)

diff --git a/src/pymc_server/cli_factory.py b/src/pymc_server/cli_factory.py
index faaf8e4..7b6ab85 100644
--- a/src/pymc_server/cli_factory.py
+++ b/src/pymc_server/cli_factory.py
@@ -300,7 +300,6 @@ def setup_start_factory(func):
     ]
     for option in reversed(options):
         func = option(func)
-    print("func",func)
     return func
 
 
@@ -327,5 +326,4 @@ def setup_stop_factory(func):
     ]
     for option in reversed(options):
         func = option(func)
-    print("func",func)
     return func
\ No newline at end of file

From fce4b0e471eb8d1fe3d709ae8ecf14a56a37ee28 Mon Sep 17 00:00:00 2001
From: Stephan-Mai-pymc*
Date: Wed, 29 Jan 2025 01:33:45 +0100
Subject: [PATCH 3/8] start with entrypoint

---
 src/pymc_server/cli.py                 |   3 +-
 src/pymc_server/cli_factory.py         |   8 +-
 src/pymc_server/commands/launch_cli.py |   9 +-
 src/pymc_server/commands/start_cli.py  | 194 +++++++++++++++++++++++++
 src/pymc_server/utils/yaml.py          |  24 ++-
 5 files changed, 224 insertions(+), 14 deletions(-)
 create mode 100644 src/pymc_server/commands/start_cli.py

diff --git a/src/pymc_server/cli.py b/src/pymc_server/cli.py
index 62d3318..ed00e4a 100644
--- a/src/pymc_server/cli.py
+++ b/src/pymc_server/cli.py
@@ -7,6 +7,7 @@
 from pymc_server.commands.launch_cli import launch as launch_cli
 from pymc_server.commands.exec_cli import exec as exec_cli
 from pymc_server.commands.status_cli import status as status_cli
+from pymc_server.commands.start_cli import start as start_cli
 
 from pymc_server.cli_factory import setup_launch_factory, setup_status_factory, setup_exec_factory, setup_start_factory,setup_stop_factory
 from sky.usage import usage_lib
@@ -79,7 +80,7 @@ def check(*args, **kwargs):
 def start(*args, **kwargs):
     ctx = click.get_current_context()
     #sky_check(*args, **kwargs)
-    ctx.invoke(sky_start, *args, **kwargs)
+    ctx.invoke(start_cli, *args, **kwargs)
     """Deletes a local cluster."""
 
 @setup_stop_factory
diff --git a/src/pymc_server/cli_factory.py b/src/pymc_server/cli_factory.py
index 7b6ab85..a551bf5 100644
--- a/src/pymc_server/cli_factory.py
+++ b/src/pymc_server/cli_factory.py
@@ -240,6 +240,12 @@ def setup_start_factory(func):
                        nargs=-1,
                        required=False,
                        **_get_shell_complete_args(_complete_cluster_name)),
+        click.option('--entrypoint',
+                     '-e',
+                     default=None,
+                     type=str,
+                     required=False,
+                     **_get_shell_complete_args(_complete_file_name)),
         click.option('--all',
                      '-a',
                      default=False,
@@ -267,7 +273,7 @@ def setup_start_factory(func):
                 '. If not set, the cluster will not be autostopped.')),
         click.option(
             '--down',
-            default=False,
+            default=None,
            is_flag=True,
             required=False,
             help=
diff --git a/src/pymc_server/commands/launch_cli.py b/src/pymc_server/commands/launch_cli.py
index 4d7b38d..5582f18 100644
--- a/src/pymc_server/commands/launch_cli.py
+++ b/src/pymc_server/commands/launch_cli.py
@@ -98,13 +98,10 @@ def launch(
         ports=ports,
     )
     """ set auto stop/down from config """
-
-
-
     auto_idle_minutes,auto_down = get_auto_stop(entrypoint,module_name,base_config_folder)
-
-
+    idle_minutes_to_autostop = idle_minutes_to_autostop if idle_minutes_to_autostop != None else auto_idle_minutes_to_autostop
+    down = down if down != None else auto_down
+    down = False if down == None else down
     if isinstance(task_or_dag, sky.Dag):
         raise click.UsageError(
             _DAG_NOT_SUPPORTED_MESSAGE.format(command='pymcs launch'))
diff --git a/src/pymc_server/commands/start_cli.py b/src/pymc_server/commands/start_cli.py
new file mode 100644
index 0000000..0fbb3be
--- /dev/null
+++ b/src/pymc_server/commands/start_cli.py
@@ -0,0 +1,194 @@
+import click
+import sky
+import pymc_server
+import colorama
+from typing import Dict, List, Optional, Tuple, Union
+from sky import global_user_state
+from sky.cli import _get_glob_clusters
+from sky.backends import backend_utils
+from sky import status_lib
+from sky.utils import controller_utils
+from sky import exceptions
+from sky import core
+
+from pymc_server.utils.yaml import (
+    get_auto_stop_from_entrypoint
+)
+
+def start(
+        clusters: List[str],
+        entrypoint: Tuple[str, ...],
+        all: bool,
+        yes: bool,
+        idle_minutes_to_autostop: Optional[int],
+        down: bool,  # pylint: disable=redefined-outer-name
+        retry_until_up: bool,
+        force: bool):
+
+    if entrypoint != None:
+        auto_idle_minutes_to_autostop, auto_down = get_auto_stop_from_entrypoint(entrypoint)
+        idle_minutes_to_autostop = idle_minutes_to_autostop if idle_minutes_to_autostop != None else auto_idle_minutes_to_autostop
+        print(f"{colorama.Fore.CYAN} autostop: {colorama.Fore.YELLOW} {auto_idle_minutes_to_autostop} {auto_down}{colorama.Style.RESET_ALL}")
+
+        down = down if down != None else auto_down
+
+    down = False if down == None else down
+
+    # NOTE(dev): Keep the docstring consistent between the Python API and CLI.
+    """Restart cluster(s).
+
+    If a cluster is previously stopped (status is STOPPED) or failed in
+    provisioning/runtime installation (status is INIT), this command will
+    attempt to start the cluster. In the latter case, provisioning and runtime
+    installation will be retried.
+
+    Auto-failover provisioning is not used when restarting a stopped
+    cluster. It will be started on the same cloud, region, and zone that were
+    chosen before.
+
+    If a cluster is already in the UP status, this command has no effect.
+
+    Examples:
+
+    .. code-block:: bash
+
+      # Restart a specific cluster.
+      sky start cluster_name
+      \b
+      # Restart multiple clusters.
+      sky start cluster1 cluster2
+      \b
+      # Restart all clusters.
+      sky start -a
+
+    """
+    if down and idle_minutes_to_autostop is None:
+        raise click.UsageError(
+            '--idle-minutes-to-autostop must be set if --down is set.')
+    to_start = []
+
+    if not clusters and not all:
+        # UX: frequently users may have only 1 cluster. In this case, be smart
+        # and default to that unique choice.
+        all_cluster_names = global_user_state.get_cluster_names_start_with('')
+        if len(all_cluster_names) <= 1:
+            clusters = all_cluster_names
+        else:
+            raise click.UsageError(
+                '`sky start` requires either a cluster name or glob '
+                '(see `sky status`), or the -a/--all flag.')
+
+    if all:
+        if len(clusters) > 0:
+            click.echo('Both --all and cluster(s) specified for sky start. '
+                       'Letting --all take effect.')
+
+        # Get all clusters that are not controllers.
+        clusters = [
+            cluster['name']
+            for cluster in global_user_state.get_clusters()
+            if controller_utils.Controllers.from_name(cluster['name']) is None
+        ]
+
+    if not clusters:
+        click.echo('Cluster(s) not found (tip: see `sky status`). Do you '
+                   'mean to use `sky launch` to provision a new cluster?')
+        return
+    else:
+        # Get GLOB cluster names
+        clusters = _get_glob_clusters(clusters)
+
+    for name in clusters:
+        cluster_status, _ = backend_utils.refresh_cluster_status_handle(
+            name)
+        # A cluster may have one of the following states:
+        #
+        #  STOPPED - ok to restart
+        #    (currently, only AWS/GCP non-spot clusters can be in this
+        #    state)
+        #
+        #  UP - skipped, see below
+        #
+        #  INIT - ok to restart:
+        #    1. It can be a failed-to-provision cluster, so it isn't up
+        #      (Ex: launch --gpus=A100:8).  Running `sky start` enables
+        #      retrying the provisioning - without setup steps being
+        #      completed. (Arguably the original command that failed should
+        #      be used instead; but using start isn't harmful - after it
+        #      gets provisioned successfully the user can use the original
+        #      command).
+        #
+        #    2. It can be an up cluster that failed one of the setup steps.
+        #      This way 'sky start' can change its status to UP, enabling
+        #      'sky ssh' to debug things (otherwise `sky ssh` will fail an
+        #      INIT state cluster due to head_ip not being cached).
+        #
+        #      This can be replicated by adding `exit 1` to Task.setup.
+        if (not force and cluster_status == status_lib.ClusterStatus.UP):
+            # An UP cluster; skipping 'sky start' because:
+            #  1. For a really up cluster, this has no effects (ray up -y
+            #    --no-restart) anyway.
+            #  2. A cluster may show as UP but is manually stopped in the
+            #    UI.  If Azure/GCP: ray autoscaler doesn't support reusing,
+            #    so 'sky start existing' will actually launch a new
+            #    cluster with this name, leaving the original cluster
+            #    zombied (remains as stopped in the cloud's UI).
+            #
+            #    This is dangerous and unwanted behavior!
+            click.echo(f'Cluster {name} already has status UP.')
+            continue
+
+        assert force or cluster_status in (
+            status_lib.ClusterStatus.INIT,
+            status_lib.ClusterStatus.STOPPED), cluster_status
+        to_start.append(name)
+    if not to_start:
+        return
+
+    # Checks for controller clusters (jobs controller / sky serve controller).
+    controllers, normal_clusters = [], []
+    for name in to_start:
+        if controller_utils.Controllers.from_name(name) is not None:
+            controllers.append(name)
+        else:
+            normal_clusters.append(name)
+    if controllers and normal_clusters:
+        # Keep this behavior the same as _down_or_stop_clusters().
+        raise click.UsageError('Starting controllers with other cluster(s) '
+                               'is currently not supported.\n'
+                               'Please start the former independently.')
+    if controllers:
+        bold = backend_utils.BOLD
+        reset_bold = backend_utils.RESET_BOLD
+        if len(controllers) != 1:
+            raise click.UsageError(
+                'Starting multiple controllers is currently not supported.\n'
+                'Please start them independently.')
+        if idle_minutes_to_autostop is not None:
+            raise click.UsageError(
+                'Autostop options are currently not allowed when starting the '
+                'controllers. Use the default autostop settings by directly '
+                f'calling: {bold}sky start {" ".join(controllers)}{reset_bold}')
+
+    if not yes:
+        cluster_str = 'clusters' if len(to_start) > 1 else 'cluster'
+        cluster_list = ', '.join(to_start)
+        click.confirm(
+            f'Restarting {len(to_start)} {cluster_str}: '
+            f'{cluster_list}. Proceed?',
+            default=True,
+            abort=True,
+            show_default=True)
+
+    for name in to_start:
+        try:
+            core.start(name,
+                       idle_minutes_to_autostop,
+                       retry_until_up,
+                       down=down,
+                       force=force)
+        except (exceptions.NotSupportedError,
+                exceptions.ClusterOwnerIdentityMismatchError) as e:
+            click.echo(str(e))
+        else:
+            click.secho(f'Cluster {name} started.', fg='green')
\ No newline at end of file
diff --git a/src/pymc_server/utils/yaml.py b/src/pymc_server/utils/yaml.py
index d8db8d9..5fd5582 100644
--- a/src/pymc_server/utils/yaml.py
+++ b/src/pymc_server/utils/yaml.py
@@ -313,16 +313,28 @@ def _make_task_or_dag_from_entrypoint_with_overrides(
     return task
 
-def get_auto_stop(entrypoint: Tuple[str, ...],module_name:Optional[str],base_config_path:Optional[str]):
-    configs, is_yaml,module_config_path = get_config_from_yaml(entrypoint,module_name,base_config_path)
+
+def _get_auto_stop_from_yaml(configs):
     try:
-        autostop = configs[0]["resources"]['autostop'] if 'autostop' in configs[0]["resources"] else None
-        idle_minutes = configs[0]["resources"]['autostop']['idle_minutes'] if 'idle_minutes' in autostop else None
-        down = configs[0]["resources"]['autostop']['down'] if 'down' in autostop else None
-        return idle_minutes, down
+        autostop = configs["resources"]['autostop'] if 'autostop' in configs["resources"] else None
+        idle_minutes = configs["resources"]['autostop']['idle_minutes'] if 'idle_minutes' in autostop else None
+        down = configs["resources"]['autostop']['down'] if 'down' in autostop else None
+        return idle_minutes, down
     except: return None, None
 
 
+def get_auto_stop_from_entrypoint(entrypoint):
+    if entrypoint != ():
+        exists(entrypoint)
+    else: return None, None
+
+    userYaml = hiyapyco.load(entrypoint, entrypoint, method=hiyapyco.METHOD_MERGE)
+    return _get_auto_stop_from_yaml(userYaml)
+
+def get_auto_stop(entrypoint: Tuple[str, ...],module_name:Optional[str],base_config_path:Optional[str]):
+    configs, is_yaml,module_config_path = get_config_from_yaml(entrypoint,module_name,base_config_path)
+    return _get_auto_stop_from_yaml(configs[0])
+
+
 def clean_up_config(config):
     try:
         del config[0]["resources"]['autostop']
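Patch 3 splits the YAML lookup so that `pymcs start` can read autostop settings
straight from an entrypoint file via hiyapyco, without the module-config
plumbing that launch uses. Two observations on the code as written:
get_auto_stop_from_entrypoint calls exists(entrypoint) but discards the result
(and appears to rely on os.path.exists being imported elsewhere in yaml.py),
and _get_auto_stop_from_yaml still leans on a bare except. A defensive
equivalent using dict.get, offered as a sketch rather than as part of the
patch:

    from typing import Optional, Tuple

    def _get_auto_stop_from_yaml(configs: dict) -> Tuple[Optional[int], Optional[bool]]:
        # Missing 'resources' or 'autostop' keys fall through to (None, None)
        # without masking unrelated errors that a bare except would swallow.
        autostop = (configs.get('resources') or {}).get('autostop') or {}
        return autostop.get('idle_minutes'), autostop.get('down')
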
From c07738453b548317115bf807393505429be1c89c Mon Sep 17 00:00:00 2001
From: Stephan-Mai-pymc*
Date: Wed, 29 Jan 2025 01:55:26 +0100
Subject: [PATCH 4/8] clean up

---
 src/pymc_server/commands/start_cli.py | 3 ---
 1 file changed, 3 deletions(-)

diff --git a/src/pymc_server/commands/start_cli.py b/src/pymc_server/commands/start_cli.py
index 0fbb3be..d3ec6a9 100644
--- a/src/pymc_server/commands/start_cli.py
+++ b/src/pymc_server/commands/start_cli.py
@@ -28,10 +28,7 @@ def start(
     if entrypoint != None:
         auto_idle_minutes_to_autostop, auto_down = get_auto_stop_from_entrypoint(entrypoint)
         idle_minutes_to_autostop = idle_minutes_to_autostop if idle_minutes_to_autostop != None else auto_idle_minutes_to_autostop
-        print(f"{colorama.Fore.CYAN} autostop: {colorama.Fore.YELLOW} {auto_idle_minutes_to_autostop} {auto_down}{colorama.Style.RESET_ALL}")
-
         down = down if down != None else auto_down
-
     down = False if down == None else down
 
     # NOTE(dev): Keep the docstring consistent between the Python API and CLI.
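With patches 3 and 4 in place, `pymcs start` can pick autostop settings up from
a task YAML instead of requiring flags. A hypothetical session (file and
cluster names are illustrative, and the -i shorthand assumes the start factory
keeps sky's --idle-minutes-to-autostop spelling):

    # Restart a cluster; idle_minutes/down come from the merged YAML:
    pymcs start -e my-task.pymc.yaml pymcs-3f9a-alice

    # An explicit flag still takes precedence over the YAML value:
    pymcs start -e my-task.pymc.yaml -i 60 pymcs-3f9a-alice
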
From d2cd861dbd570bfe91007887a49388bb1eefae63 Mon Sep 17 00:00:00 2001
From: Stephan-Mai-pymc*
Date: Wed, 29 Jan 2025 01:57:02 +0100
Subject: [PATCH 5/8] fix in launch_cli.py

---
 src/pymc_server/commands/launch_cli.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/src/pymc_server/commands/launch_cli.py b/src/pymc_server/commands/launch_cli.py
index 5582f18..0977ed6 100644
--- a/src/pymc_server/commands/launch_cli.py
+++ b/src/pymc_server/commands/launch_cli.py
@@ -99,7 +99,7 @@ def launch(
     )
     """ set auto stop/down from config """
     auto_idle_minutes,auto_down = get_auto_stop(entrypoint,module_name,base_config_folder)
-    idle_minutes_to_autostop = idle_minutes_to_autostop if idle_minutes_to_autostop != None else auto_idle_minutes_to_autostop
+    idle_minutes_to_autostop = idle_minutes_to_autostop if idle_minutes_to_autostop != None else auto_idle_minutes
     down = down if down != None else auto_down
     down = False if down == None else down
     if isinstance(task_or_dag, sky.Dag):
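Patch 5 repairs a NameError: launch_cli unpacks the tuple into
auto_idle_minutes, but the fallback expression referenced start_cli's variable
name, so any launch without an explicit idle flag would have crashed. The
three assignment lines implement a precedence chain (CLI flag, then YAML
value, then a hard default), which could be captured in one helper; a sketch,
not part of the patches:

    from typing import Optional, TypeVar

    T = TypeVar('T')

    def resolve(cli_value: Optional[T], yaml_value: Optional[T], default: T) -> T:
        """Return the first non-None of: explicit CLI flag, YAML config, default."""
        if cli_value is not None:
            return cli_value
        if yaml_value is not None:
            return yaml_value
        return default

    # e.g. down = resolve(down, auto_down, False)

This is also why the --down defaults get flipped from False to None in patches
3 and 6: with a False default, a YAML autostop.down could never take effect.
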
From ab424af412c43b22492ae98f02c0906ef4e70042 Mon Sep 17 00:00:00 2001
From: Stephan-Mai-pymc*
Date: Wed, 29 Jan 2025 13:48:23 +0100
Subject: [PATCH 6/8] fix cluster_name

---
 src/pymc_server/cli_factory.py | 12 +-----------
 src/pymc_server/utils/names.py |  4 +++-
 2 files changed, 4 insertions(+), 12 deletions(-)

diff --git a/src/pymc_server/cli_factory.py b/src/pymc_server/cli_factory.py
index a551bf5..9ed1354 100644
--- a/src/pymc_server/cli_factory.py
+++ b/src/pymc_server/cli_factory.py
@@ -174,7 +174,7 @@ def setup_launch_factory(func):
                  'the same data on the boot disk as an existing cluster.')),
         click.option(
             '--down',
-            default=False,
+            default=None,
             is_flag=True,
             required=False,
             help=
@@ -224,16 +224,6 @@ def setup_exec_factory(func):
 
 
 def setup_start_factory(func):
-    '''
-    clusters: List[str],
-    all: bool,
-    yes: bool,
-    idle_minutes_to_autostop: Optional[int],
-    down: bool, # pylint: disable=redefined-outer-name
-    retry_until_up: bool,
-    force: bool):
-    '''
-
     options = [
         click.command(cls=_DocumentedCodeCommand),
         click.argument('clusters',
diff --git a/src/pymc_server/utils/names.py b/src/pymc_server/utils/names.py
index 0981c43..c533fce 100644
--- a/src/pymc_server/utils/names.py
+++ b/src/pymc_server/utils/names.py
@@ -1,8 +1,10 @@
 import uuid
+# import colorama
 from sky.utils import common_utils
 
 def generate_service_name(prefix = "pymcs"):
     return f'{prefix}-service-{uuid.uuid4().hex[:4]}'
 
-def generate_cluster_name(cluster=common_utils.get_cleaned_username(),prefix = "pymcs"):
+def generate_cluster_name(cluster:str=None,prefix = "pymcs"):
+    cluster = cluster if cluster != None else common_utils.get_cleaned_username()
     return f'{prefix}-{uuid.uuid4().hex[:4]}-{cluster}'
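The names.py change is the substance of the cluster-name fix: with
cluster=common_utils.get_cleaned_username() as a default argument, the
username was evaluated once at module import time (Python evaluates default
arguments when the def runs); resolving it inside the function body picks it
up on every call and keeps the parameter usable for explicit names.
Illustrative behavior (the hex suffix is random; 'alice' stands in for the
cleaned username):

    generate_cluster_name()           # -> 'pymcs-3f9a-alice'
    generate_cluster_name('dev-box')  # -> 'pymcs-77c2-dev-box'
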
From bc747eec062fd3b088f5a1db8e605b835852e909 Mon Sep 17 00:00:00 2001
From: Stephan-Mai-pymc*
Date: Wed, 29 Jan 2025 13:53:21 +0100
Subject: [PATCH 7/8] clean up yaml.py

---
 src/pymc_server/utils/yaml.py | 6 ++----
 1 file changed, 2 insertions(+), 4 deletions(-)

diff --git a/src/pymc_server/utils/yaml.py b/src/pymc_server/utils/yaml.py
index 5fd5582..e8b10aa 100644
--- a/src/pymc_server/utils/yaml.py
+++ b/src/pymc_server/utils/yaml.py
@@ -225,16 +225,14 @@ def _make_task_or_dag_from_entrypoint_with_overrides(
     #entrypoint = ' '.join(entrypoint)
     configs, is_yaml,module_config_path = get_config_from_yaml(entrypoint,module_name,base_config_folder)
-
     clean_up_config(configs)
-    print(f'{colorama.Fore.YELLOW} ERRRRROOOOORRR HERE')
-
+    '''
     click.secho(f'{colorama.Fore.GREEN}used base_config_folder:'
                 f'{colorama.Style.RESET_ALL}{base_config_folder}')
 
     click.secho(f'{colorama.Fore.GREEN}used entrypoint:'
                 f'{colorama.Style.RESET_ALL}{entrypoint}')
-
+    '''
     #is_yaml, _ = _check_yaml(entrypoint)
     entrypoint: Optional[str]
     if is_yaml:
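Patch 7 drops the stray error print and disables the config-resolution sechos
by wrapping them in an unassigned string literal. If that tracing is meant to
come back, a module-level switch is a cheap alternative to commenting code in
and out (a sketch; the flag name is hypothetical, not part of the patch):

    PYMCS_DEBUG = False  # flip to True to trace config resolution

    if PYMCS_DEBUG:
        click.secho(f'{colorama.Fore.GREEN}used base_config_folder:'
                    f'{colorama.Style.RESET_ALL}{base_config_folder}')
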
From 926607e3437f8af9963127f9cd19e067d1ff3be5 Mon Sep 17 00:00:00 2001
From: Stephan-Mai-pymc*
Date: Thu, 30 Jan 2025 11:35:38 +0100
Subject: [PATCH 8/8] exec cmd

---
 .gitignore                             |  1 +
 src/pymc_server/cli.py                 |  2 +-
 src/pymc_server/cli_factory.py         | 16 +++++++++++++++-
 src/pymc_server/commands/exec_cli.py   | 14 ++++++++++++++
 src/pymc_server/commands/launch_cli.py |  1 -
 src/pymc_server/utils/yaml.py          |  2 +-
 6 files changed, 32 insertions(+), 4 deletions(-)

diff --git a/.gitignore b/.gitignore
index 02b47d1..1394587 100644
--- a/.gitignore
+++ b/.gitignore
@@ -153,6 +153,7 @@ dmypy.json
 # Cython debug symbols
 cython_debug/
 *pymc.yaml
+
 # PyCharm
 #  JetBrains specific template is maintained in a separate JetBrains.gitignore that can
 #  be found at https://github.com/github/gitignore/blob/main/Global/JetBrains.gitignore
diff --git a/src/pymc_server/cli.py b/src/pymc_server/cli.py
index ed00e4a..cfbf53f 100644
--- a/src/pymc_server/cli.py
+++ b/src/pymc_server/cli.py
@@ -39,7 +39,7 @@ def status(*args, **kwargs):
     #ctx.invoke(sky_status, *args, **kwargs)
 
 
-@setup_launch_factory
+@setup_exec_factory
 @usage_lib.entrypoint
 def exec(*args, **kwargs):
     """ calls the sky status command by passing the click context"""
     ctx = click.get_current_context()
diff --git a/src/pymc_server/cli_factory.py b/src/pymc_server/cli_factory.py
index 9ed1354..2403ee3 100644
--- a/src/pymc_server/cli_factory.py
+++ b/src/pymc_server/cli_factory.py
@@ -211,6 +211,18 @@ def setup_exec_factory(func):
                        type=str,
                        nargs=-1,
                        **_get_shell_complete_args(_complete_file_name)),
+        click.option('--module-name',
+                     '-m',
+                     #default='pymc-marketing',
+                     required=False,
+                     type=str,
+                     help=('Define the PyMC module / project you need to use. '
+                           'pymc-marketing is the default.')),
+        click.option('--base-config-folder',
+                     '-b',
+                     required=False,
+                     type=str,
+                     help=('Define the config base folder.')),
         click.option(
             '--detach-run',
             '-d',
@@ -221,7 +233,9 @@ def setup_exec_factory(func):
         _add_click_options(_TASK_OPTIONS_WITH_NAME + _EXTRA_RESOURCES_OPTIONS),
         usage_lib.entrypoint
     ]
-
+    for option in reversed(options):
+        func = option(func)
+    return func
 
 def setup_start_factory(func):
     options = [
diff --git a/src/pymc_server/commands/exec_cli.py b/src/pymc_server/commands/exec_cli.py
index af13476..69e55b2 100644
--- a/src/pymc_server/commands/exec_cli.py
+++ b/src/pymc_server/commands/exec_cli.py
@@ -3,11 +3,23 @@ import click
 import pymc_server
 import colorama
 from typing import Dict, List, Optional, Tuple, Union
+from pymc_server.utils.yaml import (
+    get_config_from_yaml, load_chain_dag_from_yaml,
+    _make_task_or_dag_from_entrypoint_with_overrides,
+    get_auto_stop
+
+)
+from sky.utils import controller_utils
+from sky.backends import backend_utils
+from sky.cli import _parse_override_params, _merge_env_vars, _pop_and_ignore_fields_in_override_params
+from sky import global_user_state
 
 def exec(
     cluster: Optional[str],
     cluster_option: Optional[str],
     entrypoint: Tuple[str, ...],
+    module_name:Optional[str],
+    base_config_folder:Optional[str],
     detach_run: bool,
     name: Optional[str],
     cloud: Optional[str],
@@ -107,6 +119,8 @@ def exec(
 
     task_or_dag = _make_task_or_dag_from_entrypoint_with_overrides(
         entrypoint=entrypoint,
+        module_name=module_name,
+        base_config_folder=base_config_folder,
         name=name,
         workdir=workdir,
         cloud=cloud,
diff --git a/src/pymc_server/commands/launch_cli.py b/src/pymc_server/commands/launch_cli.py
index 0977ed6..3bcda35 100644
--- a/src/pymc_server/commands/launch_cli.py
+++ b/src/pymc_server/commands/launch_cli.py
@@ -148,7 +148,6 @@ def launch(
 
     #handle = cluster_records[0]['handle']
     #head_ip = handle.external_ips()[0]
-
     return cluster_name
diff --git a/src/pymc_server/utils/yaml.py b/src/pymc_server/utils/yaml.py
index e8b10aa..560cf7e 100644
--- a/src/pymc_server/utils/yaml.py
+++ b/src/pymc_server/utils/yaml.py
@@ -11,7 +11,7 @@
 import sky
 from sky import clouds as sky_clouds
 
-from sky.cli import _parse_override_params
+from sky.cli import _parse_override_params, _pop_and_ignore_fields_in_override_params
 from sky.usage import usage_lib
 
 from .names import generate_cluster_name
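With patch 8, exec gains the same module/config plumbing as launch, so task
YAMLs resolve identically against an existing cluster. A hypothetical
invocation (names are illustrative; exec is still marked unfinished in the
series' opening subject):

    # Run a task YAML on a running cluster, merging module and base configs:
    pymcs exec -m pymc-marketing -b ./configs pymcs-3f9a-alice my-task.pymc.yaml

    # Or run a bare command, sky-exec style:
    pymcs exec pymcs-3f9a-alice 'python train.py'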