diff --git a/README.md b/README.md index f5c1d25..099e546 100644 --- a/README.md +++ b/README.md @@ -130,10 +130,10 @@ func main() { rets := make(chan string, taskN) // initial queue pool - q := queue.NewPool(5, queue.WithFn(func(ctx context.Context, m core.QueuedMessage) error { + q := queue.NewPool(5, queue.WithFn(func(ctx context.Context, m core.TaskMessage) error { v, ok := m.(*job) if !ok { - if err := json.Unmarshal(m.Bytes(), &v); err != nil { + if err := json.Unmarshal(m.Payload(), &v); err != nil { return err } } @@ -207,10 +207,10 @@ func main() { nsq.WithChannel("foobar"), // concurrent job number nsq.WithMaxInFlight(10), - nsq.WithRunFunc(func(ctx context.Context, m core.QueuedMessage) error { + nsq.WithRunFunc(func(ctx context.Context, m core.TaskMessage) error { v, ok := m.(*job) if !ok { - if err := json.Unmarshal(m.Bytes(), &v); err != nil { + if err := json.Unmarshal(m.Payload(), &v); err != nil { return err } } @@ -286,10 +286,10 @@ func main() { nats.WithAddr("127.0.0.1:4222"), nats.WithSubj("example"), nats.WithQueue("foobar"), - nats.WithRunFunc(func(ctx context.Context, m core.QueuedMessage) error { + nats.WithRunFunc(func(ctx context.Context, m core.TaskMessage) error { v, ok := m.(*job) if !ok { - if err := json.Unmarshal(m.Bytes(), &v); err != nil { + if err := json.Unmarshal(m.Payload(), &v); err != nil { return err } } @@ -370,10 +370,10 @@ func main() { w := redisdb.NewWorker( redisdb.WithAddr("127.0.0.1:6379"), redisdb.WithChannel("foobar"), - redisdb.WithRunFunc(func(ctx context.Context, m core.QueuedMessage) error { + redisdb.WithRunFunc(func(ctx context.Context, m core.TaskMessage) error { v, ok := m.(*job) if !ok { - if err := json.Unmarshal(m.Bytes(), &v); err != nil { + if err := json.Unmarshal(m.Payload(), &v); err != nil { return err } } diff --git a/_example/example02/main.go b/_example/example02/main.go index a8e6d24..29cae1e 100644 --- a/_example/example02/main.go +++ b/_example/example02/main.go @@ -30,10 +30,10 @@ func main() { rets := make(chan string, taskN) // initial queue pool - q := queue.NewPool(5, queue.WithFn(func(ctx context.Context, m core.QueuedMessage) error { + q := queue.NewPool(5, queue.WithFn(func(ctx context.Context, m core.TaskMessage) error { v, ok := m.(*job) if !ok { - if err := json.Unmarshal(m.Bytes(), &v); err != nil { + if err := json.Unmarshal(m.Payload(), &v); err != nil { return err } } diff --git a/benchmark_test.go b/benchmark_test.go index 7faa36a..29836ce 100644 --- a/benchmark_test.go +++ b/benchmark_test.go @@ -12,8 +12,8 @@ import ( var count = 1 type testqueue interface { - Queue(task core.QueuedMessage) error - Request() (core.QueuedMessage, error) + Queue(task core.TaskMessage) error + Request() (core.TaskMessage, error) } func testQueue(b *testing.B, pool testqueue) { @@ -94,7 +94,7 @@ func BenchmarkQueue(b *testing.B) { // Payload: []byte(`{"timeout":3600000000000}`), // } // w := NewRing( -// WithFn(func(ctx context.Context, m core.QueuedMessage) error { +// WithFn(func(ctx context.Context, m core.TaskMessage) error { // return nil // }), // ) @@ -119,7 +119,7 @@ func BenchmarkRingWithTask(b *testing.B) { }, } w := NewRing( - WithFn(func(ctx context.Context, m core.QueuedMessage) error { + WithFn(func(ctx context.Context, m core.TaskMessage) error { return nil }), ) diff --git a/core/worker.go b/core/worker.go index 52dc9ae..282dda0 100644 --- a/core/worker.go +++ b/core/worker.go @@ -4,12 +4,12 @@ import ( "context" ) -// Worker represents a worker that processes queued messages. -// It provides methods to run the worker, shut it down, queue messages, and request messages from the queue. +// Worker represents an interface for a worker that processes tasks. +// It provides methods to run tasks, shut down the worker, queue tasks, and request tasks from the queue. type Worker interface { // Run starts the worker and processes the given task in the provided context. // It returns an error if the task cannot be processed. - Run(ctx context.Context, task QueuedMessage) error + Run(ctx context.Context, task TaskMessage) error // Shutdown stops the worker and performs any necessary cleanup. // It returns an error if the shutdown process fails. @@ -17,11 +17,11 @@ type Worker interface { // Queue adds a task to the worker's queue. // It returns an error if the task cannot be added to the queue. - Queue(task QueuedMessage) error + Queue(task TaskMessage) error // Request retrieves a task from the worker's queue. // It returns the queued message and an error if the retrieval fails. - Request() (QueuedMessage, error) + Request() (TaskMessage, error) } // QueuedMessage represents an interface for a message that can be queued. @@ -31,6 +31,8 @@ type QueuedMessage interface { Bytes() []byte } +// TaskMessage represents an interface for a task message that can be queued. +// It embeds the QueuedMessage interface and adds a method to retrieve the payload of the message. type TaskMessage interface { QueuedMessage Payload() []byte diff --git a/metric_test.go b/metric_test.go index 347afab..8873e72 100644 --- a/metric_test.go +++ b/metric_test.go @@ -13,8 +13,8 @@ import ( func TestMetricData(t *testing.T) { w := NewRing( - WithFn(func(ctx context.Context, m core.QueuedMessage) error { - switch string(m.Bytes()) { + WithFn(func(ctx context.Context, m core.TaskMessage) error { + switch string(m.Payload()) { case "foo1": panic("missing something") case "foo2": diff --git a/mocks/mock_message.go b/mocks/mock_queued_message.go similarity index 92% rename from mocks/mock_message.go rename to mocks/mock_queued_message.go index ce6f18c..622660f 100644 --- a/mocks/mock_message.go +++ b/mocks/mock_queued_message.go @@ -3,7 +3,7 @@ // // Generated by this command: // -// mockgen -package=mocks -destination=mock_message.go github.com/golang-queue/queue/core QueuedMessage +// mockgen -package=mocks -destination=mock_queued_message.go github.com/golang-queue/queue/core QueuedMessage // // Package mocks is a generated GoMock package. diff --git a/mocks/mock_task_message.go b/mocks/mock_task_message.go new file mode 100644 index 0000000..d93e6a4 --- /dev/null +++ b/mocks/mock_task_message.go @@ -0,0 +1,67 @@ +// Code generated by MockGen. DO NOT EDIT. +// Source: github.com/golang-queue/queue/core (interfaces: TaskMessage) +// +// Generated by this command: +// +// mockgen -package=mocks -destination=mock_task_message.go github.com/golang-queue/queue/core TaskMessage +// + +// Package mocks is a generated GoMock package. +package mocks + +import ( + reflect "reflect" + + gomock "go.uber.org/mock/gomock" +) + +// MockTaskMessage is a mock of TaskMessage interface. +type MockTaskMessage struct { + ctrl *gomock.Controller + recorder *MockTaskMessageMockRecorder +} + +// MockTaskMessageMockRecorder is the mock recorder for MockTaskMessage. +type MockTaskMessageMockRecorder struct { + mock *MockTaskMessage +} + +// NewMockTaskMessage creates a new mock instance. +func NewMockTaskMessage(ctrl *gomock.Controller) *MockTaskMessage { + mock := &MockTaskMessage{ctrl: ctrl} + mock.recorder = &MockTaskMessageMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockTaskMessage) EXPECT() *MockTaskMessageMockRecorder { + return m.recorder +} + +// Bytes mocks base method. +func (m *MockTaskMessage) Bytes() []byte { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Bytes") + ret0, _ := ret[0].([]byte) + return ret0 +} + +// Bytes indicates an expected call of Bytes. +func (mr *MockTaskMessageMockRecorder) Bytes() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Bytes", reflect.TypeOf((*MockTaskMessage)(nil).Bytes)) +} + +// Payload mocks base method. +func (m *MockTaskMessage) Payload() []byte { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Payload") + ret0, _ := ret[0].([]byte) + return ret0 +} + +// Payload indicates an expected call of Payload. +func (mr *MockTaskMessageMockRecorder) Payload() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Payload", reflect.TypeOf((*MockTaskMessage)(nil).Payload)) +} diff --git a/mocks/mock_worker.go b/mocks/mock_worker.go index 16c7c5d..b2aeec7 100644 --- a/mocks/mock_worker.go +++ b/mocks/mock_worker.go @@ -41,7 +41,7 @@ func (m *MockWorker) EXPECT() *MockWorkerMockRecorder { } // Queue mocks base method. -func (m *MockWorker) Queue(arg0 core.QueuedMessage) error { +func (m *MockWorker) Queue(arg0 core.TaskMessage) error { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "Queue", arg0) ret0, _ := ret[0].(error) @@ -55,10 +55,10 @@ func (mr *MockWorkerMockRecorder) Queue(arg0 any) *gomock.Call { } // Request mocks base method. -func (m *MockWorker) Request() (core.QueuedMessage, error) { +func (m *MockWorker) Request() (core.TaskMessage, error) { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "Request") - ret0, _ := ret[0].(core.QueuedMessage) + ret0, _ := ret[0].(core.TaskMessage) ret1, _ := ret[1].(error) return ret0, ret1 } @@ -70,7 +70,7 @@ func (mr *MockWorkerMockRecorder) Request() *gomock.Call { } // Run mocks base method. -func (m *MockWorker) Run(arg0 context.Context, arg1 core.QueuedMessage) error { +func (m *MockWorker) Run(arg0 context.Context, arg1 core.TaskMessage) error { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "Run", arg0, arg1) ret0, _ := ret[0].(error) diff --git a/mocks/mocks.go b/mocks/mocks.go index e170390..3af0d53 100644 --- a/mocks/mocks.go +++ b/mocks/mocks.go @@ -1,4 +1,5 @@ package mocks //go:generate mockgen -package=mocks -destination=mock_worker.go github.com/golang-queue/queue/core Worker -//go:generate mockgen -package=mocks -destination=mock_message.go github.com/golang-queue/queue/core QueuedMessage +//go:generate mockgen -package=mocks -destination=mock_queued_message.go github.com/golang-queue/queue/core QueuedMessage +//go:generate mockgen -package=mocks -destination=mock_task_message.go github.com/golang-queue/queue/core TaskMessage diff --git a/options.go b/options.go index 0690d17..840a681 100644 --- a/options.go +++ b/options.go @@ -11,7 +11,7 @@ var ( defaultCapacity = 0 defaultWorkerCount = int64(runtime.NumCPU()) defaultNewLogger = NewLogger() - defaultFn = func(context.Context, core.QueuedMessage) error { return nil } + defaultFn = func(context.Context, core.TaskMessage) error { return nil } defaultMetric = NewMetric() ) @@ -67,7 +67,7 @@ func WithWorker(w core.Worker) Option { } // WithFn set custom job function -func WithFn(fn func(context.Context, core.QueuedMessage) error) Option { +func WithFn(fn func(context.Context, core.TaskMessage) error) Option { return OptionFunc(func(q *Options) { q.fn = fn }) @@ -86,7 +86,7 @@ type Options struct { logger Logger queueSize int worker core.Worker - fn func(context.Context, core.QueuedMessage) error + fn func(context.Context, core.TaskMessage) error afterFn func() metric Metric } diff --git a/queue_test.go b/queue_test.go index e99d4b6..81da14b 100644 --- a/queue_test.go +++ b/queue_test.go @@ -28,6 +28,10 @@ func (m mockMessage) Bytes() []byte { return bytesconv.StrToBytes(m.message) } +func (m mockMessage) Payload() []byte { + return bytesconv.StrToBytes(m.message) +} + func TestNewQueueWithZeroWorker(t *testing.T) { controller := gomock.NewController(t) defer controller.Finish() @@ -61,8 +65,9 @@ func TestNewQueueWithDefaultWorker(t *testing.T) { assert.Nil(t, q) w := mocks.NewMockWorker(controller) - m := mocks.NewMockQueuedMessage(controller) + m := mocks.NewMockTaskMessage(controller) m.EXPECT().Bytes().Return([]byte("test")).AnyTimes() + m.EXPECT().Payload().Return([]byte("test")).AnyTimes() w.EXPECT().Shutdown().Return(nil) w.EXPECT().Request().Return(m, nil).AnyTimes() w.EXPECT().Run(context.Background(), m).Return(nil).AnyTimes() @@ -83,7 +88,7 @@ func TestHandleTimeout(t *testing.T) { Body: []byte("foo"), } w := NewRing( - WithFn(func(ctx context.Context, m core.QueuedMessage) error { + WithFn(func(ctx context.Context, m core.TaskMessage) error { time.Sleep(200 * time.Millisecond) return nil }), @@ -115,7 +120,7 @@ func TestJobComplete(t *testing.T) { Body: []byte("foo"), } w := NewRing( - WithFn(func(ctx context.Context, m core.QueuedMessage) error { + WithFn(func(ctx context.Context, m core.TaskMessage) error { return errors.New("job completed") }), ) @@ -136,7 +141,7 @@ func TestJobComplete(t *testing.T) { } w = NewRing( - WithFn(func(ctx context.Context, m core.QueuedMessage) error { + WithFn(func(ctx context.Context, m core.TaskMessage) error { time.Sleep(200 * time.Millisecond) return errors.New("job completed") }), @@ -196,11 +201,11 @@ func TestMockWorkerAndMessage(t *testing.T) { controller := gomock.NewController(t) defer controller.Finish() - m := mocks.NewMockQueuedMessage(controller) + m := mocks.NewMockTaskMessage(controller) w := mocks.NewMockWorker(controller) w.EXPECT().Shutdown().Return(nil) - w.EXPECT().Request().DoAndReturn(func() (core.QueuedMessage, error) { + w.EXPECT().Request().DoAndReturn(func() (core.TaskMessage, error) { return m, errors.New("nil") }) diff --git a/ring.go b/ring.go index a4f9e47..233d753 100644 --- a/ring.go +++ b/ring.go @@ -6,7 +6,6 @@ import ( "sync/atomic" "github.com/golang-queue/queue/core" - "github.com/golang-queue/queue/job" ) var _ core.Worker = (*Ring)(nil) @@ -14,8 +13,8 @@ var _ core.Worker = (*Ring)(nil) // Ring for simple queue using buffer channel type Ring struct { sync.Mutex - taskQueue []core.QueuedMessage - runFunc func(context.Context, core.QueuedMessage) error + taskQueue []core.TaskMessage + runFunc func(context.Context, core.TaskMessage) error capacity int count int head int @@ -26,21 +25,21 @@ type Ring struct { stopFlag int32 } -type Data struct { - Payload []byte `json:"payload"` -} +// type Data struct { +// Payload []byte `json:"payload"` +// } -func (d *Data) Bytes() []byte { - return d.Payload -} +// func (d *Data) Bytes() []byte { +// return d.Payload +// } // Run to execute new task -func (s *Ring) Run(ctx context.Context, task core.QueuedMessage) error { - v, _ := task.(*job.Message) - data := &Data{ - Payload: v.Body, - } - return s.runFunc(ctx, data) +func (s *Ring) Run(ctx context.Context, task core.TaskMessage) error { + // v, _ := task.(*job.Message) + // data := &Data{ + // Payload: v.Body, + // } + return s.runFunc(ctx, task) } // Shutdown the worker @@ -61,7 +60,7 @@ func (s *Ring) Shutdown() error { } // Queue send task to the buffer channel -func (s *Ring) Queue(task core.QueuedMessage) error { //nolint:stylecheck +func (s *Ring) Queue(task core.TaskMessage) error { //nolint:stylecheck if atomic.LoadInt32(&s.stopFlag) == 1 { return ErrQueueShutdown } @@ -82,7 +81,7 @@ func (s *Ring) Queue(task core.QueuedMessage) error { //nolint:stylecheck } // Request a new task from channel -func (s *Ring) Request() (core.QueuedMessage, error) { +func (s *Ring) Request() (core.TaskMessage, error) { if atomic.LoadInt32(&s.stopFlag) == 1 && s.count == 0 { select { case s.exit <- struct{}{}: @@ -109,7 +108,7 @@ func (s *Ring) Request() (core.QueuedMessage, error) { } func (q *Ring) resize(n int) { - nodes := make([]core.QueuedMessage, n) + nodes := make([]core.TaskMessage, n) if q.head < q.tail { copy(nodes, q.taskQueue[q.head:q.tail]) } else { @@ -126,7 +125,7 @@ func (q *Ring) resize(n int) { func NewRing(opts ...Option) *Ring { o := NewOptions(opts...) w := &Ring{ - taskQueue: make([]core.QueuedMessage, 2), + taskQueue: make([]core.TaskMessage, 2), capacity: o.queueSize, exit: make(chan struct{}), logger: o.logger, diff --git a/ring_test.go b/ring_test.go index aa93fd7..6c063eb 100644 --- a/ring_test.go +++ b/ring_test.go @@ -33,7 +33,7 @@ func TestCustomFuncAndWait(t *testing.T) { message: "foo", } w := NewRing( - WithFn(func(ctx context.Context, m core.QueuedMessage) error { + WithFn(func(ctx context.Context, m core.TaskMessage) error { time.Sleep(500 * time.Millisecond) return nil }), @@ -82,11 +82,11 @@ func TestJobReachTimeout(t *testing.T) { message: "foo", } w := NewRing( - WithFn(func(ctx context.Context, m core.QueuedMessage) error { + WithFn(func(ctx context.Context, m core.TaskMessage) error { for { select { case <-ctx.Done(): - log.Println("get data:", string(m.Bytes())) + log.Println("get data:", string(m.Payload())) if errors.Is(ctx.Err(), context.Canceled) { log.Println("queue has been shutdown and cancel the job") } else if errors.Is(ctx.Err(), context.DeadlineExceeded) { @@ -116,11 +116,11 @@ func TestCancelJobAfterShutdown(t *testing.T) { } w := NewRing( WithLogger(NewEmptyLogger()), - WithFn(func(ctx context.Context, m core.QueuedMessage) error { + WithFn(func(ctx context.Context, m core.TaskMessage) error { for { select { case <-ctx.Done(): - log.Println("get data:", string(m.Bytes())) + log.Println("get data:", string(m.Payload())) if errors.Is(ctx.Err(), context.Canceled) { log.Println("queue has been shutdown and cancel the job") } else if errors.Is(ctx.Err(), context.DeadlineExceeded) { @@ -149,18 +149,18 @@ func TestCancelJobAfterShutdown(t *testing.T) { func TestGoroutineLeak(t *testing.T) { w := NewRing( WithLogger(NewLogger()), - WithFn(func(ctx context.Context, m core.QueuedMessage) error { + WithFn(func(ctx context.Context, m core.TaskMessage) error { for { select { case <-ctx.Done(): if errors.Is(ctx.Err(), context.Canceled) { - log.Println("queue has been shutdown and cancel the job: " + string(m.Bytes())) + log.Println("queue has been shutdown and cancel the job: " + string(m.Payload())) } else if errors.Is(ctx.Err(), context.DeadlineExceeded) { - log.Println("job deadline exceeded: " + string(m.Bytes())) + log.Println("job deadline exceeded: " + string(m.Payload())) } return nil default: - log.Println("get data:", string(m.Bytes())) + log.Println("get data:", string(m.Payload())) time.Sleep(50 * time.Millisecond) return nil } @@ -192,7 +192,7 @@ func TestGoroutinePanic(t *testing.T) { message: "foo", } w := NewRing( - WithFn(func(ctx context.Context, m core.QueuedMessage) error { + WithFn(func(ctx context.Context, m core.TaskMessage) error { panic("missing something") }), ) @@ -210,7 +210,7 @@ func TestGoroutinePanic(t *testing.T) { func TestIncreaseWorkerCount(t *testing.T) { w := NewRing( WithLogger(NewEmptyLogger()), - WithFn(func(ctx context.Context, m core.QueuedMessage) error { + WithFn(func(ctx context.Context, m core.TaskMessage) error { time.Sleep(500 * time.Millisecond) return nil }), @@ -240,7 +240,7 @@ func TestIncreaseWorkerCount(t *testing.T) { func TestDecreaseWorkerCount(t *testing.T) { w := NewRing( - WithFn(func(ctx context.Context, m core.QueuedMessage) error { + WithFn(func(ctx context.Context, m core.TaskMessage) error { time.Sleep(100 * time.Millisecond) return nil }), @@ -274,10 +274,10 @@ func TestHandleAllJobBeforeShutdownRing(t *testing.T) { controller := gomock.NewController(t) defer controller.Finish() - m := mocks.NewMockQueuedMessage(controller) + m := mocks.NewMockTaskMessage(controller) w := NewRing( - WithFn(func(ctx context.Context, m core.QueuedMessage) error { + WithFn(func(ctx context.Context, m core.TaskMessage) error { time.Sleep(10 * time.Millisecond) return nil }), @@ -307,15 +307,16 @@ func TestHandleAllJobBeforeShutdownRingInQueue(t *testing.T) { controller := gomock.NewController(t) defer controller.Finish() - m := mocks.NewMockQueuedMessage(controller) + m := mocks.NewMockTaskMessage(controller) m.EXPECT().Bytes().Return([]byte("test")).AnyTimes() + m.EXPECT().Payload().Return([]byte("test")).AnyTimes() messages := make(chan string, 10) w := NewRing( - WithFn(func(ctx context.Context, m core.QueuedMessage) error { + WithFn(func(ctx context.Context, m core.TaskMessage) error { time.Sleep(10 * time.Millisecond) - messages <- string(m.Bytes()) + messages <- string(m.Payload()) return nil }), ) @@ -347,13 +348,13 @@ func TestRetryCountWithNewMessage(t *testing.T) { count := 1 w := NewRing( - WithFn(func(ctx context.Context, m core.QueuedMessage) error { + WithFn(func(ctx context.Context, m core.TaskMessage) error { if count%3 != 0 { count++ return errors.New("count not correct") } close(keep) - messages <- string(m.Bytes()) + messages <- string(m.Payload()) return nil }), ) @@ -464,12 +465,12 @@ func TestCancelRetryCountWithNewMessage(t *testing.T) { count := 1 w := NewRing( - WithFn(func(ctx context.Context, m core.QueuedMessage) error { + WithFn(func(ctx context.Context, m core.TaskMessage) error { if count%3 != 0 { count++ return errors.New("count not correct") } - messages <- string(m.Bytes()) + messages <- string(m.Payload()) return nil }), ) @@ -498,7 +499,7 @@ func TestCancelRetryCountWithNewMessage(t *testing.T) { func TestErrNoTaskInQueue(t *testing.T) { w := NewRing( - WithFn(func(ctx context.Context, m core.QueuedMessage) error { + WithFn(func(ctx context.Context, m core.TaskMessage) error { return nil }), )