Skip to content

Commit 12834dc

Browse files
committed
refactor: refactor queue management and improve task handling
- Add the `log` package to imports - Remove `github.com/appleboy/graceful` from imports - Alias `github.com/golang-queue/rabbitmq` as `rabbitmq` in imports - Reduce `taskN` from 10000 to 100 - Remove the use of `graceful.NewManager()` - Remove `rabbitmq.WithExchangeType(*exchangeType)` from the worker configuration - Remove the `time.Sleep(500 * time.Millisecond)` call in the worker function - Change queue initialization to use `queue.NewQueue` with a worker count of 5 - Add error handling for queue initialization - Add code to start the queue with five workers - Replace `graceful` job management with direct task assignment to the queue - Add a loop to wait until all tasks are done, printing messages and sleeping for 50 milliseconds between each Signed-off-by: appleboy <[email protected]>
1 parent 6c0e5ed commit 12834dc

File tree

1 file changed

+28
-30
lines changed

1 file changed

+28
-30
lines changed

README.md

+28-30
Original file line numberDiff line numberDiff line change
@@ -23,12 +23,12 @@ import (
2323
"encoding/json"
2424
"flag"
2525
"fmt"
26+
"log"
2627
"time"
2728

28-
"github.com/appleboy/graceful"
2929
"github.com/golang-queue/queue"
3030
"github.com/golang-queue/queue/core"
31-
"github.com/golang-queue/rabbitmq"
31+
rabbitmq "github.com/golang-queue/rabbitmq"
3232
)
3333

3434
type job struct {
@@ -56,57 +56,55 @@ func init() {
5656
}
5757

5858
func main() {
59-
taskN := 10000
59+
taskN := 100
6060
rets := make(chan string, taskN)
6161

62-
m := graceful.NewManager()
63-
6462
// define the worker
6563
w := rabbitmq.NewWorker(
6664
rabbitmq.WithAddr(*uri),
6765
rabbitmq.WithQueue(*q),
6866
rabbitmq.WithExchangeName(*exchange),
69-
rabbitmq.WithExchangeType(*exchangeType),
7067
rabbitmq.WithRoutingKey(*bindingKey),
7168
rabbitmq.WithRunFunc(func(ctx context.Context, m core.TaskMessage) error {
7269
var v *job
7370
if err := json.Unmarshal(m.Payload(), &v); err != nil {
7471
return err
7572
}
7673
rets <- v.Message
77-
time.Sleep(500 * time.Millisecond)
7874
return nil
7975
}),
8076
)
77+
8178
// define the queue
82-
q := queue.NewPool(
83-
2,
79+
q, err := queue.NewQueue(
80+
queue.WithWorkerCount(5),
8481
queue.WithWorker(w),
8582
)
83+
if err != nil {
84+
log.Fatal(err)
85+
}
8686

87-
m.AddRunningJob(func(ctx context.Context) error {
88-
for {
89-
select {
90-
case <-ctx.Done():
91-
select {
92-
case m := <-rets:
93-
fmt.Println("message:", m)
94-
default:
95-
}
96-
return nil
97-
case m := <-rets:
98-
fmt.Println("message:", m)
99-
time.Sleep(50 * time.Millisecond)
87+
// start the five worker
88+
q.Start()
89+
90+
// assign tasks in queue
91+
for i := 0; i < taskN; i++ {
92+
go func(i int) {
93+
if err := q.Queue(&job{
94+
Message: fmt.Sprintf("handle the job: %d", i+1),
95+
}); err != nil {
96+
log.Fatal(err)
10097
}
101-
}
102-
})
98+
}(i)
99+
}
103100

104-
m.AddShutdownJob(func() error {
105-
// shutdown the service and notify all the worker
106-
q.Release()
107-
return nil
108-
})
101+
// wait until all tasks done
102+
for i := 0; i < taskN; i++ {
103+
fmt.Println("message:", <-rets)
104+
time.Sleep(50 * time.Millisecond)
105+
}
109106

110-
<-m.Done()
107+
// shutdown the service and notify all the worker
108+
q.Release()
111109
}
112110
```

0 commit comments

Comments
 (0)