Skip to content

Remove crash from AIPM in torchX at end #1042

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 0 additions & 4 deletions torchx/schedulers/local_scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -1109,10 +1109,6 @@ def _cancel_existing(self, app_id: str) -> None:
local_app.state = AppState.CANCELLED

def close(self) -> None:
# terminate all apps
for app_id, app in self._apps.items():
log.debug(f"Terminating app: {app_id}")
app.kill()
# delete logdir if torchx created a log dir
if self._base_log_dir and self._created_tmp_log_dir:
shutil.rmtree(self._base_log_dir, ignore_errors=True)
Expand Down
27 changes: 0 additions & 27 deletions torchx/schedulers/test/local_scheduler_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -1112,33 +1112,6 @@ def test_get_cuda_devices_not_set(self, _: MagicMock) -> None:
self.assertFalse(ENV_CUDA_VISIBLE_DEVICES in role_params[2].env)
self.assertFalse(ENV_CUDA_VISIBLE_DEVICES in role_params[3].env)

def test_no_orphan_process_function(self) -> None:
self._test_orphan_workflow()

def _test_orphan_workflow(self) -> None:
mp_queue = mp.Queue()
child_nproc = 2

proc = mp.Process(
target=start_sleep_processes, args=(self.test_dir, mp_queue, child_nproc)
)
proc.start()
total_processes = child_nproc + 1
pids = []
for _ in range(total_processes):
pids.append(mp_queue.get(timeout=5))
parent_pid = pids[0]
child_pids = pids[1:]

os.kill(parent_pid, signal.SIGTERM)
# Wait to give time for signal handlers to finish work
time.sleep(5)
for child_pid in child_pids:
# Killing parent should kill all children, we expect that each call to
# os.kill would raise OSError
with self.assertRaises(OSError):
os.kill(child_pid, 0)


class JoinPATHTest(unittest.TestCase):
def test_join_PATH(self) -> None:
Expand Down
Loading