20
20
21
21
22
22
DEFAULT_INSERT = {
23
- "priority" : 0 ,
24
23
"attempts" : 0 ,
25
24
"locked_by" : None ,
26
25
"locked_at" : None ,
@@ -70,10 +69,18 @@ def repair(self):
70
69
"$inc" : {"attempts" : 1 }}
71
70
)
72
71
73
- def put (self , payload ):
72
+ def drop_max_attempts (self ):
73
+ """
74
+ """
75
+ self .collection .find_and_modify (
76
+ {"attempts" : {"$gte" : self .max_attempts }},
77
+ remove = True )
78
+
79
+ def put (self , payload , priority = 0 ):
74
80
"""Place a job into the queue
75
81
"""
76
82
job = dict (DEFAULT_INSERT )
83
+ job ['priority' ] = priority
77
84
job ['payload' ] = payload
78
85
return self .collection .insert (job )
79
86
@@ -82,8 +89,7 @@ def next(self):
82
89
query = {"locked_by" : None ,
83
90
"locked_at" : None ,
84
91
"attempts" : {"$lt" : self .max_attempts }},
85
- update = {"$set" : {"attempts" : 1 ,
86
- "locked_by" : self .consumer_id ,
92
+ update = {"$set" : {"locked_by" : self .consumer_id ,
87
93
"locked_at" : datetime .now ()}},
88
94
sort = [('priority' , pymongo .DESCENDING )],
89
95
new = 1 ,
@@ -128,10 +134,6 @@ def __init__(self, queue, data):
128
134
self ._queue = queue
129
135
self ._data = data
130
136
131
- @property
132
- def data (self ):
133
- return self ._data
134
-
135
137
@property
136
138
def payload (self ):
137
139
return self ._data ['payload' ]
@@ -140,6 +142,26 @@ def payload(self):
140
142
def job_id (self ):
141
143
return self ._data ["_id" ]
142
144
145
+ @property
146
+ def priority (self ):
147
+ return self ._data ["priority" ]
148
+
149
+ @property
150
+ def attempts (self ):
151
+ return self ._data ["attempts" ]
152
+
153
+ @property
154
+ def locked_by (self ):
155
+ return self ._data ["locked_by" ]
156
+
157
+ @property
158
+ def locked_at (self ):
159
+ return self ._data ["locked_at" ]
160
+
161
+ @property
162
+ def last_error (self ):
163
+ return self ._data ["last_error" ]
164
+
143
165
## Job Control
144
166
145
167
def complete (self ):
@@ -176,7 +198,7 @@ def release(self):
176
198
## Context Manager support
177
199
178
200
def __enter__ (self ):
179
- return self .data
201
+ return self ._data
180
202
181
203
def __exit__ (self , type , value , tb ):
182
204
if (type , value , tb ) == (None , None , None ):
0 commit comments