-
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathqueue.go
343 lines (284 loc) · 7.79 KB
/
queue.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
package queue
import (
"errors"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/bson/primitive"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
"time"
)
type Queue struct {
db DbInterface
}
const (
StatePending = "pending"
StateRunning = "running"
StateCompleted = "completed"
StateError = "error"
)
const (
DefaultTimeout = time.Minute * 5
DefaultMaxTries = 3
)
type Meta struct {
Created time.Time `bson:"created"`
Dispatched *time.Time `bson:"dispatched"`
Completed *time.Time `bson:"completed"`
}
type Task struct {
Id primitive.ObjectID `bson:"_id,omitempty"`
Topic string `bson:"topic"`
Payload any `bson:"payload"`
Tries uint `bson:"tries"`
MaxTries uint `bson:"maxtries"`
State string `bson:"state"`
Message string `bson:"message"`
Meta Meta
}
type event struct {
Task Task `bson:"fullDocument"`
}
// NewQueue initializes a new Queue instance with the provided DbInterface.
func NewQueue(db DbInterface) *Queue {
queue := Queue{
db: db,
}
return &queue
}
var nowFunc = time.Now
func setNowFunc(n func() time.Time) {
nowFunc = n
}
type PublishOptions struct {
MaxTries uint
Tries int
}
// NewPublishOptions creates a new PublishOptions with default settings.
func NewPublishOptions() *PublishOptions {
return &PublishOptions{
MaxTries: 0,
Tries: -1,
}
}
// SetMaxTries sets the maximum number of retry attempts for publishing. Returns the updated PublishOptions instance.
func (p *PublishOptions) SetMaxTries(maxTries uint) *PublishOptions {
p.MaxTries = maxTries
return p
}
func (p *PublishOptions) setTries(tries uint) *PublishOptions {
p.Tries = int(tries)
return p
}
// Publish inserts a new task into the queue with the given topic, payload, and maxTries.
// If maxTries is zero, it defaults to DefaultMaxTries.
func (q *Queue) Publish(topic string, payload any, opts ...*PublishOptions) (*Task, error) {
o := PublishOptions{
MaxTries: DefaultMaxTries,
Tries: 0,
}
for _, opt := range opts {
if opt == nil {
continue
}
if opt.MaxTries > 0 {
o.MaxTries = opt.MaxTries
}
if opt.Tries >= 0 {
o.Tries = opt.Tries
}
}
t := Task{
Topic: topic,
Payload: payload,
Tries: uint(o.Tries),
MaxTries: o.MaxTries,
Meta: Meta{
Created: nowFunc(),
Dispatched: nil,
Completed: nil,
},
State: StatePending,
}
insertedId, err := q.db.InsertOne(t)
if err != nil {
return nil, err
}
t.Id = insertedId
return &t, nil
}
// GetNext retrieves the next item from the queue for the given topic, marks it as running, and increments its tries count.
func (q *Queue) GetNext(topic string) (*Task, error) {
t := Task{}
res := q.db.FindOneAndUpdate(bson.M{
"topic": topic,
"state": StatePending,
"$expr": bson.M{"$lt": bson.A{"$tries", "$maxtries"}},
},
bson.M{
"$set": bson.M{"state": StateRunning, "meta.dispatched": nowFunc()},
"$inc": bson.M{"tries": 1},
},
options.FindOneAndUpdate().SetSort(bson.D{{"meta.scheduled", 1}}).SetReturnDocument(options.After),
)
if errors.Is(res.Err(), mongo.ErrNoDocuments) {
return nil, nil
}
if err := res.Decode(&t); err != nil {
return nil, err
}
return &t, nil
}
// GetNextById retrieves the next pending task by its ID, transitions it to the running state, and increments its tries count.
func (q *Queue) GetNextById(id primitive.ObjectID) (*Task, error) {
t := Task{}
res := q.db.FindOneAndUpdate(bson.M{
"_id": id,
"state": StatePending,
"$expr": bson.M{"$lt": bson.A{"$tries", "$maxtries"}},
},
bson.M{
"$set": bson.M{"state": StateRunning, "meta.dispatched": nowFunc()},
"$inc": bson.M{"tries": 1},
},
options.FindOneAndUpdate().SetReturnDocument(options.After),
)
if errors.Is(res.Err(), mongo.ErrNoDocuments) {
return nil, nil
}
if err := res.Decode(&t); err != nil {
return nil, err
}
return &t, nil
}
// Reschedule republishes a task to the queue, retaining its topic, payload, tries, and maxTries settings.
func (q *Queue) Reschedule(task *Task) (*Task, error) {
return q.Publish(task.Topic, task.Payload, NewPublishOptions().setTries(task.Tries).SetMaxTries(task.MaxTries))
}
type Callback func(t Task)
// Subscribe listens for new tasks on a given topic and calls the provided callback when a new task is available.
// It processes unprocessed tasks scheduled before starting the watch and continuously monitors for new tasks.
func (q *Queue) Subscribe(topic string, cb Callback) error {
pipeline := bson.D{{"$match", bson.D{
{"operationType", "insert"},
{"fullDocument.topic", topic},
{"fullDocument.state", StatePending}}},
}
stream, err := q.db.Watch(mongo.Pipeline{pipeline})
if err != nil {
return err
}
//goland:noinspection ALL
defer stream.Close(q.db.Context())
processedUntil := nowFunc()
// process unprocessed tasks scheduled before we started watching
for {
task, err := q.GetNext(topic)
if err != nil {
return err
}
if task == nil {
break
}
processedUntil = task.Meta.Created
cb(*task)
}
for stream.Next(q.db.Context()) {
var evt event
if err := stream.Decode(&evt); err != nil {
continue
}
// already processed
if evt.Task.Meta.Created.Before(processedUntil) {
continue
}
task, err := q.GetNextById(evt.Task.Id)
if err != nil {
_ = q.Err(evt.Task.Id.Hex(), err)
continue
}
if task != nil {
cb(*task)
}
}
return nil
}
// Ack acknowledges a task completion by its ID, updating its state to "completed" and setting the completion timestamp.
func (q *Queue) Ack(id string) error {
oId, err := primitive.ObjectIDFromHex(id)
if err != nil {
return err
}
return q.db.UpdateOne(
bson.M{"_id": oId},
bson.M{"$set": bson.M{
"state": StateCompleted,
"meta.completed": nowFunc(),
}})
}
// Err updates the state of a task to "error" by its ID, setting the completion time and storing the error message.
func (q *Queue) Err(id string, err error) error {
oId, e := primitive.ObjectIDFromHex(id)
if e != nil {
return e
}
return q.db.UpdateOne(
bson.M{"_id": oId},
bson.M{"$set": bson.M{
"state": StateError,
"meta.completed": nowFunc(),
"message": err.Error()},
})
}
// Selfcare re-schedules long-running tasks and sets tasks exceeding max tries to error state.
// It updates tasks in an ongoing state that haven't been acknowledged within a specific timeout period.
// If timeout is zero, the default timeout value is used. Optionally, tasks can be filtered by topic.
func (q *Queue) Selfcare(topic string, timeout time.Duration) error {
// re-schedule long-running tasks
// this only happens if the processor could not ack the task, i.e. the application crashed
if timeout == 0 {
timeout = DefaultTimeout
}
query := bson.M{
"state": StateRunning,
"meta.dispatched": bson.M{"$lt": nowFunc().Add(timeout)},
}
if len(topic) > 0 {
query["topic"] = topic
}
err1 := q.db.UpdateMany(
query,
bson.M{"$set": bson.M{
"state": StatePending,
"meta.dispatched": nil},
})
// set tasks exceeding maxtries to error
query = bson.M{
"state": StatePending,
"$expr": bson.M{"$gte": bson.A{"$tries", "$maxtries"}},
}
if len(topic) > 0 {
query["topic"] = topic
}
err2 := q.db.UpdateMany(
query,
bson.M{"$set": bson.M{
"state": StateError,
"meta.completed": nowFunc()},
})
if err1 != nil {
return err1
}
if err2 != nil {
return err2
}
return nil
}
// CreateIndexes creates MongoDB indexes for the task collection to improve query performance and manage TTL for completed tasks.
func (q *Queue) CreateIndexes() error {
err := q.db.CreateIndexes([]mongo.IndexModel{{
Keys: bson.D{{"topic", 1}, {"state", 1}},
}, {
Keys: bson.D{{"meta.completed", 1}}, Options: options.Index().SetExpireAfterSeconds(3600),
}})
return err
}