Commit 8e58878

reraise exception back to user
# Conflicts:
#   submitit/core/submission.py
1 parent 4cf1462 · commit 8e58878

File tree

7 files changed (+530, -45 lines)


submitit/core/core.py

Lines changed: 24 additions & 17 deletions
@@ -18,6 +18,8 @@
 
 from . import logger, utils
 
+log = logger.get_logger()
+
 # R as in "Result", so yes it's covariant.
 # pylint: disable=typevar-name-incorrect-variance
 R = tp.TypeVar("R", covariant=True)
@@ -128,12 +130,10 @@ def update(self) -> None:
             return
         self._num_calls += 1
         try:
-            logger.get_logger().debug(f"Call #{self.num_calls} - Command {' '.join(command)}")
+            log.debug(f"Call #{self.num_calls} - Command {' '.join(command)}")
             self._output = subprocess.check_output(command, shell=False)
         except Exception as e:
-            logger.get_logger().warning(
-                f"Call #{self.num_calls} - Bypassing sacct error {e}, status may be inaccurate."
-            )
+            log.warning(f"Call #{self.num_calls} - Bypassing sacct error {e}, status may be inaccurate.")
         else:
             self._info_dict.update(self.read_info(self._output))
         self._last_status_check = _time.time()
@@ -323,18 +323,27 @@ def exception(self) -> tp.Optional[tp.Union[utils.UncompletedJobError, utils.Fai
             return exceptions[0]
 
         try:
-            outcome, trace = self._get_outcome_and_result()
+            outcome, original_err = self._get_outcome_and_result()
+            if outcome == "success":
+                return None
         except utils.UncompletedJobError as e:
             return e
-        if outcome == "error":
+        if isinstance(original_err, str):
+            # Normally original_err is an exception, unless we failed to pickle it.
+            trace = original_err
             return utils.FailedJobError(
                 f"Job (task={self.task_id}) failed during processing with trace:\n"
                 f"----------------------\n{trace}\n"
                 "----------------------\n"
                 f"You can check full logs with 'job.stderr({self.task_id})' and 'job.stdout({self.task_id})'"
                 f"or at paths:\n - {self.paths.stderr}\n - {self.paths.stdout}"
             )
-        return None
+        log.error(
+            f"Job (task={self.task_id}) failed \n."
+            f"You can check full logs with 'job.stderr({self.task_id})' and 'job.stdout({self.task_id})'"
+            f"or at paths:\n - {self.paths.stderr}\n - {self.paths.stdout}"
+        )
+        return original_err
 
     def _get_outcome_and_result(self) -> tp.Tuple[str, tp.Any]:
         """Getter for the output of the submitted function.
@@ -371,17 +380,17 @@ def _get_outcome_and_result(self) -> tp.Tuple[str, tp.Any]:
                 f"Job {self.job_id} (task: {self.task_id}) with path {self.paths.result_pickle}",
                 f"has not produced any output (state: {self.state})",
             ]
-            log = self.stderr()
-            if log:
-                message.extend(["Error stream produced:", "-" * 40, log])
+            stderr = self.stderr()
+            if stderr:
+                message.extend(["Error stream produced:", "-" * 40, stderr])
             elif self.paths.stdout.exists():
-                log = subprocess.check_output(["tail", "-40", str(self.paths.stdout)], encoding="utf-8")
+                stderr = subprocess.check_output(["tail", "-40", str(self.paths.stdout)], encoding="utf-8")
                 message.extend(
-                    [f"No error stream produced. Look at stdout: {self.paths.stdout}", "-" * 40, log]
+                    [f"No error stream produced. Look at stdout: {self.paths.stdout}", "-" * 40, stderr]
                 )
             else:
                 message.append(f"No output/error stream produced ! Check: {self.paths.stdout}")
-            raise utils.UncompletedJobError("\n".join(message))
+            raise utils.JobResultsNotFoundError("\n".join(message))
         try:
             output: tp.Tuple[str, tp.Any] = utils.pickle_load(self.paths.result_pickle)
         except EOFError:
@@ -504,7 +513,7 @@ def __repr__(self) -> str:
         try:
             state = self.state
         except Exception as e:
-            logger.get_logger().warning(f"Bypassing state error:\n{e}")
+            log.warning(f"Bypassing state error:\n{e}")
         return f'{self.__class__.__name__}<job_id={self.job_id}, task_id={self.task_id}, state="{state}">'
 
     def __del__(self) -> None:
@@ -702,9 +711,7 @@ def batch(self, allow_implicit_submissions: bool = False) -> tp.Iterator[None]:
         try:
             yield None
         except Exception as e:
-            logger.get_logger().error(
-                'Caught error within "with executor.batch()" context, submissions are dropped.\n '
-            )
+            log.error('Caught error within "with executor.batch()" context, submissions are dropped.\n ')
             raise e
         else:
             self._submit_delayed_batch()

submitit/core/submission.py

Lines changed: 45 additions & 13 deletions
@@ -17,9 +17,11 @@
 except ImportError:
     pass
 
-from . import job_environment, utils
+from . import job_environment, tblib, utils
 from .logger import get_logger
 
+logger = get_logger()
+
 
 def process_job(folder: Union[Path, str]) -> None:
     """Loads a pickled job, runs it and pickles the output
@@ -36,7 +38,6 @@ def process_job(folder: Union[Path, str]) -> None:
     os.environ["SUBMITIT_FOLDER"] = str(folder)
     env = job_environment.JobEnvironment()
     paths = env.paths
-    logger = get_logger()
     logger.info(f"Starting with {env}")
     logger.info(f"Loading pickle: {paths.submitted_pickle}")
     wait_time = 60
@@ -53,20 +54,51 @@ def process_job(folder: Union[Path, str]) -> None:
         env = job_environment.JobEnvironment()
         env._handle_signals(paths, delayed)
         result = delayed.result()
+        logger.info("Job computed its result")
+        # if it blocks here, you have a race condition that must be solved!
+        del delayed
+    except Exception as error:
+        logger.error("Submitted job triggered an exception")
+        with utils.temporary_save_path(paths.result_pickle) as tmp_path:
+            save_error(error, tmp_path)
+        raise
+    except BaseException:
+        logger.exception("Submitted job encoutered a system error. Will result in an UncompletedJobError")
+        raise
+
+    with utils.temporary_save_path(paths.result_pickle) as tmp_path:
+        save_result(result, tmp_path)
+    # if it blocks here, you have a race condition that must be solved!
+    del result
+    logger.info("Exitting after successful completion")
+
+
+def save_result(result, tmp_path: Path):
+    try:
+        utils.cloudpickle_dump(("success", result), tmp_path)
         logger.info("Job completed successfully")
-        del delayed  # if it blocks here, you have a race condition that must be solved!
-        with utils.temporary_save_path(paths.result_pickle) as tmppath:  # save somewhere else, and move
-            utils.cloudpickle_dump(("success", result), tmppath)
-        del result
-        logger.info("Exitting after successful completion")
-    except Exception as error:  # TODO: check pickle methods for capturing traceback; pickling and raising
+    except Exception as pickle_error:
+        logger.error(f"Could not pickle job result because of {pickle_error}")
+        save_error(pickle_error, tmp_path)
+
+
+def save_error(error: Exception, tmp_path: Path) -> None:
+    """Pickle the full exception with its trace using tblib."""
+    try:
+        # tblib needs to be installed after we have created the exception class
+        # they recommend doing it just before pickling the exception.
+        # This seems to be a limitation of copyreg.
+        tblib.install(error)
+        utils.cloudpickle_dump(("error", error), tmp_path)
+    except Exception as pickle_error:
+        logger.error(f"Could not pickle exception:\n{error}\n\nbecause of {pickle_error}")
+        # Fallbacks to only pickling the trace
         try:
-            with utils.temporary_save_path(paths.result_pickle) as tmppath:
-                utils.cloudpickle_dump(("error", traceback.format_exc()), tmppath)
+            utils.cloudpickle_dump(("error", traceback.format_exc()), tmp_path)
         except Exception as dumperror:
-            logger.error(f"Could not dump error:\n{error}\n\nbecause of {dumperror}")
-        logger.error("Submitted job triggered an exception")
-        raise error
+            logger.error(f"Could not dump exception:\n{error}\n\nbecause of {dumperror}")
+            logger.error("This will trigger a JobResultsNotFoundError")
+            raise
 
 
 def submitit_main() -> None:
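
The new save_error path relies on tblib so the pickled exception keeps its traceback and can be re-raised on the caller's side. The vendored submitit tblib helper is not part of this diff, so the sketch below only approximates the idea with the standalone tblib package (assuming a version whose pickling_support.install accepts an exception instance): register pickling support on the live exception just before dumping it, then unpickle and re-raise with the original traceback attached.

# Approximate illustration with the standalone `tblib` package (pip install tblib);
# submitit ships its own tblib helper, which this sketch does not reproduce exactly.
import pickle
import traceback

from tblib import pickling_support

def compute() -> None:
    raise RuntimeError("boom")

# "Worker side": catch the error, register pickle support on it, dump it.
try:
    compute()
except Exception as error:
    pickling_support.install(error)  # done just before pickling, as in save_error
    payload = pickle.dumps(("error", error))

# "User side": unpickle and re-raise; the traceback still points into compute().
outcome, original_err = pickle.loads(payload)
try:
    raise original_err
except RuntimeError:
    traceback.print_exc()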
