-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathjob.py
118 lines (104 loc) · 3.84 KB
/
job.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
"""
pydagman.job
Provides the Job class to represent a Condor job to be submitted via DAGman
Classes:
Job: Represent a DAGman Condor job
DuplicateParentError: Exception thrown when trying to add a parent already in
the parents list
"""
from shortuuid import uuid
class Job:
"""
Class to represent a DAGman Condor Job
Attributes:
submit_file (string): Path to the Condor submit file
name (string): The name to assign to the Condor job
vars (dict): A dictionary of key/value pairs representing DAGman VARS for a job
parents (list[string]): A list of job names that are parents of this job
pre (list[string]): Command and arguments to run as SCRIPT PRE for this job
post (list[string]): Command and arguments to run as SCRIPT POST for this job
retry (int): Number of times to retry the job
"""
def __init__(self, submit_file, name=''):
"""
Create a new pydagman.Job object
Arguments:
submit_file (string): The full path to the Condor submit file for this job
name (string): The name to assign to the job. Optional parameter, if name is
not provided a uuid is automatically generated
Yields:
Job object
"""
self.name = name if name else uuid()
self.submit_file = submit_file
self.vars = {}
self.parents = []
self.pre = []
self.post = []
self.num_retries = 0
self.categories = []
self.pre_script_exit_code = ""
self.noop = False
def add_var(self, name, value):
"""Add a VARS directive for this job
Args:
name (string): name of the variable
value (string): value to assign
"""
self.vars[name] = value
def add_parent(self, parent_job):
"""Add a parent to this job
Args:
parent_job (string): Job name of the parent job to add
Raises:
DuplicateParentError: if parent_job is already in the parents list
"""
if parent_job.name not in self.parents:
self.parents.append(parent_job.name)
else:
raise DuplicateParentError("DuplicateParentError in \
dagman.Job.add_parent: Parent job %s is already a parent of %s."
% (parent_job.name, self.name))
def add_pre(self, path, *args):
"""Add a SCRIPT PRE directive to the job
Args:
path (string): Path to the script
*args (optional): Additional arguments to the script
"""
self.pre.append(path)
for index, arg in enumerate(args):
self.pre.append(arg)
def add_post(self, path, *args):
"""Add a SCRIPT POST directive to the job
Args:
path (string): Path to the script
*args (optional): Additional arguments to the script
"""
self.post.append(path)
for index, arg in enumerate(args):
self.post.append(arg)
def add_category(self, category):
"""Add a category to this job. Once done, dagman.dagfile.Dagfile.set_maxjobs
can be used to limit the number of jobs for a category
Args:
category (string): Name of the category
"""
self.categories.append(category)
def retry(self, num_retries):
"""Set the number of retries for this job
Args:
num_retries (int): Number of times to retry the job
"""
self.num_retries = num_retries
def pre_skip(self, exit_code):
"""Skip the entire node if the PRE script returns this value
Args:
exit_code (string): Exit code to cause the node to skip
"""
self.pre_skip_exit_code = exit_code
class DuplicateParentError(Exception):
"""Thrown if attempting to add a parent that is already in the parents list"""
def __init__(self, msg):
self.msg = msg
def __str__(self):
return self.msg