-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathservice.go
129 lines (102 loc) · 3.7 KB
/
service.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
package jobqueue
import (
"context"
"time"
"github.com/domonda/go-types/uu"
)
var (
defaultService Service = ServiceWithError(ErrNotInitialized)
serviceCtxKey int
)
func SetDefaultService(service Service) {
if service == nil {
panic("jobqueue.SetDefaultService(nil)")
}
defaultService = service
}
func ContextWithService(ctx context.Context, service Service) context.Context {
return context.WithValue(ctx, &serviceCtxKey, service)
}
func ServiceFromContextOrNil(ctx context.Context) Service {
s, _ := ctx.Value(&serviceCtxKey).(Service)
return s
}
// GetService returns a Service from the context that was
// added using ContextWithService or the default service
// that is configurable with SetDefaultService.
func GetService(ctx context.Context) Service {
if s := ServiceFromContextOrNil(ctx); s != nil {
return s
}
return defaultService
}
// Close the default service
func Close() error {
if defaultService == nil {
return nil
}
return defaultService.Close()
}
type Service interface {
AddListener(context.Context, ServiceListener) error
AddJob(ctx context.Context, job *Job) error
GetJob(ctx context.Context, jobID uu.ID) (*Job, error)
// DeleteJob deletes a job from the queue.
DeleteJob(ctx context.Context, jobID uu.ID) error
// ResetJob resets the processing state of a job in the queue
// so that the job is ready to be re-processed.
ResetJob(ctx context.Context, jobID uu.ID) error
// ResetJobs resets the processing state of multiple jobs in the queue
// so that they are ready to be re-processed.
ResetJobs(ctx context.Context, jobIDs uu.IDs) error
AddJobBundle(ctx context.Context, jobBundle *JobBundle) error
GetJobBundle(ctx context.Context, jobBundleID uu.ID) (*JobBundle, error)
DeleteJobBundle(ctx context.Context, jobBundleID uu.ID) error
GetStatus(context.Context) (*Status, error)
GetAllJobsToDo(context.Context) ([]*Job, error)
GetAllJobsStartedBefore(ctx context.Context, since time.Time) ([]*Job, error)
GetAllJobsWithErrors(context.Context) ([]*Job, error)
DeleteFinishedJobs(ctx context.Context) error
Close() error
}
func Add(ctx context.Context, job *Job) error {
return GetService(ctx).AddJob(ctx, job)
}
func GetJob(ctx context.Context, jobID uu.ID) (*Job, error) {
return GetService(ctx).GetJob(ctx, jobID)
}
func DeleteFinishedJobs(ctx context.Context) error {
return GetService(ctx).DeleteFinishedJobs(ctx)
}
// ResetJob resets the processing state of a job in the queue
// so that the job is ready to be re-processed.
func ResetJob(ctx context.Context, jobID uu.ID) error {
return GetService(ctx).ResetJob(ctx, jobID)
}
// ResetJobs resets the processing state of multiple jobs in the queue
// so that they are ready to be re-processed.
func ResetJobs(ctx context.Context, jobIDs uu.IDSlice) error {
return GetService(ctx).ResetJobs(ctx, jobIDs)
}
// DeleteJob deletes a job from the queue.
func DeleteJob(ctx context.Context, jobID uu.ID) error {
return GetService(ctx).DeleteJob(ctx, jobID)
}
func GetAllJobsToDo(ctx context.Context) (jobs []*Job, err error) {
return GetService(ctx).GetAllJobsToDo(ctx)
}
func GetAllJobsStartedBefore(ctx context.Context, since time.Time) (jobs []*Job, err error) {
return GetService(ctx).GetAllJobsStartedBefore(ctx, since)
}
func GetAllJobsWithErrors(ctx context.Context) (jobs []*Job, err error) {
return GetService(ctx).GetAllJobsWithErrors(ctx)
}
func GetStatus(ctx context.Context) (status *Status, err error) {
return GetService(ctx).GetStatus(ctx)
}
func AddBundle(ctx context.Context, jobBundle *JobBundle) (err error) {
return GetService(ctx).AddJobBundle(ctx, jobBundle)
}
func GetJobBundle(ctx context.Context, jobBundleID uu.ID) (jobBundle *JobBundle, err error) {
return GetService(ctx).GetJobBundle(ctx, jobBundleID)
}