From f1b33b493c16fbf4673b1727118abb2343571edc Mon Sep 17 00:00:00 2001
From: Ben Clifford
Date: Tue, 16 Jul 2024 14:49:59 +0000
Subject: [PATCH 01/12] fix 3471 - only scale in blocks that are in _status and
 non-terminal

block ID and job ID mappings contain the full historical list of blocks, but
prior to this PR, the mapping was used as the source of the current jobs that
should be scaled in
---
 parsl/executors/status_handling.py | 20 ++++++++++++--------
 1 file changed, 12 insertions(+), 8 deletions(-)

diff --git a/parsl/executors/status_handling.py b/parsl/executors/status_handling.py
index 13ddef1256..d02df18b49 100644
--- a/parsl/executors/status_handling.py
+++ b/parsl/executors/status_handling.py
@@ -12,7 +12,7 @@
 from parsl.executors.base import ParslExecutor
 from parsl.executors.errors import BadStateException, ScalingFailed
 from parsl.jobs.error_handlers import noop_error_handler, simple_error_handler
-from parsl.jobs.states import JobState, JobStatus
+from parsl.jobs.states import TERMINAL_STATES, JobState, JobStatus
 from parsl.monitoring.message_type import MessageType
 from parsl.providers.base import ExecutionProvider
 from parsl.utils import AtomicIDCounter
@@ -205,16 +205,20 @@ def scale_in(self, blocks: int) -> List[str]:

         :return: A list of block ids corresponding to the blocks that were removed.
         """
-        # Obtain list of blocks to kill
-        to_kill = list(self.blocks_to_job_id.keys())[:blocks]
-        kill_ids = [self.blocks_to_job_id[block] for block in to_kill]
+
+        active_blocks = [block_id for block_id, status in self._status.items()
+                         if status.state not in TERMINAL_STATES]
+
+        block_ids_to_kill = active_blocks[:blocks]
+
+        job_ids_to_kill = [self.blocks_to_job_id[block] for block in block_ids_to_kill]

         # Cancel the blocks provisioned
         if self.provider:
-            logger.info(f"Scaling in jobs: {kill_ids}")
-            r = self.provider.cancel(kill_ids)
-            job_ids = self._filter_scale_in_ids(kill_ids, r)
-            block_ids_killed = [self.job_ids_to_block[jid] for jid in job_ids]
+            logger.info(f"Scaling in jobs: {job_ids_to_kill}")
+            r = self.provider.cancel(job_ids_to_kill)
+            job_ids = self._filter_scale_in_ids(job_ids_to_kill, r)
+            block_ids_killed = [self.job_ids_to_block[job_id] for job_id in job_ids]
             return block_ids_killed
         else:
             logger.error("No execution provider available to scale in")

From e42423d26c81da76ced66474d82dfbf43a4028da Mon Sep 17 00:00:00 2001
From: Ben Clifford
Date: Sat, 27 Jul 2024 13:01:50 +0000
Subject: [PATCH 02/12] in the default executor, do we need to check for the
 provider or can we assert that it exists?

specifically raised by khk in the context of def scale_in

From 686445ec2ca58787a346d80b298486a16ba5962d Mon Sep 17 00:00:00 2001
From: Ben Clifford
Date: Mon, 22 Jul 2024 10:53:43 +0000
Subject: [PATCH 03/12] scaling code in strategy.py uses a more abstract notion
 of "scaling units"

these units only make sense in proportion to other scaling load amounts
(i.e. as ratios): htex uses "tasks" as the unit, and wq now uses "cores" as
the unit. variables and docstrings inside strategy.py should be clearer
about this.
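A hedged sketch of the "scaling units" idea from patch 03 above - not code
from strategy.py; blocks_needed, outstanding_load and units_per_block are
hypothetical names for illustration. The point is that the strategy only ever
compares quantities expressed in the same executor-specific unit, so only the
ratio matters, never what the unit "is":

    import math

    def blocks_needed(outstanding_load: float, units_per_block: float) -> int:
        # outstanding_load and units_per_block are both expressed in the
        # executor's own scaling unit (tasks for htex, cores for wq); the
        # result depends only on their ratio.
        if units_per_block <= 0:
            raise ValueError("units_per_block must be positive")
        return math.ceil(outstanding_load / units_per_block)

    # htex: 25 outstanding tasks, 8 task slots per block -> 4 blocks
    # wq:   25 outstanding cores, 8 cores per block      -> the same 4 blocks
    assert blocks_needed(25, 8) == 4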
From 201bc037e1ecc861b58f9ebaa55c5a166699f832 Mon Sep 17 00:00:00 2001
From: Ben Clifford
Date: Tue, 16 Jul 2024 10:12:39 +0000
Subject: [PATCH 04/12] add type annotations and a warning for unsuccessful
 kills to _filter_scale_in_ids
---
 parsl/executors/status_handling.py | 6 +++++-
 1 file changed, 5 insertions(+), 1 deletion(-)

diff --git a/parsl/executors/status_handling.py b/parsl/executors/status_handling.py
index d02df18b49..5d47fbfee3 100644
--- a/parsl/executors/status_handling.py
+++ b/parsl/executors/status_handling.py
@@ -167,10 +167,14 @@ def tasks(self) -> Dict[object, Future]:
     def provider(self):
         return self._provider

-    def _filter_scale_in_ids(self, to_kill, killed):
+    def _filter_scale_in_ids(self, to_kill: Sequence[Any], killed: Sequence[bool]) -> Sequence[Any]:
         """ Filter out job id's that were not killed
         """
         assert len(to_kill) == len(killed)
+
+        if False in killed:
+            logger.warning(f"Some jobs were not killed successfully: to_kill list is {to_kill}, killed list is {killed}")
+
         # Filters first iterable by bool values in second
         return list(compress(to_kill, killed))

From 29703131cbcaac880667de45b49330db73b71f0c Mon Sep 17 00:00:00 2001
From: Ben Clifford
Date: Tue, 16 Jul 2024 11:14:54 +0000
Subject: [PATCH 05/12] move two scale out methods next to each other for
 easier visual inspection

behaviour changes: none
---
 parsl/executors/status_handling.py | 20 ++++++++++----------
 1 file changed, 10 insertions(+), 10 deletions(-)

diff --git a/parsl/executors/status_handling.py b/parsl/executors/status_handling.py
index 5d47fbfee3..3f3e504635 100644
--- a/parsl/executors/status_handling.py
+++ b/parsl/executors/status_handling.py
@@ -178,6 +178,16 @@ def _filter_scale_in_ids(self, to_kill: Sequence[Any], killed: Sequence[bool]) -
         # Filters first iterable by bool values in second
         return list(compress(to_kill, killed))

+    def scale_out_facade(self, n: int) -> List[str]:
+        block_ids = self._scale_out(n)
+        if block_ids is not None:
+            new_status = {}
+            for block_id in block_ids:
+                new_status[block_id] = JobStatus(JobState.PENDING)
+            self.send_monitoring_info(new_status)
+            self._status.update(new_status)
+        return block_ids
+
     def _scale_out(self, blocks: int = 1) -> List[str]:
         """Scales out the number of blocks by "blocks"
         """
@@ -335,13 +345,3 @@ def scale_in_facade(self, n: int, max_idletime: Optional[float] = None) -> List[
                 del self._status[block_id]
             self.send_monitoring_info(new_status)
         return block_ids
-
-    def scale_out_facade(self, n: int) -> List[str]:
-        block_ids = self._scale_out(n)
-        if block_ids is not None:
-            new_status = {}
-            for block_id in block_ids:
-                new_status[block_id] = JobStatus(JobState.PENDING)
-            self.send_monitoring_info(new_status)
-            self._status.update(new_status)
-        return block_ids

From 29e6c4d930c36735668cac2aa79948889d5b048b Mon Sep 17 00:00:00 2001
From: Ben Clifford
Date: Tue, 16 Jul 2024 11:25:49 +0000
Subject: [PATCH 06/12] remove unused None codepath
---
 parsl/executors/status_handling.py | 11 +++++------
 1 file changed, 5 insertions(+), 6 deletions(-)

diff --git a/parsl/executors/status_handling.py b/parsl/executors/status_handling.py
index 3f3e504635..4262d28655 100644
--- a/parsl/executors/status_handling.py
+++ b/parsl/executors/status_handling.py
@@ -180,12 +180,11 @@ def _filter_scale_in_ids(self, to_kill: Sequence[Any], killed: Sequence[bool]) -

     def scale_out_facade(self, n: int) -> List[str]:
         block_ids = self._scale_out(n)
-        if block_ids is not None:
-            new_status = {}
-            for block_id in block_ids:
-                new_status[block_id] = JobStatus(JobState.PENDING)
-            self.send_monitoring_info(new_status)
-            self._status.update(new_status)
+        new_status = {}
+        for block_id in block_ids:
+            new_status[block_id] = JobStatus(JobState.PENDING)
+        self.send_monitoring_info(new_status)
+        self._status.update(new_status)
         return block_ids

     def _scale_out(self, blocks: int = 1) -> List[str]:

From 978f4336abf41d995b29f7b237b0b00d398029ec Mon Sep 17 00:00:00 2001
From: Ben Clifford
Date: Tue, 16 Jul 2024 11:27:01 +0000
Subject: [PATCH 07/12] Inline _scale_out into scale_out_facade

behaviour changes: none
---
 parsl/executors/status_handling.py | 19 ++++++++-----------
 1 file changed, 8 insertions(+), 11 deletions(-)

diff --git a/parsl/executors/status_handling.py b/parsl/executors/status_handling.py
index 4262d28655..335ddebc8d 100644
--- a/parsl/executors/status_handling.py
+++ b/parsl/executors/status_handling.py
@@ -179,22 +179,13 @@ def _filter_scale_in_ids(self, to_kill: Sequence[Any], killed: Sequence[bool]) -
         return list(compress(to_kill, killed))

     def scale_out_facade(self, n: int) -> List[str]:
-        block_ids = self._scale_out(n)
-        new_status = {}
-        for block_id in block_ids:
-            new_status[block_id] = JobStatus(JobState.PENDING)
-        self.send_monitoring_info(new_status)
-        self._status.update(new_status)
-        return block_ids
-
-    def _scale_out(self, blocks: int = 1) -> List[str]:
         """Scales out the number of blocks by "blocks"
         """
         if not self.provider:
             raise ScalingFailed(self, "No execution provider available")
         block_ids = []
-        logger.info(f"Scaling out by {blocks} blocks")
-        for _ in range(blocks):
+        logger.info(f"Scaling out by {n} blocks")
+        for _ in range(n):
             block_id = str(self._block_id_counter.get_id())
             logger.info(f"Allocated block ID {block_id}")
             try:
@@ -204,6 +195,12 @@ def scale_out_facade(self, n: int) -> List[str]:
                 block_ids.append(block_id)
             except Exception as ex:
                 self._simulated_status[block_id] = JobStatus(JobState.FAILED, "Failed to start block {}: {}".format(block_id, ex))
+
+        new_status = {}
+        for block_id in block_ids:
+            new_status[block_id] = JobStatus(JobState.PENDING)
+        self.send_monitoring_info(new_status)
+        self._status.update(new_status)
         return block_ids

From ab3a5bd5285c93bc9fe1bb9b733f181f498cc764 Mon Sep 17 00:00:00 2001
From: Ben Clifford
Date: Tue, 16 Jul 2024 11:32:13 +0000
Subject: [PATCH 08/12] move status table updates for various tables together
 in scale out

TODO: this reveals a possible bug here: FAILED entries in simulated status
are not immediately sent, but instead only get sent at the next poller
update - unlike submitted entries, which are sent immediately. that should
be an easy fix after this PR, though...
---
 parsl/executors/status_handling.py | 14 +++++++++-----
 1 file changed, 9 insertions(+), 5 deletions(-)

diff --git a/parsl/executors/status_handling.py b/parsl/executors/status_handling.py
index 335ddebc8d..d075dbed27 100644
--- a/parsl/executors/status_handling.py
+++ b/parsl/executors/status_handling.py
@@ -184,23 +184,27 @@ def scale_out_facade(self, n: int) -> List[str]:
         if not self.provider:
             raise ScalingFailed(self, "No execution provider available")
         block_ids = []
+        monitoring_status_changes = {}
         logger.info(f"Scaling out by {n} blocks")
         for _ in range(n):
             block_id = str(self._block_id_counter.get_id())
             logger.info(f"Allocated block ID {block_id}")
             try:
                 job_id = self._launch_block(block_id)
+
+                pending_status = JobStatus(JobState.PENDING)
+
                 self.blocks_to_job_id[block_id] = job_id
                 self.job_ids_to_block[job_id] = block_id
+                self._status[block_id] = pending_status
+
+                monitoring_status_changes[block_id] = pending_status
                 block_ids.append(block_id)
+
             except Exception as ex:
                 self._simulated_status[block_id] = JobStatus(JobState.FAILED, "Failed to start block {}: {}".format(block_id, ex))

-        new_status = {}
-        for block_id in block_ids:
-            new_status[block_id] = JobStatus(JobState.PENDING)
-        self.send_monitoring_info(new_status)
-        self._status.update(new_status)
+        self.send_monitoring_info(monitoring_status_changes)
         return block_ids

From 222555d94813380b8b5981318ace5779e33c3a51 Mon Sep 17 00:00:00 2001
From: Ben Clifford
Date: Mon, 25 Mar 2024 17:13:57 +0000
Subject: [PATCH 09/12] test dev for #3235
---
 ...st_disconnected_blocks_failing_provider.py | 71 +++++++++++++++++++
 1 file changed, 71 insertions(+)
 create mode 100644 parsl/tests/test_htex/test_disconnected_blocks_failing_provider.py

diff --git a/parsl/tests/test_htex/test_disconnected_blocks_failing_provider.py b/parsl/tests/test_htex/test_disconnected_blocks_failing_provider.py
new file mode 100644
index 0000000000..b2fa507aca
--- /dev/null
+++ b/parsl/tests/test_htex/test_disconnected_blocks_failing_provider.py
@@ -0,0 +1,71 @@
+import logging
+
+import pytest
+
+import parsl
+from parsl import Config
+from parsl.executors import HighThroughputExecutor
+from parsl.executors.errors import BadStateException
+from parsl.jobs.states import JobState, JobStatus
+from parsl.providers import LocalProvider
+
+
+class FailingProvider(LocalProvider):
+    def submit(*args, **kwargs):
+        raise RuntimeError("Deliberate failure of provider.submit")
+
+
+def local_config():
+    """Config to simulate failing blocks without connecting"""
+    return Config(
+        executors=[
+            HighThroughputExecutor(
+                label="HTEX",
+                heartbeat_period=1,
+                heartbeat_threshold=2,
+                poll_period=100,
+                max_workers_per_node=1,
+                provider=FailingProvider(
+                    init_blocks=0,
+                    max_blocks=2,
+                    min_blocks=0,
+                ),
+            )
+        ],
+        max_idletime=0.5,
+        strategy='htex_auto_scale',
+        strategy_period=0.1
+        # this strategy period needs to be a few times smaller than the
+        # status_polling_interval of FailingProvider, which is 5s at
+        # time of writing
+    )
+
+
+@parsl.python_app
+def double(x):
+    return x * 2
+
+
+@pytest.mark.local
+def test_disconnected_blocks():
+    """Test reporting of blocks that fail to connect from HTEX"""
+    dfk = parsl.dfk()
+    executor = dfk.executors["HTEX"]
+
+    connected_blocks = executor.connected_blocks()
+    assert not connected_blocks, "Expected 0 blocks"
+
+    future = double(5)
+    with pytest.raises(BadStateException):
+        future.result()
+
+    assert isinstance(future.exception(), BadStateException)
+
+    status_dict = executor.status()
+    assert len(status_dict) == 1, "Expected exactly 1 block"
+    for status in status_dict.values():
+        assert isinstance(status, JobStatus)
+        assert status.state == JobState.MISSING
+
+    connected_blocks = executor.connected_blocks()
+    assert connected_blocks == [], "Expected exactly 0 connected blocks"

From 7989d54f818b8a00df6cca348a459852ddc09250 Mon Sep 17 00:00:00 2001
From: Ben Clifford
Date: Tue, 16 Jul 2024 14:46:54 +0000
Subject: [PATCH 10/12] when making changes that will later be reflected in the
 _status table, make those changes immediately in the cached _status table too

the job submission PENDING status update already follows this principle, so
that the scaling strategy will see submitted jobs immediately, before a
provider status refresh happens. this makes the scaling code immediately
aware of what just happened, rather than acting for one poll period as if
nothing had happened.

before this PR, that did not happen in the case of a failed submission: a
failure status would appear when a refresh happened, but not before. in
that case the scaling code would act as if the failed submission had not
happened and would continue to submit repeatedly until a refresh happened
much later. this PR makes _status be updated in this case too.

fixes 3235
---
 parsl/executors/status_handling.py | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)

diff --git a/parsl/executors/status_handling.py b/parsl/executors/status_handling.py
index d075dbed27..3ad0494829 100644
--- a/parsl/executors/status_handling.py
+++ b/parsl/executors/status_handling.py
@@ -202,7 +202,9 @@ def scale_out_facade(self, n: int) -> List[str]:
                 block_ids.append(block_id)

             except Exception as ex:
-                self._simulated_status[block_id] = JobStatus(JobState.FAILED, "Failed to start block {}: {}".format(block_id, ex))
+                failed_status = JobStatus(JobState.FAILED, "Failed to start block {}: {}".format(block_id, ex))
+                self._simulated_status[block_id] = failed_status
+                self._status[block_id] = failed_status

         self.send_monitoring_info(monitoring_status_changes)
         return block_ids

From 114f373a519c39a217da81de2eeb316127d3f965 Mon Sep 17 00:00:00 2001
From: Ben Clifford
Date: Tue, 16 Jul 2024 15:14:43 +0000
Subject: [PATCH 11/12] rename status() to _regenerate_combined_status, an
 authoritative status regenerator

it can be subclassed to add in executor-specific status (as happens with
htex)
---
 parsl/executors/high_throughput/executor.py | 4 ++--
 parsl/executors/status_handling.py          | 8 ++++----
 2 files changed, 6 insertions(+), 6 deletions(-)

diff --git a/parsl/executors/high_throughput/executor.py b/parsl/executors/high_throughput/executor.py
index 69183364f7..9b4a0b0195 100644
--- a/parsl/executors/high_throughput/executor.py
+++ b/parsl/executors/high_throughput/executor.py
@@ -787,8 +787,8 @@ def _get_launch_command(self, block_id: str) -> str:
         launch_cmd = self.launch_cmd.format(block_id=block_id)
         return launch_cmd

-    def status(self) -> Dict[str, JobStatus]:
-        job_status = super().status()
+    def _regenerate_combined_status(self) -> Dict[str, JobStatus]:
+        job_status = super()._regenerate_combined_status()
         connected_blocks = self.connected_blocks()
         for job_id in job_status:
             job_info = job_status[job_id]

diff --git a/parsl/executors/status_handling.py b/parsl/executors/status_handling.py
index 3ad0494829..1c27fbcfbd 100644
--- a/parsl/executors/status_handling.py
+++ b/parsl/executors/status_handling.py
@@ -76,7 +76,7 @@ def __init__(self, *,

         # errors can happen during the submit call to the provider; this is used
         # to keep track of such errors so that they can be handled in one place
-        # together with errors reported by status()
+        # together with errors reported by _regenerate_combined_status()
         self._simulated_status: Dict[str, JobStatus] = {}

         # this stores an approximation (sometimes delayed) of the latest status
@@ -106,7 +106,7 @@ def status_polling_interval(self):
         In practice, at least given the current situation, the executor uses a single task
         provider and this method is a delegate to the corresponding method in the provider.

-        :return: the number of seconds to wait between calls to status() or zero if no polling
+        :return: the number of seconds to wait between calls to _regenerate_combined_status() or zero if no polling
                  should be done
         """
         if self._provider is None:
@@ -296,7 +296,7 @@ def poll_facade(self) -> None:
         now = time.time()
         if now >= self._last_poll_time + self.status_polling_interval:
             previous_status = self._status
-            self._status = self.status()
+            self._status = self._regenerate_combined_status()
             self._last_poll_time = now
             delta_status = {}
             for block_id in self._status:
@@ -307,7 +307,7 @@ def poll_facade(self) -> None:
         if delta_status:
             self.send_monitoring_info(delta_status)

-    def status(self) -> Dict[str, JobStatus]:
+    def _regenerate_combined_status(self) -> Dict[str, JobStatus]:
         """Return the status of all jobs/blocks currently known to this executor.

         :return: a dictionary mapping block ids (in string) to job status

From 92b0c464457e1fb27a48e57054d297586cc30bb7 Mon Sep 17 00:00:00 2001
From: Ben Clifford
Date: Tue, 16 Jul 2024 14:10:57 +0000
Subject: [PATCH 12/12] inline some stuff
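Patches 08 through 11 converge on a single caching discipline for block
status: update the cached _status table eagerly whenever an event (submit
success or failure) would later be reflected by a status refresh, and send
monitoring only the entries that changed. A minimal self-contained sketch of
that discipline, assuming nothing about Parsl beyond these commit messages -
StatusCacheSketch and its methods are illustrative names, not the real
executor API:

    import time
    from typing import Callable, Dict

    class StatusCacheSketch:
        """Illustrative only: eager cache updates plus interval-based refresh."""

        def __init__(self, polling_interval: float) -> None:
            self._status: Dict[str, str] = {}   # cached block_id -> state name
            self._last_poll_time = 0.0
            self._polling_interval = polling_interval

        def record_submit(self, block_id: str, ok: bool) -> None:
            # Eager update: write the submit outcome (PENDING or FAILED) into
            # the cache immediately, so scaling code reading _status never
            # spends a poll period believing nothing happened.
            self._status[block_id] = "PENDING" if ok else "FAILED"

        def poll(self, regenerate: Callable[[], Dict[str, str]]) -> Dict[str, str]:
            # Refresh from the authoritative regenerator only when the poll
            # interval has elapsed, and return just the changed entries --
            # the delta that would be sent to monitoring.
            now = time.time()
            if now < self._last_poll_time + self._polling_interval:
                return {}
            previous = self._status
            self._status = regenerate()
            self._last_poll_time = now
            return {b: s for b, s in self._status.items() if previous.get(b) != s}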