From 94c5c17ff1a14704d69b208289809e6851bc518a Mon Sep 17 00:00:00 2001 From: sulantha Date: Thu, 7 Jan 2021 13:52:07 -0800 Subject: [PATCH 1/3] Issue #3288. Implementing input delayed retry for all inputs of the node. Simplified implementation for all inputs (not just File type). Need to be improved for File type later on. --- nipype/pipeline/engine/nodes.py | 24 +++++++++++++++++++----- nipype/utils/config.py | 3 +++ 2 files changed, 22 insertions(+), 5 deletions(-) diff --git a/nipype/pipeline/engine/nodes.py b/nipype/pipeline/engine/nodes.py index aeff5f12da..2fbac875ab 100644 --- a/nipype/pipeline/engine/nodes.py +++ b/nipype/pipeline/engine/nodes.py @@ -14,6 +14,7 @@ from copy import deepcopy from glob import glob from logging import INFO +import time from tempfile import mkdtemp @@ -605,11 +606,24 @@ def _get_inputs(self): output_value = outputs.dictcopy()[output_name] logger.debug("output: %s", output_name) - try: - self.set_input(key, deepcopy(output_value)) - except traits.TraitError as e: + traits_err = None + # input_tries = 1 # Default no reties + # input_retry_delay = 5 + # input_retry_exponential_backoff_factor = 1 + input_tries = self.config["execution"]["input_tries"] + input_retry_delay = self.config["execution"]["input_retry_delay"] + input_retry_exp_backoff_factor = self.config["execution"]["input_retry_exp_backoff_factor"] + for try_n in range(input_tries): + try: + self.set_input(key, deepcopy(output_value)) + break + except traits.TraitError as e: + traits_err = e + if input_tries != 1: + time.sleep(input_retry_delay*try_n*input_retry_exp_backoff_factor) + if traits_err is not None: msg = ( - e.args[0], + traits_err.args[0], "", "Error setting node input:", "Node: %s" % self.name, @@ -617,7 +631,7 @@ def _get_inputs(self): "results_file: %s" % results_fname, "value: %s" % str(output_value), ) - e.args = ("\n".join(msg),) + traits_err.args = ("\n".join(msg),) raise # Successfully set inputs diff --git a/nipype/utils/config.py b/nipype/utils/config.py index e7020eb30d..1508541bcc 100644 --- a/nipype/utils/config.py +++ b/nipype/utils/config.py @@ -62,6 +62,9 @@ poll_sleep_duration = 2 xvfb_max_wait = 10 check_version = true +input_tries = 1 +input_retry_delay = 5 +input_retry_exp_backoff_factor = 1 [monitoring] enabled = false From d6dd60c6fdb708000842f94a4980f29b69fa776e Mon Sep 17 00:00:00 2001 From: sulantha Date: Thu, 7 Jan 2021 14:45:01 -0800 Subject: [PATCH 2/3] Issue #3288. Implementing input delayed retry for all inputs of the node. Simplified implementation for all inputs (not just File type). Need to be improved for File type later on. --- nipype/pipeline/engine/nodes.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/nipype/pipeline/engine/nodes.py b/nipype/pipeline/engine/nodes.py index 2fbac875ab..32322b5f69 100644 --- a/nipype/pipeline/engine/nodes.py +++ b/nipype/pipeline/engine/nodes.py @@ -610,9 +610,9 @@ def _get_inputs(self): # input_tries = 1 # Default no reties # input_retry_delay = 5 # input_retry_exponential_backoff_factor = 1 - input_tries = self.config["execution"]["input_tries"] - input_retry_delay = self.config["execution"]["input_retry_delay"] - input_retry_exp_backoff_factor = self.config["execution"]["input_retry_exp_backoff_factor"] + input_tries = int(self.config["execution"]["input_tries"]) + input_retry_delay = int(self.config["execution"]["input_retry_delay"]) + input_retry_exp_backoff_factor = int(self.config["execution"]["input_retry_exp_backoff_factor"]) for try_n in range(input_tries): try: self.set_input(key, deepcopy(output_value)) From 87ab436765eddb8a6e9fc27c55f3c963128f9cfd Mon Sep 17 00:00:00 2001 From: sulantha Date: Thu, 7 Jan 2021 18:09:33 -0800 Subject: [PATCH 3/3] Issue #3288. Added logger entry when retry is triggered --- nipype/pipeline/engine/nodes.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/nipype/pipeline/engine/nodes.py b/nipype/pipeline/engine/nodes.py index 32322b5f69..406db3f5ab 100644 --- a/nipype/pipeline/engine/nodes.py +++ b/nipype/pipeline/engine/nodes.py @@ -620,7 +620,11 @@ def _get_inputs(self): except traits.TraitError as e: traits_err = e if input_tries != 1: - time.sleep(input_retry_delay*try_n*input_retry_exp_backoff_factor) + sleep_time = input_retry_delay*try_n*input_retry_exp_backoff_factor + logger.warning("Input set failed for : {0} -> {1}. Retrying in {2} secs. ".format(key, + results_fname, + sleep_time)) + time.sleep(sleep_time) if traits_err is not None: msg = ( traits_err.args[0],