From b97900e2d71c44eb533285bc279e5696a5532a1b Mon Sep 17 00:00:00 2001 From: Oliver Freyermuth Date: Mon, 11 Jul 2022 21:42:44 +0200 Subject: [PATCH 01/10] BatchSpawnerBase: Add background_tasks, connect_to_job feature. This adds the possibility to start a "connect_to_job" background task on the hub on job start, which establishes connectivity to the actual single user server. An example for this can be "condor_ssh_to_job" for HTCondor batch systems. Additionally, the background tasks are monitored: - for successful startup. The background task is given some time to successfully establish connectivity. - in poll() during job runtime and if they fail, the job is terminated. --- batchspawner/batchspawner.py | 112 +++++++++++++++++++++++++++++++++++ 1 file changed, 112 insertions(+) diff --git a/batchspawner/batchspawner.py b/batchspawner/batchspawner.py index 318c7db7..9b8bdf2e 100644 --- a/batchspawner/batchspawner.py +++ b/batchspawner/batchspawner.py @@ -186,6 +186,12 @@ def _req_keepvars_default(self): "specification.", ).tag(config=True) + connect_to_job_cmd = Unicode('', + help="Command to connect to running batch job and forward the port " + "of the running notebook to the Hub. If empty, direct connectivity is assumed. " + "Uses self.job_id as {job_id} and the self.port as {port}." + ).tag(config=True) + # Raw output of job submission command unless overridden job_id = Unicode() @@ -215,6 +221,18 @@ def cmd_formatted_for_batch(self): """The command which is substituted inside of the batch script""" return " ".join([self.batchspawner_singleuser_cmd] + self.cmd + self.get_args()) + async def connect_to_job(self): + """This command ensures the port of the singleuser server is reachable from the + Batchspawner machine. By default, it does nothing, i.e. direct connectivity + is assumed. 
+ """ + subvars = self.get_req_subvars() + subvars['job_id'] = self.job_id + subvars['port'] = self.port + cmd = ' '.join((format_template(self.exec_prefix, **subvars), + format_template(self.connect_to_job_cmd, **subvars))) + await self.run_background_command(cmd) + async def run_command(self, cmd, input=None, env=None): proc = await asyncio.create_subprocess_shell( cmd, @@ -268,6 +286,46 @@ async def run_command(self, cmd, input=None, env=None): out = out.decode().strip() return out + # List of running background processes, e.g. used by connect_to_job. + background_processes = [] + + async def _async_wait_process(self, sleep_time): + """Asynchronously sleeping process for delayed checks""" + await asyncio.sleep(sleep_time) + + async def run_background_command(self, cmd, startup_check_delay=1, input=None, env=None): + """Runs the given background command, adds it to background_processes, + and checks if the command is still running after startup_check_delay.""" + background_process = self.run_command(cmd, input, env) + success_check_delay = self._async_wait_process(startup_check_delay) + + # Start up both the success check process and the actual process. + done, pending = await asyncio.wait([background_process, success_check_delay], return_when=asyncio.FIRST_COMPLETED) + + # If the success check process is the one which exited first, all is good, else fail. + if list(done)[0]._coro == success_check_delay: + background_task = list(pending)[0] + self.background_processes.append(background_task) + return background_task + else: + self.log.error("Background command exited early: %s" % cmd) + gather_pending = asyncio.gather(*pending) + gather_pending.cancel() + try: + self.log.debug("Cancelling pending success check task...") + await gather_pending + except asyncio.CancelledError: + self.log.debug("Cancel was successful.") + pass + + # Retrieve exception from "done" process. 
+ try: + gather_done = asyncio.gather(*done) + await gather_done + except: + self.log.debug("Retrieving exception from failed background task...") + raise RuntimeError('{} failed!'.format(cmd)) + async def _get_batch_script(self, **subvars): """Format batch script from vars""" # Could be overridden by subclasses, but mainly useful for testing @@ -299,6 +357,27 @@ async def submit_batch_script(self): self.job_id = "" return self.job_id + def background_tasks_ok(self): + # Check background processes. + if self.background_processes: + self.log.debug('Checking background processes...') + for background_process in self.background_processes: + if background_process.done(): + self.log.debug('Found a background process in state "done"...') + try: + background_exception = background_process.exception() + except asyncio.CancelledError: + self.log.error('Background process was cancelled!') + if background_exception: + self.log.error('Background process exited with an exception:') + self.log.error(background_exception) + self.log.error('At least one background process exited!') + return False + else: + self.log.debug('Found a not-yet-done background process...') + self.log.debug('All background processes still running.') + return True + # Override if your batch system needs something more elaborate to query the job status batch_query_cmd = Unicode( "", @@ -353,6 +432,29 @@ async def cancel_batch_job(self): ) ) self.log.info("Cancelling job " + self.job_id + ": " + cmd) + + if self.background_processes: + self.log.debug('Job being cancelled, cancelling background processes...') + for background_process in self.background_processes: + if not background_process.cancelled(): + try: + background_process.cancel() + except: + self.log.error('Encountered an exception cancelling background process...') + self.log.debug('Cancelled background process, waiting for it to finish...') + try: + await asyncio.wait([background_process]) + except asyncio.CancelledError: + 
self.log.error('Successfully cancelled background process.') + pass + except: + self.log.error('Background process exited with another exception!') + raise + else: + self.log.debug('Background process already cancelled...') + self.background_processes.clear() + self.log.debug('All background processes cancelled.') + await self.run_command(cmd) def load_state(self, state): @@ -400,6 +502,13 @@ async def poll(self): """Poll the process""" status = await self.query_job_status() if status in (JobStatus.PENDING, JobStatus.RUNNING, JobStatus.UNKNOWN): + if not self.background_tasks_ok(): + self.log.debug('Going to stop job, since background tasks have failed!') + await self.stop(now=True) + status = await self.query_job_status() + if status not in (JobStatus.PENDING, JobStatus.RUNNING, JobStatus.UNKNOWN): + self.clear_state() + return 1 return None else: self.clear_state() @@ -466,6 +575,9 @@ async def start(self): ) ) + if self.connect_to_job_cmd: + await self.connect_to_job() + return self.ip, self.port async def stop(self, now=False): From 78ad46ad5706381950cd26a2317a30e167de4c91 Mon Sep 17 00:00:00 2001 From: Oliver Freyermuth Date: Mon, 11 Jul 2022 21:44:58 +0200 Subject: [PATCH 02/10] batchspawner/CondorSpawner: Make use of connect_to_job feature. This leverages condor_ssh_to_job to forward the port of the single user server to the hub, removing the need for direct connectivity from the hub to the execute nodes. 
--- batchspawner/batchspawner.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/batchspawner/batchspawner.py b/batchspawner/batchspawner.py index 9b8bdf2e..79ea5f74 100644 --- a/batchspawner/batchspawner.py +++ b/batchspawner/batchspawner.py @@ -999,6 +999,7 @@ class CondorSpawner(UserEnvMixin, BatchSpawnerRegexStates): 'condor_q {job_id} -format "%s, " JobStatus -format "%s" RemoteHost -format "\n" True' ).tag(config=True) batch_cancel_cmd = Unicode("condor_rm {job_id}").tag(config=True) + connect_to_job_cmd = Unicode("condor_ssh_to_job -ssh \"ssh -L {port}:localhost:{port} -oExitOnForwardFailure=yes\" {job_id}").tag(config=True) # job status: 1 = pending, 2 = running state_pending_re = Unicode(r"^1,").tag(config=True) state_running_re = Unicode(r"^2,").tag(config=True) @@ -1021,6 +1022,9 @@ def cmd_formatted_for_batch(self): .replace("'", "''") ) + def state_gethost(self): + """This always returns localhost since connect_to_job forwards the singleuser server port from the spawned job""" + return "localhost" class LsfSpawner(BatchSpawnerBase): """A Spawner that uses IBM's Platform Load Sharing Facility (LSF) to launch notebooks.""" From d778f081d7e4d3f5e62f81fb82a69cc827dee6b2 Mon Sep 17 00:00:00 2001 From: Oliver Freyermuth Date: Mon, 11 Jul 2022 21:47:17 +0200 Subject: [PATCH 03/10] BatchSpawnerBase: Extend connect_to_job for port forwarding approaches. This allows to use {rport} inside the connect_to_job_cmd. If this is done, {port} is set to a random_port chosen locally on the Hub, and {rport} is set to the original remote port. This is useful e.g. for SSH port forwarding. If this is used, the {rport} of the notebook is saved into a new class variable in case it will be needed again. 
--- batchspawner/batchspawner.py | 22 +++++++++++++++++++--- 1 file changed, 19 insertions(+), 3 deletions(-) diff --git a/batchspawner/batchspawner.py b/batchspawner/batchspawner.py index 79ea5f74..2e23b8f0 100644 --- a/batchspawner/batchspawner.py +++ b/batchspawner/batchspawner.py @@ -32,6 +32,7 @@ from jupyterhub.spawner import Spawner from traitlets import Integer, Unicode, Float, Dict, default +from jupyterhub.utils import random_port from jupyterhub.spawner import set_user_setuid @@ -190,8 +191,16 @@ def _req_keepvars_default(self): help="Command to connect to running batch job and forward the port " "of the running notebook to the Hub. If empty, direct connectivity is assumed. " "Uses self.job_id as {job_id} and the self.port as {port}." + "If {rport} is used in this string, it is set to self.port, " + "and a new random self.port is chosen locally and used as {port}." + "This is useful e.g. for SSH port forwarding." ).tag(config=True) + rport = Integer(0, + help="Remote port of notebook, will be set if it differs from the self.port." + "This is set by connect_to_job() if needed." + ) + # Raw output of job submission command unless overridden job_id = Unicode() @@ -223,12 +232,19 @@ def cmd_formatted_for_batch(self): async def connect_to_job(self): """This command ensures the port of the singleuser server is reachable from the - Batchspawner machine. By default, it does nothing, i.e. direct connectivity - is assumed. + Batchspawner machine. Only called if connect_to_job_cmd is set. + If the template string connect_to_job_cmd contains {rport}, + a new random self.port is chosen locally (useful e.g. for SSH port forwarding). 
""" subvars = self.get_req_subvars() subvars['job_id'] = self.job_id - subvars['port'] = self.port + if '{rport}' in self.connect_to_job_cmd: + self.rport = self.port + self.port = random_port() + subvars['rport'] = self.rport + subvars['port'] = self.port + else: + subvars['port'] = self.port cmd = ' '.join((format_template(self.exec_prefix, **subvars), format_template(self.connect_to_job_cmd, **subvars))) await self.run_background_command(cmd) From 3e14a604e6d27969f913b9aae6994e884ecdb4ab Mon Sep 17 00:00:00 2001 From: Oliver Freyermuth Date: Mon, 11 Jul 2022 21:48:31 +0200 Subject: [PATCH 04/10] batchspawner/CondorSpawner: Make use of connect_to_job rport feature. This uses the functionality to use a random local port on the hub to forward the notebook port to. It ensures no port collisions appear between different, forwarded notebooks. --- batchspawner/batchspawner.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/batchspawner/batchspawner.py b/batchspawner/batchspawner.py index 2e23b8f0..cd6fd325 100644 --- a/batchspawner/batchspawner.py +++ b/batchspawner/batchspawner.py @@ -1015,7 +1015,7 @@ class CondorSpawner(UserEnvMixin, BatchSpawnerRegexStates): 'condor_q {job_id} -format "%s, " JobStatus -format "%s" RemoteHost -format "\n" True' ).tag(config=True) batch_cancel_cmd = Unicode("condor_rm {job_id}").tag(config=True) - connect_to_job_cmd = Unicode("condor_ssh_to_job -ssh \"ssh -L {port}:localhost:{port} -oExitOnForwardFailure=yes\" {job_id}").tag(config=True) + connect_to_job_cmd = Unicode("condor_ssh_to_job -ssh \"ssh -L {port}:localhost:{rport} -oExitOnForwardFailure=yes\" {job_id}").tag(config=True) # job status: 1 = pending, 2 = running state_pending_re = Unicode(r"^1,").tag(config=True) state_running_re = Unicode(r"^2,").tag(config=True) From f3735582ab30d6a58297ff75f6b30c3ecf07b3c0 Mon Sep 17 00:00:00 2001 From: Oliver Freyermuth Date: Mon, 11 Jul 2022 21:49:52 +0200 Subject: [PATCH 05/10] BatchSpawnerBase: Don't use internal API 
of asyncio.Tasks. Wrapping the coroutines into futures first allows to directly check the state of the futures. --- batchspawner/batchspawner.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/batchspawner/batchspawner.py b/batchspawner/batchspawner.py index cd6fd325..88d103e7 100644 --- a/batchspawner/batchspawner.py +++ b/batchspawner/batchspawner.py @@ -312,14 +312,14 @@ async def _async_wait_process(self, sleep_time): async def run_background_command(self, cmd, startup_check_delay=1, input=None, env=None): """Runs the given background command, adds it to background_processes, and checks if the command is still running after startup_check_delay.""" - background_process = self.run_command(cmd, input, env) - success_check_delay = self._async_wait_process(startup_check_delay) + background_process = asyncio.ensure_future(self.run_command(cmd, input, env)) + success_check_delay = asyncio.ensure_future(self._async_wait_process(startup_check_delay)) # Start up both the success check process and the actual process. done, pending = await asyncio.wait([background_process, success_check_delay], return_when=asyncio.FIRST_COMPLETED) # If the success check process is the one which exited first, all is good, else fail. - if list(done)[0]._coro == success_check_delay: + if success_check_delay in done: background_task = list(pending)[0] self.background_processes.append(background_task) return background_task From 02c4833d69b42e67be71c43b92dbe54b508bc582 Mon Sep 17 00:00:00 2001 From: Oliver Freyermuth Date: Mon, 11 Jul 2022 21:51:03 +0200 Subject: [PATCH 06/10] BatchSpawnerBase: Replace {host} with self.ip in connect_to_job_cmd. For implementations other than condor_ssh_to_job, it might be useful to use the hostname on which the notebook was spawned to proxy it. 
--- batchspawner/batchspawner.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/batchspawner/batchspawner.py b/batchspawner/batchspawner.py index 88d103e7..532cf1af 100644 --- a/batchspawner/batchspawner.py +++ b/batchspawner/batchspawner.py @@ -190,7 +190,7 @@ def _req_keepvars_default(self): connect_to_job_cmd = Unicode('', help="Command to connect to running batch job and forward the port " "of the running notebook to the Hub. If empty, direct connectivity is assumed. " - "Uses self.job_id as {job_id} and the self.port as {port}." + "Uses self.job_id as {job_id}, self.port as {port} and self.ip as {host}." "If {rport} is used in this string, it is set to self.port, " "and a new random self.port is chosen locally and used as {port}." "This is useful e.g. for SSH port forwarding." ).tag(config=True) @@ -237,6 +237,7 @@ async def connect_to_job(self): a new random self.port is chosen locally (useful e.g. for SSH port forwarding). """ subvars = self.get_req_subvars() + subvars['host'] = self.ip subvars['job_id'] = self.job_id if '{rport}' in self.connect_to_job_cmd: self.rport = self.port From 4ffa087f9a0a1a03a04176b1a076cb1aa7fe3069 Mon Sep 17 00:00:00 2001 From: Oliver Freyermuth Date: Mon, 11 Jul 2022 21:52:21 +0200 Subject: [PATCH 07/10] batchspawner/CondorSpawner: Allow to disable connect_to_job_cmd. If connect_to_job_cmd is explicitly set to an empty string, CondorSpawner will not override the hostname with localhost, allowing to revert to the old behaviour (assuming direct connectivity). 
--- batchspawner/batchspawner.py | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/batchspawner/batchspawner.py b/batchspawner/batchspawner.py index 532cf1af..56f44a4a 100644 --- a/batchspawner/batchspawner.py +++ b/batchspawner/batchspawner.py @@ -1040,8 +1040,11 @@ def cmd_formatted_for_batch(self): ) def state_gethost(self): - """This always returns localhost since connect_to_job forwards the singleuser server port from the spawned job""" - return "localhost" + """Returns localhost if connect_to_job is used, as this forwards the singleuser server port from the spawned job""" + if self.connect_to_job_cmd: + return "localhost" + else: + return super(CondorSpawner,self).state_gethost() class LsfSpawner(BatchSpawnerBase): """A Spawner that uses IBM's Platform Load Sharing Facility (LSF) to launch notebooks.""" From b56fc824e7ea0c2fc30cc956414a32035f697170 Mon Sep 17 00:00:00 2001 From: Oliver Freyermuth Date: Mon, 11 Jul 2022 21:57:01 +0200 Subject: [PATCH 08/10] BatchSpawnerBase: Move connect_to_job before DB commit and debug logging. With this, the first database commit will already contain the forwarded port if connect_to_job is used, and the log will show the correct port number. 
--- batchspawner/batchspawner.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/batchspawner/batchspawner.py b/batchspawner/batchspawner.py index 56f44a4a..fe05ad36 100644 --- a/batchspawner/batchspawner.py +++ b/batchspawner/batchspawner.py @@ -585,6 +585,9 @@ async def start(self): if hasattr(self, "mock_port"): self.port = self.mock_port + if self.connect_to_job_cmd: + await self.connect_to_job() + self.db.commit() self.log.info( "Notebook server job {0} started at {1}:{2}".format( @@ -592,9 +595,6 @@ async def start(self): ) ) - if self.connect_to_job_cmd: - await self.connect_to_job() - return self.ip, self.port async def stop(self, now=False): From ff1240fffe13e1366031e97500c58c8175999a6d Mon Sep 17 00:00:00 2001 From: Oliver Freyermuth Date: Mon, 11 Jul 2022 21:59:29 +0200 Subject: [PATCH 09/10] BatchSpawnerBase: Store ip, port in self.server after connect_to_job This is required since data may change after the connect_to_job function. --- batchspawner/batchspawner.py | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/batchspawner/batchspawner.py b/batchspawner/batchspawner.py index fe05ad36..93b1bf27 100644 --- a/batchspawner/batchspawner.py +++ b/batchspawner/batchspawner.py @@ -588,6 +588,11 @@ async def start(self): if self.connect_to_job_cmd: await self.connect_to_job() + # Port and ip can be changed in connect_to_job, push out to jupyterhub. + if self.server: + self.server.port = self.port + self.server.ip = self.ip + self.db.commit() self.log.info( "Notebook server job {0} started at {1}:{2}".format( From 3c3e3aa281cf1873c7dd9e30269286b5da18b001 Mon Sep 17 00:00:00 2001 From: Oliver Freyermuth Date: Mon, 11 Jul 2022 23:02:14 +0200 Subject: [PATCH 10/10] batchspawner: tests: Disable connect_to_job in testing. The background command can not cleanly be simulated to stay running. 
--- batchspawner/tests/test_spawners.py | 1 + 1 file changed, 1 insertion(+) diff --git a/batchspawner/tests/test_spawners.py b/batchspawner/tests/test_spawners.py index 2416680f..8cb970fd 100644 --- a/batchspawner/tests/test_spawners.py +++ b/batchspawner/tests/test_spawners.py @@ -560,6 +560,7 @@ def test_condor(db, io_loop): "req_nprocs": "5", "req_memory": "5678", "req_options": "some_option_asdf", + "connect_to_job_cmd": "", } batch_script_re_list = [ re.compile(r"exec batchspawner-singleuser singleuser_command"),