Skip to content

Commit 10cbd66

Browse files
committed
test Selfcare
1 parent 3fcddd6 commit 10cbd66

File tree

3 files changed

+99
-10
lines changed

3 files changed

+99
-10
lines changed

cmd/demo/main.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -107,7 +107,7 @@ func main() {
107107
}
108108

109109
if *selfcare {
110-
err := qu.Selfcare(nil)
110+
err := qu.Selfcare("")
111111
if err != nil {
112112
log.Fatal(err)
113113
}

queue.go

+17-9
Original file line numberDiff line numberDiff line change
@@ -208,18 +208,18 @@ func (q *Queue) Err(id string, err error) error {
208208
})
209209
}
210210

211-
func (q *Queue) Selfcare(topic *string) error {
211+
func (q *Queue) Selfcare(topic string) error {
212212
// re-schedule long-running tasks
213213
// this only happens if the processor could not ack the task, i.e. the application crashed
214214
query := bson.M{
215215
"state": StateRunning,
216-
"meta.dispatched": bson.M{"$lt": time.Now().Add(DefaultTimeout)},
216+
"meta.dispatched": bson.M{"$lt": nowFunc().Add(DefaultTimeout)},
217217
}
218-
if topic != nil {
219-
query["topic"] = *topic
218+
if len(topic) > 0 {
219+
query["topic"] = topic
220220
}
221221

222-
_ = q.db.UpdateMany(
222+
err1 := q.db.UpdateMany(
223223
query,
224224
bson.M{"$set": bson.M{
225225
"state": StatePending,
@@ -231,17 +231,25 @@ func (q *Queue) Selfcare(topic *string) error {
231231
"state": StatePending,
232232
"$expr": bson.M{"$gte": bson.A{"$tries", "$maxtries"}},
233233
}
234-
if topic != nil {
235-
query["topic"] = *topic
234+
if len(topic) > 0 {
235+
query["topic"] = topic
236236
}
237237

238-
_ = q.db.UpdateMany(
238+
err2 := q.db.UpdateMany(
239239
query,
240240
bson.M{"$set": bson.M{
241241
"state": StateError,
242-
"meta.completed": time.Now()},
242+
"meta.completed": nowFunc()},
243243
})
244244

245+
if err1 != nil {
246+
return err1
247+
}
248+
249+
if err2 != nil {
250+
return err2
251+
}
252+
245253
return nil
246254
}
247255

queue_test.go

+81
Original file line numberDiff line numberDiff line change
@@ -415,3 +415,84 @@ func TestQueue_Err(t *testing.T) {
415415
})
416416
}
417417
}
418+
419+
func TestQueue_Selftest(t *testing.T) {
420+
setNowFunc(func() time.Time {
421+
t, _ := time.Parse(time.DateTime, "2024-11-04 15:04:05")
422+
return t
423+
})
424+
425+
tests := []struct {
426+
name string
427+
topic string
428+
error1 error
429+
error2 error
430+
}{
431+
{
432+
name: "Success",
433+
topic: "",
434+
},
435+
{
436+
name: "Success with topic",
437+
topic: "user.delete",
438+
},
439+
{
440+
name: "Reschedule failed",
441+
topic: "",
442+
error1: errors.New("FindOneAndUpdate1"),
443+
},
444+
{
445+
name: "Set maxtries to error failed",
446+
topic: "",
447+
error2: errors.New("FindOneAndUpdate2"),
448+
},
449+
}
450+
451+
for _, tt := range tests {
452+
t.Run(tt.name, func(t *testing.T) {
453+
dbMock := NewDbInterfaceMock(t)
454+
455+
q := NewQueue(dbMock)
456+
457+
query1 := bson.M{
458+
"state": StateRunning,
459+
"meta.dispatched": bson.M{"$lt": nowFunc().Add(DefaultTimeout)},
460+
}
461+
462+
if tt.topic != "" {
463+
query1["topic"] = tt.topic
464+
}
465+
466+
dbMock.EXPECT().UpdateMany(query1,
467+
bson.M{"$set": bson.M{
468+
"state": StatePending,
469+
"meta.dispatched": nil},
470+
}).Return(tt.error1)
471+
472+
query2 := bson.M{
473+
"state": StatePending,
474+
"$expr": bson.M{"$gte": bson.A{"$tries", "$maxtries"}},
475+
}
476+
477+
if tt.topic != "" {
478+
query2["topic"] = tt.topic
479+
}
480+
481+
dbMock.EXPECT().UpdateMany(query2,
482+
bson.M{"$set": bson.M{
483+
"state": StateError,
484+
"meta.completed": nowFunc()},
485+
}).Return(tt.error2)
486+
487+
err := q.Selfcare(tt.topic)
488+
489+
if tt.error1 != nil {
490+
assert.Equal(t, tt.error1, err)
491+
} else if tt.error2 != nil {
492+
assert.Equal(t, tt.error2, err)
493+
} else {
494+
assert.Equal(t, nil, err)
495+
}
496+
})
497+
}
498+
}

0 commit comments

Comments
 (0)