From d1008ac4b1ec5beec612359f589b77c7032751ac Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Nicolas=20K=C3=BCchler?= Date: Thu, 7 Oct 2021 13:06:10 +0200 Subject: [PATCH] bug fixes + formatting + example designs --- experiment-suite.yml | 23 +++---- experiments/designs/example-minimal.yml | 9 +-- experiments/designs/example-multi.yml | 65 +++++++++---------- roles/experiment-job/library/tsp.py | 61 ++++++++--------- roles/experiment-job/tasks/main.yml | 22 ++++++- roles/setup-server/tasks/main.yml | 1 + roles/suite-load-post-aws/tasks/main.yml | 4 ++ .../action_plugins/suite_design_validate.py | 48 ++++++-------- roles/suite-load-pre-aws/tasks/main.yml | 2 +- 9 files changed, 127 insertions(+), 108 deletions(-) diff --git a/experiment-suite.yml b/experiment-suite.yml index 7c434ed2..05f3d5b4 100644 --- a/experiment-suite.yml +++ b/experiment-suite.yml @@ -16,6 +16,7 @@ vars: prj_clear: False # controls whether to also filter for the suite or not (for a clear we want to include all instances of the project independent of the suite) + # TODO [nku] a design file should be able to contain an experiment design in table for in list form (could use info on whether list of factor levels is defined) - name: resolve suite_id, load and validate suite design, fill default values, and prepare variables include_role: name: suite-load-pre-aws @@ -35,7 +36,7 @@ - name: Setup AWS VPC include_role: name: suite-aws-vpc-create - + - name: Setup AWS EC2 instances include_role: name: suite-aws-ec2-create @@ -53,8 +54,9 @@ tags: [print_action] -- name: Setup registered host types (part 2) -> apply +- name: Setup registered host types (part 2) -> apply hosts: all # intersection of hosts in project and in suite (via aws_ec2 dynamic inventory plugin) + # TODO [nku] when running multiple suites after each other but one of them is not finsihed, then this here can be problematic because all also includes hosts from other suites strategy: free tasks: @@ -92,19 +94,20 @@ loop: "{{ range(0, (exp_job_ids_unfinished | length), 1) | list }}" loop_control: loop_var: unfinished_job_idx - + rescue: + - name: handle unexpected error fail: msg: unexpected error occured in experiment = {{ exp_name }} - when: tsp_result is undefined or not tsp_result.failed + when: is_expected_error is not defined or not is_expected_error # the loop until task in `experiment-job` throws an error if the number of tries are exceeded. # here we catch this error and handle this gracefully. (every other error is handled by the previous task) - name: handle expected error if number of tries exceeded ansible.builtin.debug: msg: number of tries exceeded -> experiment = {{ exp_name }} - + # when: is_expected_error ########################################################################## @@ -116,7 +119,7 @@ tasks: - name: compute overview of progress in different experiments in suite - set_fact: + set_fact: suite_progress_info: "{{ suite_progress_info | default({}) | combine({my_exp_name: {'require_suite_to_finish': my_require_suite ,'n_finished': my_n_finished | int, 'n_unfinished': my_n_unfinished | int, 'progress': (my_n_finished | int / (my_n_finished | int + my_n_unfinished | int) * 100) | round(2) | string + ' %' }})}}" vars: my_exp_name: "{{ hostvars[my_controller_host].exp_name }}" @@ -126,7 +129,7 @@ loop: "{{ groups['is_controller_yes'] }}" loop_control: loop_var: my_controller_host - + - set_fact: is_suite_finished: "{{ suite_progress_info | json_query('*.n_unfinished') | sum == 0 }}" @@ -142,7 +145,7 @@ - name: Remove AWS VPC include_role: name: suite-aws-vpc-delete - + # cleanup aws if suite is finished (unless the explicit flag is set to keep the aws environment) when: awsclean | default(true) | bool and is_suite_finished @@ -150,11 +153,9 @@ debug: msg: "suite={{ suite }} suite_id={{ suite_id }} finished={{ is_suite_finished }}" tags: [print_action] - + - name: output progress information of experiments debug: msg: "{{ suite_progress_info[item] }}" loop: "{{ suite_progress_info.keys() | sort }}" tags: [print_action] - - diff --git a/experiments/designs/example-minimal.yml b/experiments/designs/example-minimal.yml index 8c04fd9c..dd8f2d28 100644 --- a/experiments/designs/example-minimal.yml +++ b/experiments/designs/example-minimal.yml @@ -1,12 +1,10 @@ --- -minimal: # name of the experiment +minimal: # name of the experiment n_repetitions: 1 # do not run repetitions - common_roles: - - setup-common # use role to install hosts host_types: - small: # use one instance - n: 1 + small: # use one instance + n: 1 $CMD$: "echo [% my_run.arg1 %] [% my_run.arg2 %]" # start experiment run # configuration base_experiment: @@ -15,4 +13,3 @@ minimal: # name of the experiment factor_levels: - arg2: world - arg2: universe - diff --git a/experiments/designs/example-multi.yml b/experiments/designs/example-multi.yml index f9bf22c2..158a672d 100644 --- a/experiments/designs/example-multi.yml +++ b/experiments/designs/example-multi.yml @@ -3,49 +3,48 @@ # Example for one client server experiment exp_client_server: + # each client sends the server a message n_repetitions: 3 - common_roles: - - setup-common + #common_roles: + # - setup-common host_types: client: n: 2 check_status: True init_role: setup-client $CMD$: - # could use netcat echo server: https://nmap.org/ncat/guide/ncat-simple-services.html https://ubidots.com/blog/how-to-simulate-a-tcpudp-client-using-netcat/ - - echo c1 # TODO [nku] replace with actual - - echo c2 # TODO [nku] need to feed ip address + # send messages to the server with nc + # use exp_host_lst variable to extract ip address from server, the delay `sleep 5` ensures that the server is running when the client sends the message + - sleep 5 && echo '[% my_run.host_vars.client.msg %] from client 1 ([% my_run.info %])' | netcat -q 1 [% exp_host_lst | json_query("[?host_type=='server'].private_dns_name") | first %] [% my_run.port %] + # have two commands to distinguish message from client 1 and 2 + - sleep 5 && echo '[% my_run.host_vars.client.msg %] from client 2 ([% my_run.info %])' | netcat -q 1 [% exp_host_lst | json_query("[?host_type=='server'].private_dns_name") | first %] [% my_run.port %] + server: n: 1 check_status: False - $CMD$: echo server # TODO [nku] replace with actual - - base_experiment: - info: $FACTOR$ + init_role: setup-server + # run a single ncat server -> writes all incoming messages to stdout + $CMD$: ncat -l [% my_run.port %] --keep-open + + base_experiment: + port: 2807 + info: $FACTOR$ + host_vars: + client: + msg: hello server + server: + greeting: $FACTOR$ + factor_levels: + - info: run 0 + host_vars: + server: + greeting: hello client + - info: run 1 + host_vars: + server: + greeting: hi client + - info: run 2 host_vars: - client: - msg: hello - n_msg: $FACTOR$ server: - delay: $FACTOR$ - - factor_levels: - - info: run 1 - base - host_vars: - client: - n_msg: 5 - server: - delay: 10 - - info: run 2 - more delay - host_vars: - client: - n_msg: 5 - server: - delay: 20 - - info: run 3 - more msgs - host_vars: - client: - n_msg: 10 - server: - delay: 10 \ No newline at end of file + greeting: good day client \ No newline at end of file diff --git a/roles/experiment-job/library/tsp.py b/roles/experiment-job/library/tsp.py index 3aa5fffc..e0000863 100644 --- a/roles/experiment-job/library/tsp.py +++ b/roles/experiment-job/library/tsp.py @@ -3,8 +3,7 @@ from __future__ import (absolute_import, division, print_function) __metaclass__ = type - -import subprocess, os, signal, re +import subprocess, os, signal, re, time DOCUMENTATION = r''' @@ -31,7 +30,7 @@ required: false type: bool cmd: - description: The command to add at the end of the task spooler queue. + description: The command to add at the end of the task spooler queue. required: false type: str cmd_label: @@ -109,7 +108,7 @@ def get_tasks(return_pid): cmd = m.group(7) if cmd is None: cmd = m.group(8) # has no label => cmd is in group 7 - + d = { "id": m.group(1), "state": m.group(2), @@ -168,8 +167,8 @@ def run_module(): if module.check_mode: module.exit_json(**result) - - # TSP LOGIC: + + # TSP LOGIC: changed = False # we start with changed false if module.params["clear_tasks"]: @@ -210,7 +209,7 @@ def run_module(): changed = True - # remove task from task spooler queue + # remove task from task spooler queue if module.params['remove_task_id']: subprocess.run(["tsp", "-r", module.params['remove_task_id']], capture_output=True, text=True) @@ -224,26 +223,24 @@ def run_module(): subprocess.run(["tsp", "-r", task["id"]], capture_output=True, text=True) changed = True - # mark whether module changed result['changed'] = changed - + # in the event of a successful module execution, you will want to # simple AnsibleModule.exit_json(), passing the key/value results module.exit_json(**result) def clear_task_spooler(): - + # clear all done subprocess.run(["tsp", "-C"], capture_output=True, check=True) - + tasks = get_tasks(return_pid=True) if len(tasks) == 0: return False # changed - - + # make single slot subprocess.run(["tsp", "-S", "1"], capture_output=True, check=True) @@ -253,32 +250,38 @@ def clear_task_spooler(): # make the dummy task urgent (next in line) subprocess.run(["tsp", "-u"], capture_output=True, check=True) - for task in tasks: if task["state"] == "running": # kill the running processes - os.kill(int(task['pid']), signal.SIGTERM) + os.killpg(int(task['pid']), signal.SIGTERM) # now the dummy task is running - tasks = get_tasks(return_pid=True) - dummy_task_pid = None + def get_dummy_task_pid(): + tasks = get_tasks(return_pid=True) + dummy_task_pid = None + for task in tasks: + if task["state"] == "running": + if task["label"] != "DUMMY" or dummy_task_pid is not None: + raise ValueError(f"unexpected running task (only single dummy task should run): {task}") + dummy_task_pid = task["pid"] + + if dummy_task_pid is None: + raise ValueError("running dummy task not found") + + return dummy_task_pid + + try: + dummy_task_pid = get_dummy_task_pid() + except ValueError as e: + time.sleep(3) # add some slack for process to finish + # try again + dummy_task_pid = get_dummy_task_pid() - - for task in tasks: - if task["state"] == "running": - if task["label"] != "DUMMY" or dummy_task_pid is not None: - raise ValueError(f"unexpected running task (only single dummy task should run): {task}") - - dummy_task_pid = task["pid"] - - if dummy_task_pid is None: - raise ValueError("running dummy task not found") - # clear the task spooler (remove all jobs in queue) subprocess.run(["tsp", "-K"], capture_output=True, check=True) # finally also kill the dummy task by pid - os.kill(int(dummy_task_pid) , signal.SIGTERM) + os.killpg(int(dummy_task_pid) , signal.SIGTERM) return True # changed diff --git a/roles/experiment-job/tasks/main.yml b/roles/experiment-job/tasks/main.yml index 22957c51..72583340 100644 --- a/roles/experiment-job/tasks/main.yml +++ b/roles/experiment-job/tasks/main.yml @@ -28,7 +28,21 @@ - job_check_wait_time is defined - exp_runs_ext is defined - exp_host_lst is defined # list of hosts involved in experiment - #[{"host_type": x, "exp_host_type_idx": x, "exp_host_type_n": x, "is_controller": x, "public_dns_name": x, "private_ip_address": x}] + #[{"host_type": x, "exp_host_type_idx": x, "exp_host_type_n": x, "is_controller": x, "public_dns_name": x, "private_dns_name": x}] + +#- debug: +# var: exp_host_lst +# +#- debug: +# msg=' ip= {{ exp_host_lst | json_query("[?host_type=='server'].private_dns_name") | first }} ' + + +- name: mark that from here up to the status check, no error is expected + set_fact: + # in the calling playbook we have a rescue block that should handle the error when the number of retries is exceeded. + # -> however, all other errors should be thrown and not rescued -> this info variable helps to detect where the error occurred in the role + is_expected_error: False + ################################################################### # Start: Schedule new Jobs (enqueue task in task spooler) # @@ -159,6 +173,9 @@ expstate: save +- name: mark that in the next task, there could be an expected error (number of retires exceeded) + set_fact: + is_expected_error: True - name: Get status of job # Note: if the number of tries are exceeded, the task raises an error which stops this role and is caught in the parent @@ -172,6 +189,9 @@ loop_control: loop_var: host +- name: mark that from here on, any error is unexpected + set_fact: + is_expected_error: False ################################################################### # Download Results for newly finished job # diff --git a/roles/setup-server/tasks/main.yml b/roles/setup-server/tasks/main.yml index 0b5d3204..c6504cf8 100644 --- a/roles/setup-server/tasks/main.yml +++ b/roles/setup-server/tasks/main.yml @@ -10,5 +10,6 @@ become: True apt: pkg: + - ncat # TODO: add other required packages (only for server) update_cache: yes diff --git a/roles/suite-load-post-aws/tasks/main.yml b/roles/suite-load-post-aws/tasks/main.yml index e0467fe3..e91fd46d 100644 --- a/roles/suite-load-post-aws/tasks/main.yml +++ b/roles/suite-load-post-aws/tasks/main.yml @@ -57,6 +57,10 @@ # run_once: True -> would like to do this but for strategy free not supported and it's not a relevant overhead +- debug: + msg="my_suite_design_ext= {{ my_suite_design_ext }}" + + ########################################## # Setup Job Ids ########################################## diff --git a/roles/suite-load-pre-aws/action_plugins/suite_design_validate.py b/roles/suite-load-pre-aws/action_plugins/suite_design_validate.py index 8a2ca23d..48a1a7e7 100644 --- a/roles/suite-load-pre-aws/action_plugins/suite_design_validate.py +++ b/roles/suite-load-pre-aws/action_plugins/suite_design_validate.py @@ -67,7 +67,7 @@ def run(self, tmp=None, task_vars=None): module_args = self._task.args src = module_args["src"] dest = module_args["dest"] - + prj_id = task_vars["prj_id"] suite = os.path.splitext(os.path.basename(src))[0] @@ -77,12 +77,12 @@ def run(self, tmp=None, task_vars=None): design_raw = yaml.load(f, Loader=UniqueKeyLoader) self._validate_and_default_suite(prj_id=prj_id, suite=suite, design_raw=design_raw) - + with open(dest, 'w+') as f: yaml.dump(design_raw, f) except AssertionError as e: - + raise ValueError("duplicate keys (experiment names)") @@ -90,7 +90,7 @@ def run(self, tmp=None, task_vars=None): def _validate_and_default_suite(self, prj_id, suite, design_raw): - + exp_names = list(design_raw.keys()) host_type_names = [] @@ -105,12 +105,12 @@ def _validate_and_default_suite(self, prj_id, suite, design_raw): expected_unique = [prj_id, suite] + exp_names + host_type_names + keywords_all if len(set(expected_unique)) != len(expected_unique): - raise ValueError(f"found duplicates in identifiers -> adjust prj_id, suite, host_type, or exp_name to avoid them (identifiers={{ expected_unique }})") - + raise ValueError(f"found duplicates in identifiers -> adjust prj_id, suite, host_type, or exp_name to avoid them (identifiers={ expected_unique })") + # check length limit of project id and suite (tag on aws has a limit) if len(prj_id) > 200: raise ValueError("project id too long") - + if len(suite) > 200: raise ValueError("suite name too long") @@ -120,7 +120,7 @@ def _validate_and_default_suite(self, prj_id, suite, design_raw): for exp_name, exp_raw in design_raw.items(): - self._validate_and_default_experiment(exp_raw) + self._validate_and_default_experiment(exp_raw) return True @@ -143,13 +143,13 @@ def _validate_and_default_experiment(self, exp_raw): if "common_roles" not in exp_raw: exp_raw["common_roles"] = [] # TODO [nku] could check that the common roles actually exists -> need to know the folder - + if "factor_levels" not in exp_raw: exp_raw["factor_levels"] = [{}] - + # handle host_types for host_type_name, host_type_raw in exp_raw["host_types"].items(): - self._validate_and_default_host_type(host_type_raw) + self._validate_and_default_host_type(host_type_raw) # check base_experiment expected_factor_paths = self._validate_base_experiment(exp_raw["base_experiment"]) @@ -174,10 +174,10 @@ def _validate_and_default_host_type(self, host_type_raw): raise ValueError("$CMD$ must be in host_type") ############# - # Set check_status: True if not set + # Set check_status: True if not set if "check_status" not in host_type_raw: host_type_raw["check_status"] = True - + ############# # set n by default to 1 if "n" not in host_type_raw: @@ -186,7 +186,7 @@ def _validate_and_default_host_type(self, host_type_raw): ############# # set init_role by default to empty list if "init_role" not in host_type_raw: - host_type_raw["init_role"] = [] + host_type_raw["init_role"] = [] ############# # convert init role to list @@ -197,16 +197,16 @@ def _validate_and_default_host_type(self, host_type_raw): raise ValueError("init_role must be a list") ############# - # Convert $CMD$ to default structure + # Convert $CMD$ to default structure if not isinstance(host_type_raw["$CMD$"], list): - # repeat the same cmd for all `n` hosts of this type + # repeat the same cmd for all `n` hosts of this type host_type_raw["$CMD$"] = [host_type_raw["$CMD$"]] * host_type_raw["n"] - + if len(host_type_raw["$CMD$"]) != host_type_raw["n"]: raise ValueError("cmd list length does not match the number of instances `n` of host type") - + # host_type_raw["$CMD$"] is a list of length n cmds = [] for cmd in host_type_raw["$CMD$"]: @@ -221,13 +221,13 @@ def _validate_and_default_host_type(self, host_type_raw): host_type_raw["$CMD$"] = cmds # host_type_raw["$CMD$"] is a list of length n, each element is a dict that contains at least one entry with key "main" - + """ # minimal example n: 1 $CMD$: - main: X - + # two instances, one command n: 2 $CMD$: @@ -258,16 +258,10 @@ def _validate_factor_levels(self, factor_levels_raw, expected_factors): raise ValueError("factor levels must be a list") for run in factor_levels_raw: - + actual_factors = [] for path, value in nested_dict_iter(run): actual_factors.append(path) if sorted(expected_factors) != sorted(actual_factors): raise ValueError(f"expected factors do not match actual factors: expected={expected_factors} actual={actual_factors}") - - - - - - \ No newline at end of file diff --git a/roles/suite-load-pre-aws/tasks/main.yml b/roles/suite-load-pre-aws/tasks/main.yml index 1acb61ba..6b0b4f93 100644 --- a/roles/suite-load-pre-aws/tasks/main.yml +++ b/roles/suite-load-pre-aws/tasks/main.yml @@ -81,7 +81,7 @@ - name: validate the suite design and set default values suite_design_validate: - src: experiments/designs/example.yml + src: experiments/designs/{{ suite }}.yml dest: "{{ suite_design_dir }}/suite_design.yml" when: id == 'new' # only validate and resolve defaults for new (otherwise load the old already checked version)