Skip to content

refactor: refactor QueuedMessage to TaskMessage across the codebase #139

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Jan 20, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 8 additions & 8 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -130,10 +130,10 @@ func main() {
rets := make(chan string, taskN)

// initial queue pool
q := queue.NewPool(5, queue.WithFn(func(ctx context.Context, m core.QueuedMessage) error {
q := queue.NewPool(5, queue.WithFn(func(ctx context.Context, m core.TaskMessage) error {
v, ok := m.(*job)
if !ok {
if err := json.Unmarshal(m.Bytes(), &v); err != nil {
if err := json.Unmarshal(m.Payload(), &v); err != nil {
return err
}
}
Expand Down Expand Up @@ -207,10 +207,10 @@ func main() {
nsq.WithChannel("foobar"),
// concurrent job number
nsq.WithMaxInFlight(10),
nsq.WithRunFunc(func(ctx context.Context, m core.QueuedMessage) error {
nsq.WithRunFunc(func(ctx context.Context, m core.TaskMessage) error {
v, ok := m.(*job)
if !ok {
if err := json.Unmarshal(m.Bytes(), &v); err != nil {
if err := json.Unmarshal(m.Payload(), &v); err != nil {
return err
}
}
Expand Down Expand Up @@ -286,10 +286,10 @@ func main() {
nats.WithAddr("127.0.0.1:4222"),
nats.WithSubj("example"),
nats.WithQueue("foobar"),
nats.WithRunFunc(func(ctx context.Context, m core.QueuedMessage) error {
nats.WithRunFunc(func(ctx context.Context, m core.TaskMessage) error {
v, ok := m.(*job)
if !ok {
if err := json.Unmarshal(m.Bytes(), &v); err != nil {
if err := json.Unmarshal(m.Payload(), &v); err != nil {
return err
}
}
Expand Down Expand Up @@ -370,10 +370,10 @@ func main() {
w := redisdb.NewWorker(
redisdb.WithAddr("127.0.0.1:6379"),
redisdb.WithChannel("foobar"),
redisdb.WithRunFunc(func(ctx context.Context, m core.QueuedMessage) error {
redisdb.WithRunFunc(func(ctx context.Context, m core.TaskMessage) error {
v, ok := m.(*job)
if !ok {
if err := json.Unmarshal(m.Bytes(), &v); err != nil {
if err := json.Unmarshal(m.Payload(), &v); err != nil {
return err
}
}
Expand Down
4 changes: 2 additions & 2 deletions _example/example02/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,10 @@ func main() {
rets := make(chan string, taskN)

// initial queue pool
q := queue.NewPool(5, queue.WithFn(func(ctx context.Context, m core.QueuedMessage) error {
q := queue.NewPool(5, queue.WithFn(func(ctx context.Context, m core.TaskMessage) error {
v, ok := m.(*job)
if !ok {
if err := json.Unmarshal(m.Bytes(), &v); err != nil {
if err := json.Unmarshal(m.Payload(), &v); err != nil {
return err
}
}
Expand Down
8 changes: 4 additions & 4 deletions benchmark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@ import (
var count = 1

type testqueue interface {
Queue(task core.QueuedMessage) error
Request() (core.QueuedMessage, error)
Queue(task core.TaskMessage) error
Request() (core.TaskMessage, error)
}

func testQueue(b *testing.B, pool testqueue) {
Expand Down Expand Up @@ -94,7 +94,7 @@ func BenchmarkQueue(b *testing.B) {
// Payload: []byte(`{"timeout":3600000000000}`),
// }
// w := NewRing(
// WithFn(func(ctx context.Context, m core.QueuedMessage) error {
// WithFn(func(ctx context.Context, m core.TaskMessage) error {
// return nil
// }),
// )
Expand All @@ -119,7 +119,7 @@ func BenchmarkRingWithTask(b *testing.B) {
},
}
w := NewRing(
WithFn(func(ctx context.Context, m core.QueuedMessage) error {
WithFn(func(ctx context.Context, m core.TaskMessage) error {
return nil
}),
)
Expand Down
12 changes: 7 additions & 5 deletions core/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,24 +4,24 @@ import (
"context"
)

// Worker represents a worker that processes queued messages.
// It provides methods to run the worker, shut it down, queue messages, and request messages from the queue.
// Worker represents an interface for a worker that processes tasks.
// It provides methods to run tasks, shut down the worker, queue tasks, and request tasks from the queue.
type Worker interface {
// Run starts the worker and processes the given task in the provided context.
// It returns an error if the task cannot be processed.
Run(ctx context.Context, task QueuedMessage) error
Run(ctx context.Context, task TaskMessage) error

// Shutdown stops the worker and performs any necessary cleanup.
// It returns an error if the shutdown process fails.
Shutdown() error

// Queue adds a task to the worker's queue.
// It returns an error if the task cannot be added to the queue.
Queue(task QueuedMessage) error
Queue(task TaskMessage) error

// Request retrieves a task from the worker's queue.
// It returns the queued message and an error if the retrieval fails.
Request() (QueuedMessage, error)
Request() (TaskMessage, error)
}

// QueuedMessage represents an interface for a message that can be queued.
Expand All @@ -31,6 +31,8 @@ type QueuedMessage interface {
Bytes() []byte
}

// TaskMessage represents an interface for a task message that can be queued.
// It embeds the QueuedMessage interface and adds a method to retrieve the payload of the message.
type TaskMessage interface {
QueuedMessage
Payload() []byte
Expand Down
4 changes: 2 additions & 2 deletions metric_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@ import (

func TestMetricData(t *testing.T) {
w := NewRing(
WithFn(func(ctx context.Context, m core.QueuedMessage) error {
switch string(m.Bytes()) {
WithFn(func(ctx context.Context, m core.TaskMessage) error {
switch string(m.Payload()) {
case "foo1":
panic("missing something")
case "foo2":
Expand Down
2 changes: 1 addition & 1 deletion mocks/mock_message.go → mocks/mock_queued_message.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

67 changes: 67 additions & 0 deletions mocks/mock_task_message.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 4 additions & 4 deletions mocks/mock_worker.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion mocks/mocks.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
package mocks

//go:generate mockgen -package=mocks -destination=mock_worker.go github.com/golang-queue/queue/core Worker
//go:generate mockgen -package=mocks -destination=mock_message.go github.com/golang-queue/queue/core QueuedMessage
//go:generate mockgen -package=mocks -destination=mock_queued_message.go github.com/golang-queue/queue/core QueuedMessage
//go:generate mockgen -package=mocks -destination=mock_task_message.go github.com/golang-queue/queue/core TaskMessage
6 changes: 3 additions & 3 deletions options.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ var (
defaultCapacity = 0
defaultWorkerCount = int64(runtime.NumCPU())
defaultNewLogger = NewLogger()
defaultFn = func(context.Context, core.QueuedMessage) error { return nil }
defaultFn = func(context.Context, core.TaskMessage) error { return nil }
defaultMetric = NewMetric()
)

Expand Down Expand Up @@ -67,7 +67,7 @@ func WithWorker(w core.Worker) Option {
}

// WithFn set custom job function
func WithFn(fn func(context.Context, core.QueuedMessage) error) Option {
func WithFn(fn func(context.Context, core.TaskMessage) error) Option {
return OptionFunc(func(q *Options) {
q.fn = fn
})
Expand All @@ -86,7 +86,7 @@ type Options struct {
logger Logger
queueSize int
worker core.Worker
fn func(context.Context, core.QueuedMessage) error
fn func(context.Context, core.TaskMessage) error
afterFn func()
metric Metric
}
Expand Down
17 changes: 11 additions & 6 deletions queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,10 @@ func (m mockMessage) Bytes() []byte {
return bytesconv.StrToBytes(m.message)
}

func (m mockMessage) Payload() []byte {
return bytesconv.StrToBytes(m.message)
}

func TestNewQueueWithZeroWorker(t *testing.T) {
controller := gomock.NewController(t)
defer controller.Finish()
Expand Down Expand Up @@ -61,8 +65,9 @@ func TestNewQueueWithDefaultWorker(t *testing.T) {
assert.Nil(t, q)

w := mocks.NewMockWorker(controller)
m := mocks.NewMockQueuedMessage(controller)
m := mocks.NewMockTaskMessage(controller)
m.EXPECT().Bytes().Return([]byte("test")).AnyTimes()
m.EXPECT().Payload().Return([]byte("test")).AnyTimes()
w.EXPECT().Shutdown().Return(nil)
w.EXPECT().Request().Return(m, nil).AnyTimes()
w.EXPECT().Run(context.Background(), m).Return(nil).AnyTimes()
Expand All @@ -83,7 +88,7 @@ func TestHandleTimeout(t *testing.T) {
Body: []byte("foo"),
}
w := NewRing(
WithFn(func(ctx context.Context, m core.QueuedMessage) error {
WithFn(func(ctx context.Context, m core.TaskMessage) error {
time.Sleep(200 * time.Millisecond)
return nil
}),
Expand Down Expand Up @@ -115,7 +120,7 @@ func TestJobComplete(t *testing.T) {
Body: []byte("foo"),
}
w := NewRing(
WithFn(func(ctx context.Context, m core.QueuedMessage) error {
WithFn(func(ctx context.Context, m core.TaskMessage) error {
return errors.New("job completed")
}),
)
Expand All @@ -136,7 +141,7 @@ func TestJobComplete(t *testing.T) {
}

w = NewRing(
WithFn(func(ctx context.Context, m core.QueuedMessage) error {
WithFn(func(ctx context.Context, m core.TaskMessage) error {
time.Sleep(200 * time.Millisecond)
return errors.New("job completed")
}),
Expand Down Expand Up @@ -196,11 +201,11 @@ func TestMockWorkerAndMessage(t *testing.T) {
controller := gomock.NewController(t)
defer controller.Finish()

m := mocks.NewMockQueuedMessage(controller)
m := mocks.NewMockTaskMessage(controller)

w := mocks.NewMockWorker(controller)
w.EXPECT().Shutdown().Return(nil)
w.EXPECT().Request().DoAndReturn(func() (core.QueuedMessage, error) {
w.EXPECT().Request().DoAndReturn(func() (core.TaskMessage, error) {
return m, errors.New("nil")
})

Expand Down
Loading
Loading