diff --git a/parsl/executors/high_throughput/executor.py b/parsl/executors/high_throughput/executor.py
index 69183364f7..9b4a0b0195 100644
--- a/parsl/executors/high_throughput/executor.py
+++ b/parsl/executors/high_throughput/executor.py
@@ -787,8 +787,8 @@ def _get_launch_command(self, block_id: str) -> str:
         launch_cmd = self.launch_cmd.format(block_id=block_id)
         return launch_cmd
 
-    def status(self) -> Dict[str, JobStatus]:
-        job_status = super().status()
+    def _regenerate_combined_status(self) -> Dict[str, JobStatus]:
+        job_status = super()._regenerate_combined_status()
         connected_blocks = self.connected_blocks()
         for job_id in job_status:
             job_info = job_status[job_id]
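For context on the rename: `status()` becomes `_regenerate_combined_status()` to mark it as the expensive path that re-queries the provider, while callers are expected to read the cached `self._status` that `poll_facade` refreshes on an interval. A minimal sketch of that facade-over-regeneration pattern, using a hypothetical `StatusCache` class rather than Parsl's actual executor classes:

    import time
    from typing import Dict

    class StatusCache:
        # Hypothetical sketch, not Parsl code: keep a cached status mapping
        # and refresh it no more often than the polling interval.
        def __init__(self, polling_interval: float = 5.0) -> None:
            self._status: Dict[str, str] = {}  # cached block_id -> state
            self._last_poll_time = 0.0
            self._polling_interval = polling_interval

        def _regenerate_combined_status(self) -> Dict[str, str]:
            # stand-in for the expensive provider query
            return {"0": "PENDING"}

        def poll_facade(self) -> None:
            now = time.time()
            if now >= self._last_poll_time + self._polling_interval:
                self._status = self._regenerate_combined_status()
                self._last_poll_time = now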
diff --git a/parsl/executors/status_handling.py b/parsl/executors/status_handling.py
index 13ddef1256..1c27fbcfbd 100644
--- a/parsl/executors/status_handling.py
+++ b/parsl/executors/status_handling.py
@@ -12,7 +12,7 @@
 from parsl.executors.base import ParslExecutor
 from parsl.executors.errors import BadStateException, ScalingFailed
 from parsl.jobs.error_handlers import noop_error_handler, simple_error_handler
-from parsl.jobs.states import JobState, JobStatus
+from parsl.jobs.states import TERMINAL_STATES, JobState, JobStatus
 from parsl.monitoring.message_type import MessageType
 from parsl.providers.base import ExecutionProvider
 from parsl.utils import AtomicIDCounter
@@ -76,7 +76,7 @@ def __init__(self, *,
 
         # errors can happen during the submit call to the provider; this is used
         # to keep track of such errors so that they can be handled in one place
-        # together with errors reported by status()
+        # together with errors reported by _regenerate_combined_status()
         self._simulated_status: Dict[str, JobStatus] = {}
 
         # this stores an approximation (sometimes delayed) of the latest status
@@ -106,7 +106,7 @@ def status_polling_interval(self):
         In practice, at least given the current situation, the executor uses a single task
         provider and this method is a delegate to the corresponding method in the provider.
 
-        :return: the number of seconds to wait between calls to status() or zero if no polling
+        :return: the number of seconds to wait between calls to _regenerate_combined_status() or zero if no polling
                  should be done
         """
         if self._provider is None:
@@ -167,30 +167,46 @@ def tasks(self) -> Dict[object, Future]:
     def provider(self):
         return self._provider
 
-    def _filter_scale_in_ids(self, to_kill, killed):
+    def _filter_scale_in_ids(self, to_kill: Sequence[Any], killed: Sequence[bool]) -> Sequence[Any]:
         """ Filter out job id's that were not killed
         """
         assert len(to_kill) == len(killed)
+
+        if False in killed:
+            logger.warning(f"Some jobs were not killed successfully: to_kill list is {to_kill}, killed list is {killed}")
+
         # Filters first iterable by bool values in second
         return list(compress(to_kill, killed))
 
-    def _scale_out(self, blocks: int = 1) -> List[str]:
-        """Scales out the number of blocks by "blocks"
+    def scale_out_facade(self, n: int) -> List[str]:
+        """Scales out the number of blocks by "n"
         """
         if not self.provider:
             raise ScalingFailed(self, "No execution provider available")
         block_ids = []
-        logger.info(f"Scaling out by {blocks} blocks")
-        for _ in range(blocks):
+        monitoring_status_changes = {}
+        logger.info(f"Scaling out by {n} blocks")
+        for _ in range(n):
             block_id = str(self._block_id_counter.get_id())
             logger.info(f"Allocated block ID {block_id}")
             try:
                 job_id = self._launch_block(block_id)
+
+                pending_status = JobStatus(JobState.PENDING)
+
                 self.blocks_to_job_id[block_id] = job_id
                 self.job_ids_to_block[job_id] = block_id
+                self._status[block_id] = pending_status
+
+                monitoring_status_changes[block_id] = pending_status
                 block_ids.append(block_id)
+
             except Exception as ex:
-                self._simulated_status[block_id] = JobStatus(JobState.FAILED, "Failed to start block {}: {}".format(block_id, ex))
+                failed_status = JobStatus(JobState.FAILED, "Failed to start block {}: {}".format(block_id, ex))
+                self._simulated_status[block_id] = failed_status
+                self._status[block_id] = failed_status
+
+        self.send_monitoring_info(monitoring_status_changes)
         return block_ids
 
     def scale_in(self, blocks: int) -> List[str]:
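For reference on the `_filter_scale_in_ids` change above: `itertools.compress` keeps the elements of its first argument wherever the corresponding element of its second argument is truthy, so the new warning fires exactly when `compress` is about to drop ids. A standalone illustration with made-up job ids:

    from itertools import compress

    to_kill = ["job-1", "job-2", "job-3"]  # hypothetical job ids sent to cancel
    killed = [True, False, True]           # per-job success flags from provider.cancel

    # compress() keeps only the ids whose flag is True
    assert list(compress(to_kill, killed)) == ["job-1", "job-3"]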
""" - # Obtain list of blocks to kill - to_kill = list(self.blocks_to_job_id.keys())[:blocks] - kill_ids = [self.blocks_to_job_id[block] for block in to_kill] + + active_blocks = [block_id for block_id, status in self._status.items() + if status.state not in TERMINAL_STATES] + + block_ids_to_kill = active_blocks[:blocks] + + job_ids_to_kill = [self.blocks_to_job_id[block] for block in block_ids_to_kill] # Cancel the blocks provisioned if self.provider: - logger.info(f"Scaling in jobs: {kill_ids}") - r = self.provider.cancel(kill_ids) - job_ids = self._filter_scale_in_ids(kill_ids, r) - block_ids_killed = [self.job_ids_to_block[jid] for jid in job_ids] + logger.info(f"Scaling in jobs: {job_ids_to_kill}") + r = self.provider.cancel(job_ids_to_kill) + job_ids = self._filter_scale_in_ids(job_ids_to_kill, r) + block_ids_killed = [self.job_ids_to_block[job_id] for job_id in job_ids] return block_ids_killed else: logger.error("No execution provider available to scale in") @@ -276,7 +296,7 @@ def poll_facade(self) -> None: now = time.time() if now >= self._last_poll_time + self.status_polling_interval: previous_status = self._status - self._status = self.status() + self._status = self._regenerate_combined_status() self._last_poll_time = now delta_status = {} for block_id in self._status: @@ -287,7 +307,7 @@ def poll_facade(self) -> None: if delta_status: self.send_monitoring_info(delta_status) - def status(self) -> Dict[str, JobStatus]: + def _regenerate_combined_status(self) -> Dict[str, JobStatus]: """Return the status of all jobs/blocks currently known to this executor. :return: a dictionary mapping block ids (in string) to job status @@ -327,13 +347,3 @@ def scale_in_facade(self, n: int, max_idletime: Optional[float] = None) -> List[ del self._status[block_id] self.send_monitoring_info(new_status) return block_ids - - def scale_out_facade(self, n: int) -> List[str]: - block_ids = self._scale_out(n) - if block_ids is not None: - new_status = {} - for block_id in block_ids: - new_status[block_id] = JobStatus(JobState.PENDING) - self.send_monitoring_info(new_status) - self._status.update(new_status) - return block_ids diff --git a/parsl/tests/test_htex/test_disconnected_blocks_failing_provider.py b/parsl/tests/test_htex/test_disconnected_blocks_failing_provider.py new file mode 100644 index 0000000000..b2fa507aca --- /dev/null +++ b/parsl/tests/test_htex/test_disconnected_blocks_failing_provider.py @@ -0,0 +1,71 @@ +import logging + +import pytest + +import parsl +from parsl import Config +from parsl.executors import HighThroughputExecutor +from parsl.executors.errors import BadStateException +from parsl.jobs.states import JobState, JobStatus +from parsl.providers import LocalProvider + + +class FailingProvider(LocalProvider): + def submit(*args, **kwargs): + raise RuntimeError("Deliberate failure of provider.submit") + + +def local_config(): + """Config to simulate failing blocks without connecting""" + return Config( + executors=[ + HighThroughputExecutor( + label="HTEX", + heartbeat_period=1, + heartbeat_threshold=2, + poll_period=100, + max_workers_per_node=1, + provider=FailingProvider( + init_blocks=0, + max_blocks=2, + min_blocks=0, + ), + ) + ], + max_idletime=0.5, + strategy='htex_auto_scale', + strategy_period=0.1 + # this strategy period needs to be a few times smaller than the + # status_polling_interval of FailingProvider, which is 5s at + # time of writing + ) + + +@parsl.python_app +def double(x): + return x * 2 + + +@pytest.mark.local +def test_disconnected_blocks(): + """Test 
diff --git a/parsl/tests/test_htex/test_disconnected_blocks_failing_provider.py b/parsl/tests/test_htex/test_disconnected_blocks_failing_provider.py
new file mode 100644
index 0000000000..b2fa507aca
--- /dev/null
+++ b/parsl/tests/test_htex/test_disconnected_blocks_failing_provider.py
@@ -0,0 +1,71 @@
+import logging
+
+import pytest
+
+import parsl
+from parsl import Config
+from parsl.executors import HighThroughputExecutor
+from parsl.executors.errors import BadStateException
+from parsl.jobs.states import JobState, JobStatus
+from parsl.providers import LocalProvider
+
+
+class FailingProvider(LocalProvider):
+    def submit(*args, **kwargs):
+        raise RuntimeError("Deliberate failure of provider.submit")
+
+
+def local_config():
+    """Config to simulate failing blocks without connecting"""
+    return Config(
+        executors=[
+            HighThroughputExecutor(
+                label="HTEX",
+                heartbeat_period=1,
+                heartbeat_threshold=2,
+                poll_period=100,
+                max_workers_per_node=1,
+                provider=FailingProvider(
+                    init_blocks=0,
+                    max_blocks=2,
+                    min_blocks=0,
+                ),
+            )
+        ],
+        max_idletime=0.5,
+        strategy='htex_auto_scale',
+        strategy_period=0.1
+        # this strategy period needs to be a few times smaller than the
+        # status_polling_interval of FailingProvider, which is 5s at
+        # time of writing
+    )
+
+
+@parsl.python_app
+def double(x):
+    return x * 2
+
+
+@pytest.mark.local
+def test_disconnected_blocks():
+    """Test reporting of blocks that fail to connect from HTEX"""
+    dfk = parsl.dfk()
+    executor = dfk.executors["HTEX"]
+
+    connected_blocks = executor.connected_blocks()
+    assert not connected_blocks, "Expected 0 blocks"
+
+    future = double(5)
+    with pytest.raises(BadStateException):
+        future.result()
+
+    assert isinstance(future.exception(), BadStateException)
+
+    status_dict = executor.status()
+    assert len(status_dict) == 1, "Expected exactly 1 block"
+    for status in status_dict.values():
+        assert isinstance(status, JobStatus)
+        assert status.state == JobState.MISSING
+
+    connected_blocks = executor.connected_blocks()
+    assert connected_blocks == [], "Expected exactly 0 connected blocks"
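As a usage note, the new test relies on the `local_config` convention from Parsl's pytest suite (the suite instantiates the returned Config for tests marked `local`), so it is run through pytest rather than imported directly. Assuming a source checkout, an invocation along these lines should work:

    # command is an assumption based on Parsl's documented test workflow
    pytest parsl/tests/test_htex/test_disconnected_blocks_failing_provider.py --config local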