From f419720dfba572954eb17fd6fc2af2ca9bf94040 Mon Sep 17 00:00:00 2001 From: Gordon Pendleton Date: Sat, 9 Nov 2019 22:31:35 -0500 Subject: [PATCH 1/3] fix issue #57 --- README.md | 5 +++++ jobtastic/task.py | 12 +++++++----- 2 files changed, 12 insertions(+), 5 deletions(-) diff --git a/README.md b/README.md index d57320d..4617fce 100644 --- a/README.md +++ b/README.md @@ -228,6 +228,11 @@ These let you tweak the default behavior. Most often, you'll just be setting the `cache_duration` to enable result caching. +#### always_start_new_herd + +Skip herd avoidance check if True, but still trigger avoidance for other tasks. +Defaults to False. + #### cache_duration If you want your results cached, diff --git a/jobtastic/task.py b/jobtastic/task.py index edcb814..644edc3 100644 --- a/jobtastic/task.py +++ b/jobtastic/task.py @@ -82,6 +82,7 @@ class JobtasticTask(Task): front-end code so that users know what to expect. """ abstract = True + always_start_new_herd = False #: The shared cache used for locking and thundering herd protection _cache = None @@ -218,11 +219,12 @@ def apply_async(self, args, kwargs, **options): 'Found existing cached and completed task: %s', task_id) return self.AsyncResult(task_id) - # Check for an in-progress equivalent task to avoid duplicating work - task_id = self.cache.get('herd:%s' % cache_key) - if task_id: - logging.info('Found existing in-progress task: %s', task_id) - return self.AsyncResult(task_id) + if not self.always_start_new_herd: + # Check for an in-progress equivalent task to avoid duplicating work + task_id = self.cache.get('herd:%s' % cache_key) + if task_id: + logging.info('Found existing in-progress task: %s', task_id) + return self.AsyncResult(task_id) # It's not cached and it's not already running. Use an atomic lock to # start the task, ensuring there isn't a race condition that could From 6c2134f4be5aa891d3fe3ab8142665d99eb39149 Mon Sep 17 00:00:00 2001 From: Gordon Pendleton Date: Sat, 9 Nov 2019 23:03:59 -0500 Subject: [PATCH 2/3] revoke in-progress task when starting new herd --- jobtastic/task.py | 18 ++++++++++++------ 1 file changed, 12 insertions(+), 6 deletions(-) diff --git a/jobtastic/task.py b/jobtastic/task.py index 644edc3..1684d85 100644 --- a/jobtastic/task.py +++ b/jobtastic/task.py @@ -219,12 +219,18 @@ def apply_async(self, args, kwargs, **options): 'Found existing cached and completed task: %s', task_id) return self.AsyncResult(task_id) - if not self.always_start_new_herd: - # Check for an in-progress equivalent task to avoid duplicating work - task_id = self.cache.get('herd:%s' % cache_key) - if task_id: - logging.info('Found existing in-progress task: %s', task_id) - return self.AsyncResult(task_id) + # Check for an in-progress equivalent task to avoid duplicating work + task_id = self.cache.get('herd:%s' % cache_key) + if task_id: + logging.info('Found existing in-progress task: %s', task_id) + result = self.AsyncResult(task_id) + if not self.always_start_new_herd: + return result + if not result.ready(): + logging.info('Revoking in-progress task and starting new herd: %s', task_id) + result.revoke() + else: + logging.info('Ignoring in-progress task and starting new herd: %s', task_id) # It's not cached and it's not already running. Use an atomic lock to # start the task, ensuring there isn't a race condition that could From 3d70af092b36416dd271d3baee7d83b10d3b250d Mon Sep 17 00:00:00 2001 From: Gordon Pendleton Date: Sat, 9 Nov 2019 23:13:02 -0500 Subject: [PATCH 3/3] flake8 - fix line length --- jobtastic/task.py | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/jobtastic/task.py b/jobtastic/task.py index 1684d85..6b91df3 100644 --- a/jobtastic/task.py +++ b/jobtastic/task.py @@ -227,10 +227,16 @@ def apply_async(self, args, kwargs, **options): if not self.always_start_new_herd: return result if not result.ready(): - logging.info('Revoking in-progress task and starting new herd: %s', task_id) + logging.info( + 'Revoking in-progress task and starting new herd: %s', + task_id, + ) result.revoke() else: - logging.info('Ignoring in-progress task and starting new herd: %s', task_id) + logging.info( + 'Ignoring in-progress task and starting new herd: %s', + task_id, + ) # It's not cached and it's not already running. Use an atomic lock to # start the task, ensuring there isn't a race condition that could