
Commit ec8dd62

Refactor naive scale in behaviour for Work Queue and Task Vine (#3526)
The intended behaviour of this scale-in code, which is only used for scaling in all blocks (for example at the end of a workflow), makes sense as a default for all BlockProviderExecutors. This PR makes that refactor. This code is buggy, both before and after this change - see issue #3471. This PR does not attempt to fix that, but moves the code into a better place for bugfixing; a subsequent PR will fix it.
1 parent a2af30c · commit ec8dd62

3 files changed (+19, -38 lines)


parsl/executors/status_handling.py

Lines changed: 19 additions & 2 deletions
@@ -193,15 +193,32 @@ def _scale_out(self, blocks: int = 1) -> List[str]:
                 self._simulated_status[block_id] = JobStatus(JobState.FAILED, "Failed to start block {}: {}".format(block_id, ex))
         return block_ids

-    @abstractmethod
     def scale_in(self, blocks: int) -> List[str]:
         """Scale in method.

         Cause the executor to reduce the number of blocks by count.

+        The default implementation will kill blocks without regard to their
+        status or whether they are executing tasks. Executors with more
+        nuanced scaling strategies might overload this method to work with
+        that strategy - see the HighThroughputExecutor for an example of that.
+
         :return: A list of block ids corresponding to the blocks that were removed.
         """
-        pass
+        # Obtain list of blocks to kill
+        to_kill = list(self.blocks_to_job_id.keys())[:blocks]
+        kill_ids = [self.blocks_to_job_id[block] for block in to_kill]
+
+        # Cancel the blocks provisioned
+        if self.provider:
+            logger.info(f"Scaling in jobs: {kill_ids}")
+            r = self.provider.cancel(kill_ids)
+            job_ids = self._filter_scale_in_ids(kill_ids, r)
+            block_ids_killed = [self.job_ids_to_block[jid] for jid in job_ids]
+            return block_ids_killed
+        else:
+            logger.error("No execution provider available to scale in")
+            return []

     def _launch_block(self, block_id: str) -> Any:
         launch_cmd = self._get_launch_command(block_id)
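The new default implementation above selects blocks purely by their position in blocks_to_job_id, which is what makes it naive. Below is a minimal, self-contained sketch of that selection and cancellation flow, using hypothetical stand-in classes rather than Parsl's real BlockProviderExecutor or provider API; the behaviour of _filter_scale_in_ids is assumed here to keep only the job ids whose cancellation reported success.

```python
import logging
from typing import Dict, List

logger = logging.getLogger(__name__)


class FakeProvider:
    """Hypothetical stand-in for a Parsl execution provider."""

    def cancel(self, job_ids: List[str]) -> List[bool]:
        # Pretend every cancellation succeeds.
        return [True for _ in job_ids]


class FakeExecutor:
    """Hypothetical stand-in for a BlockProviderExecutor subclass."""

    def __init__(self) -> None:
        self.provider = FakeProvider()
        # block id -> provider job id, as maintained when blocks are scaled out
        self.blocks_to_job_id: Dict[str, str] = {"0": "job-a", "1": "job-b", "2": "job-c"}
        self.job_ids_to_block = {v: k for k, v in self.blocks_to_job_id.items()}

    def _filter_scale_in_ids(self, kill_ids: List[str], results: List[bool]) -> List[str]:
        # Assumption: keep only the job ids whose cancel call reported success.
        return [jid for jid, ok in zip(kill_ids, results) if ok]

    def scale_in(self, blocks: int) -> List[str]:
        # Same selection as the default above: take the first `blocks` known
        # blocks, without regard to their status or whether they hold tasks.
        to_kill = list(self.blocks_to_job_id.keys())[:blocks]
        kill_ids = [self.blocks_to_job_id[block] for block in to_kill]

        if self.provider:
            logger.info(f"Scaling in jobs: {kill_ids}")
            r = self.provider.cancel(kill_ids)
            job_ids = self._filter_scale_in_ids(kill_ids, r)
            return [self.job_ids_to_block[jid] for jid in job_ids]
        else:
            logger.error("No execution provider available to scale in")
            return []


if __name__ == "__main__":
    print(FakeExecutor().scale_in(2))  # prints ['0', '1']
```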

parsl/executors/taskvine/executor.py

Lines changed: 0 additions & 18 deletions
@@ -573,24 +573,6 @@ def outstanding(self) -> int:
     def workers_per_node(self) -> Union[int, float]:
         return 1

-    def scale_in(self, count: int) -> List[str]:
-        """Scale in method. Cancel a given number of blocks
-        """
-        # Obtain list of blocks to kill
-        to_kill = list(self.blocks_to_job_id.keys())[:count]
-        kill_ids = [self.blocks_to_job_id[block] for block in to_kill]
-
-        # Cancel the blocks provisioned
-        if self.provider:
-            logger.info(f"Scaling in jobs: {kill_ids}")
-            r = self.provider.cancel(kill_ids)
-            job_ids = self._filter_scale_in_ids(kill_ids, r)
-            block_ids_killed = [self.job_ids_to_block[jid] for jid in job_ids]
-            return block_ids_killed
-        else:
-            logger.error("No execution provider available to scale")
-            return []
-
     def shutdown(self, *args, **kwargs):
         """Shutdown the executor. Sets flag to cancel the submit process and
         collector thread, which shuts down the TaskVine system submission.

parsl/executors/workqueue/executor.py

Lines changed: 0 additions & 18 deletions
@@ -689,24 +689,6 @@ def outstanding(self) -> int:
     def workers_per_node(self) -> Union[int, float]:
         return self.scaling_cores_per_worker

-    def scale_in(self, count: int) -> List[str]:
-        """Scale in method.
-        """
-        # Obtain list of blocks to kill
-        to_kill = list(self.blocks_to_job_id.keys())[:count]
-        kill_ids = [self.blocks_to_job_id[block] for block in to_kill]
-
-        # Cancel the blocks provisioned
-        if self.provider:
-            logger.info(f"Scaling in jobs: {kill_ids}")
-            r = self.provider.cancel(kill_ids)
-            job_ids = self._filter_scale_in_ids(kill_ids, r)
-            block_ids_killed = [self.job_ids_to_block[jid] for jid in job_ids]
-            return block_ids_killed
-        else:
-            logger.error("No execution provider available to scale in")
-            return []
-
     def shutdown(self, *args, **kwargs):
         """Shutdown the executor. Sets flag to cancel the submit process and
         collector thread, which shuts down the Work Queue system submission.
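Both deletions above remove copies of the same naive scale_in, which the TaskVine and Work Queue executors now inherit from BlockProviderExecutor. As a rough illustration of the resulting shape (hypothetical toy classes, not Parsl's API), a queue-backed executor keeps the inherited default while a more nuanced executor, such as the HighThroughputExecutor mentioned in the new docstring, overrides it with its own selection policy:

```python
from typing import List


class BaseExecutor:
    """Toy analogue of a base executor carrying the naive default scale_in."""

    def scale_in(self, blocks: int) -> List[str]:
        # Naive default: pick the first `blocks` block ids unconditionally.
        return [f"block-{i}" for i in range(blocks)]


class QueueBackedExecutor(BaseExecutor):
    """Toy analogue of the Work Queue / TaskVine executors after this commit:
    no scale_in of their own, the inherited default is used."""


class NuancedExecutor(BaseExecutor):
    """Toy analogue of an executor with a smarter scaling strategy."""

    def scale_in(self, blocks: int) -> List[str]:
        # A real executor might prefer idle blocks; here the default ordering
        # is simply reversed to show that the policy is overridable.
        return super().scale_in(blocks)[::-1]


print(QueueBackedExecutor().scale_in(2))  # ['block-0', 'block-1']
print(NuancedExecutor().scale_in(2))      # ['block-1', 'block-0']
```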
