[not for merge] Status poller rearrangement patch stack #3295

Draft · wants to merge 12 commits into base: master
4 changes: 2 additions & 2 deletions parsl/executors/high_throughput/executor.py
@@ -787,8 +787,8 @@ def _get_launch_command(self, block_id: str) -> str:
launch_cmd = self.launch_cmd.format(block_id=block_id)
return launch_cmd

def status(self) -> Dict[str, JobStatus]:
job_status = super().status()
def _regenerate_combined_status(self) -> Dict[str, JobStatus]:
job_status = super()._regenerate_combined_status()
connected_blocks = self.connected_blocks()
for job_id in job_status:
job_info = job_status[job_id]
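This hunk ends mid-loop because the rest of the method is unchanged and therefore not shown. Purely for orientation, here is a minimal, self-contained sketch of what the renamed override appears to do with connected_blocks, inferred from the MISSING state asserted in the new test further down; the helper name mark_missing_blocks and the exact message handling are assumptions rather than lines from this patch:

from typing import Dict, Iterable

from parsl.jobs.states import TERMINAL_STATES, JobState, JobStatus


def mark_missing_blocks(job_status: Dict[str, JobStatus],
                        connected_blocks: Iterable[str]) -> Dict[str, JobStatus]:
    """Hypothetical helper mirroring what the rest of the loop above appears to
    do: a block whose job reached a terminal state without its worker pool ever
    registering with the interchange is reported as MISSING."""
    connected = set(connected_blocks)
    for block_id, job_info in job_status.items():
        if job_info.state in TERMINAL_STATES and block_id not in connected:
            job_info.state = JobState.MISSING
    return job_status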
64 changes: 37 additions & 27 deletions parsl/executors/status_handling.py
@@ -12,7 +12,7 @@
from parsl.executors.base import ParslExecutor
from parsl.executors.errors import BadStateException, ScalingFailed
from parsl.jobs.error_handlers import noop_error_handler, simple_error_handler
from parsl.jobs.states import JobState, JobStatus
from parsl.jobs.states import TERMINAL_STATES, JobState, JobStatus
from parsl.monitoring.message_type import MessageType
from parsl.providers.base import ExecutionProvider
from parsl.utils import AtomicIDCounter
@@ -76,7 +76,7 @@ def __init__(self, *,

# errors can happen during the submit call to the provider; this is used
# to keep track of such errors so that they can be handled in one place
# together with errors reported by status()
# together with errors reported by _regenerate_combined_status()
self._simulated_status: Dict[str, JobStatus] = {}

# this stores an approximation (sometimes delayed) of the latest status
@@ -106,7 +106,7 @@ def status_polling_interval(self):
In practice, at least given the current situation, the executor uses a single task provider
and this method is a delegate to the corresponding method in the provider.

:return: the number of seconds to wait between calls to status() or zero if no polling
:return: the number of seconds to wait between calls to _regenerate_combined_status() or zero if no polling
should be done
"""
if self._provider is None:
@@ -167,30 +167,46 @@ def tasks(self) -> Dict[object, Future]:
def provider(self):
return self._provider

def _filter_scale_in_ids(self, to_kill, killed):
def _filter_scale_in_ids(self, to_kill: Sequence[Any], killed: Sequence[bool]) -> Sequence[Any]:
""" Filter out job id's that were not killed
"""
assert len(to_kill) == len(killed)

if False in killed:
logger.warning(f"Some jobs were not killed successfully: to_kill list is {to_kill}, killed list is {killed}")

# Filters first iterable by bool values in second
return list(compress(to_kill, killed))
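For reference, itertools.compress keeps the items of the first iterable whose matching entry in the second is truthy, which is exactly the filtering this method relies on. A standalone illustration:

from itertools import compress

to_kill = ["job-1", "job-2", "job-3"]
killed = [True, False, True]

# Only entries paired with True survive, so the job that was not cancelled
# ("job-2") drops out of the returned list.
print(list(compress(to_kill, killed)))  # ['job-1', 'job-3']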

def _scale_out(self, blocks: int = 1) -> List[str]:
def scale_out_facade(self, n: int) -> List[str]:
"""Scales out the number of blocks by "blocks"
"""
if not self.provider:
raise ScalingFailed(self, "No execution provider available")
block_ids = []
logger.info(f"Scaling out by {blocks} blocks")
for _ in range(blocks):
monitoring_status_changes = {}
logger.info(f"Scaling out by {n} blocks")
for _ in range(n):
block_id = str(self._block_id_counter.get_id())
logger.info(f"Allocated block ID {block_id}")
try:
job_id = self._launch_block(block_id)

pending_status = JobStatus(JobState.PENDING)

self.blocks_to_job_id[block_id] = job_id
self.job_ids_to_block[job_id] = block_id
self._status[block_id] = pending_status

monitoring_status_changes[block_id] = pending_status
block_ids.append(block_id)

except Exception as ex:
self._simulated_status[block_id] = JobStatus(JobState.FAILED, "Failed to start block {}: {}".format(block_id, ex))
failed_status = JobStatus(JobState.FAILED, "Failed to start block {}: {}".format(block_id, ex))
self._simulated_status[block_id] = failed_status
self._status[block_id] = failed_status

self.send_monitoring_info(monitoring_status_changes)
return block_ids
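As an aside, a small snippet (not part of the patch) showing the bookkeeping in the failure branch above: the same FAILED JobStatus object is recorded both as a simulated status, which _regenerate_combined_status() later merges in, and directly in self._status, so the pollers see the failure without waiting for the next provider poll:

from parsl.jobs.states import JobState, JobStatus

# Hypothetical block id and error message, mirroring the except branch above.
block_id = "0"
failed_status = JobStatus(JobState.FAILED, "Failed to start block 0: boom")

simulated_status = {block_id: failed_status}   # merged later by _regenerate_combined_status()
current_status = {block_id: failed_status}     # read directly by the pollers

print(current_status[block_id].state)  # JobState.FAILED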

def scale_in(self, blocks: int) -> List[str]:
@@ -205,16 +221,20 @@ def scale_in(self, blocks: int) -> List[str]:

:return: A list of block ids corresponding to the blocks that were removed.
"""
# Obtain list of blocks to kill
to_kill = list(self.blocks_to_job_id.keys())[:blocks]
kill_ids = [self.blocks_to_job_id[block] for block in to_kill]

active_blocks = [block_id for block_id, status in self._status.items()
if status.state not in TERMINAL_STATES]

block_ids_to_kill = active_blocks[:blocks]

job_ids_to_kill = [self.blocks_to_job_id[block] for block in block_ids_to_kill]

# Cancel the blocks provisioned
if self.provider:
logger.info(f"Scaling in jobs: {kill_ids}")
r = self.provider.cancel(kill_ids)
job_ids = self._filter_scale_in_ids(kill_ids, r)
block_ids_killed = [self.job_ids_to_block[jid] for jid in job_ids]
logger.info(f"Scaling in jobs: {job_ids_to_kill}")
r = self.provider.cancel(job_ids_to_kill)
job_ids = self._filter_scale_in_ids(job_ids_to_kill, r)
block_ids_killed = [self.job_ids_to_block[job_id] for job_id in job_ids]
return block_ids_killed
else:
logger.error("No execution provider available to scale in")
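For reference, the new selection above only considers blocks whose last known state is non-terminal. A standalone sketch with made-up statuses (illustrative only, not part of the patch):

from parsl.jobs.states import TERMINAL_STATES, JobState, JobStatus

status = {
    "0": JobStatus(JobState.PENDING),
    "1": JobStatus(JobState.FAILED, "failed to start"),
    "2": JobStatus(JobState.RUNNING),
}

# FAILED is a terminal state, so block "1" is never picked for scale in.
active_blocks = [block_id for block_id, s in status.items()
                 if s.state not in TERMINAL_STATES]
print(active_blocks)  # ['0', '2']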
@@ -276,7 +296,7 @@ def poll_facade(self) -> None:
now = time.time()
if now >= self._last_poll_time + self.status_polling_interval:
previous_status = self._status
self._status = self.status()
self._status = self._regenerate_combined_status()
self._last_poll_time = now
delta_status = {}
for block_id in self._status:
@@ -287,7 +307,7 @@
if delta_status:
self.send_monitoring_info(delta_status)
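The body of the loop that computes delta_status falls between the two hunks above and is unchanged, so it is not shown; conceptually it forwards only the blocks whose status changed since the previous poll. A rough standalone sketch under that assumption (the real code works on JobStatus objects and may differ in detail):

from parsl.jobs.states import JobState, JobStatus

previous = {"0": JobStatus(JobState.PENDING), "1": JobStatus(JobState.RUNNING)}
current = {"0": JobStatus(JobState.RUNNING),
           "1": JobStatus(JobState.RUNNING),
           "2": JobStatus(JobState.PENDING)}

# Keep new blocks and blocks whose state changed since the last poll.
delta = {block_id: status for block_id, status in current.items()
         if block_id not in previous or previous[block_id].state != status.state}

print(sorted(delta))  # ['0', '2']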

def status(self) -> Dict[str, JobStatus]:
def _regenerate_combined_status(self) -> Dict[str, JobStatus]:
"""Return the status of all jobs/blocks currently known to this executor.

:return: a dictionary mapping block ids (in string) to job status
@@ -327,13 +347,3 @@ def scale_in_facade(self, n: int, max_idletime: Optional[float] = None) -> List[
del self._status[block_id]
self.send_monitoring_info(new_status)
return block_ids

def scale_out_facade(self, n: int) -> List[str]:
block_ids = self._scale_out(n)
if block_ids is not None:
new_status = {}
for block_id in block_ids:
new_status[block_id] = JobStatus(JobState.PENDING)
self.send_monitoring_info(new_status)
self._status.update(new_status)
return block_ids
71 changes: 71 additions & 0 deletions parsl/tests/test_htex/test_disconnected_blocks_failing_provider.py
@@ -0,0 +1,71 @@
import logging

import pytest

import parsl
from parsl import Config
from parsl.executors import HighThroughputExecutor
from parsl.executors.errors import BadStateException
from parsl.jobs.states import JobState, JobStatus
from parsl.providers import LocalProvider


class FailingProvider(LocalProvider):
    def submit(*args, **kwargs):
        raise RuntimeError("Deliberate failure of provider.submit")


def local_config():
    """Config to simulate failing blocks without connecting"""
    return Config(
        executors=[
            HighThroughputExecutor(
                label="HTEX",
                heartbeat_period=1,
                heartbeat_threshold=2,
                poll_period=100,
                max_workers_per_node=1,
                provider=FailingProvider(
                    init_blocks=0,
                    max_blocks=2,
                    min_blocks=0,
                ),
            )
        ],
        max_idletime=0.5,
        strategy='htex_auto_scale',
        strategy_period=0.1
        # this strategy period needs to be a few times smaller than the
        # status_polling_interval of FailingProvider, which is 5s at
        # time of writing
    )


@parsl.python_app
def double(x):
    return x * 2


@pytest.mark.local
def test_disconnected_blocks():
    """Test reporting of blocks that fail to connect from HTEX"""
    dfk = parsl.dfk()
    executor = dfk.executors["HTEX"]

    connected_blocks = executor.connected_blocks()
    assert not connected_blocks, "Expected 0 blocks"

    future = double(5)
    with pytest.raises(BadStateException):
        future.result()

    assert isinstance(future.exception(), BadStateException)

    status_dict = executor.status()
    assert len(status_dict) == 1, "Expected exactly 1 block"
    for status in status_dict.values():
        assert isinstance(status, JobStatus)
        assert status.state == JobState.MISSING

    connected_blocks = executor.connected_blocks()
    assert connected_blocks == [], "Expected exactly 0 connected blocks"