@@ -16,7 +16,19 @@ import (
16
16
17
17
var _ core.Worker = (* Worker )(nil )
18
18
19
- // Worker for NSQ
19
+ /*
20
+ Worker struct implements the core.Worker interface for RabbitMQ.
21
+ It manages the AMQP connection, channel, and task consumption.
22
+ Fields:
23
+ - conn: AMQP connection to RabbitMQ server.
24
+ - channel: AMQP channel for communication.
25
+ - stop: Channel to signal worker shutdown.
26
+ - stopFlag: Atomic flag to indicate if the worker is stopped.
27
+ - stopOnce: Ensures shutdown logic runs only once.
28
+ - startOnce: Ensures consumer initialization runs only once.
29
+ - opts: Configuration options for the worker.
30
+ - tasks: Channel for receiving AMQP deliveries (tasks).
31
+ */
20
32
type Worker struct {
21
33
conn * amqp.Connection
22
34
channel * amqp.Channel
@@ -28,7 +40,17 @@ type Worker struct {
28
40
tasks <- chan amqp.Delivery
29
41
}
30
42
31
- // NewWorker for struc
43
+ /*
44
+ NewWorker creates and initializes a new Worker instance with the provided options.
45
+ It establishes a connection to RabbitMQ, sets up the channel, and declares the exchange.
46
+ If any step fails, it logs a fatal error and terminates the process.
47
+
48
+ Parameters:
49
+ - opts: Variadic list of Option functions to configure the worker.
50
+
51
+ Returns:
52
+ - Pointer to the initialized Worker.
53
+ */
32
54
func NewWorker (opts ... Option ) * Worker {
33
55
var err error
34
56
w := & Worker {
@@ -62,6 +84,14 @@ func NewWorker(opts ...Option) *Worker {
62
84
return w
63
85
}
64
86
87
+ /*
88
+ startConsumer initializes the consumer for the worker.
89
+ It declares the queue, binds it to the exchange, and starts consuming messages.
90
+ This method is safe to call multiple times but will only execute once due to sync.Once.
91
+
92
+ Returns:
93
+ - error: Any error encountered during initialization, or nil on success.
94
+ */
65
95
func (w * Worker ) startConsumer () error {
66
96
var initErr error
67
97
w .startOnce .Do (func () {
@@ -104,12 +134,29 @@ func (w *Worker) startConsumer() error {
104
134
return initErr
105
135
}
106
136
107
- // Run start the worker
137
+ /*
138
+ Run executes the worker's task processing function.
139
+ It delegates the actual task handling to the configured runFunc.
140
+
141
+ Parameters:
142
+ - ctx: Context for cancellation and timeout.
143
+ - task: The task message to process.
144
+
145
+ Returns:
146
+ - error: Any error returned by the runFunc.
147
+ */
108
148
func (w * Worker ) Run (ctx context.Context , task core.TaskMessage ) error {
109
149
return w .opts .runFunc (ctx , task )
110
150
}
111
151
112
- // Shutdown worker
152
+ /*
153
+ Shutdown gracefully stops the worker.
154
+ It ensures shutdown logic runs only once, cancels the consumer, and closes the AMQP connection.
155
+ If the worker is already stopped, it returns queue.ErrQueueShutdown.
156
+
157
+ Returns:
158
+ - error: Any error encountered during shutdown, or nil on success.
159
+ */
113
160
func (w * Worker ) Shutdown () (err error ) {
114
161
if ! atomic .CompareAndSwapInt32 (& w .stopFlag , 0 , 1 ) {
115
162
return queue .ErrQueueShutdown
@@ -128,7 +175,16 @@ func (w *Worker) Shutdown() (err error) {
128
175
return err
129
176
}
130
177
131
- // Queue send notification to queue
178
+ /*
179
+ Queue publishes a new task message to the RabbitMQ exchange.
180
+ If the worker is stopped, it returns queue.ErrQueueShutdown.
181
+
182
+ Parameters:
183
+ - job: The task message to be published.
184
+
185
+ Returns:
186
+ - error: Any error encountered during publishing, or nil on success.
187
+ */
132
188
func (w * Worker ) Queue (job core.TaskMessage ) error {
133
189
if atomic .LoadInt32 (& w .stopFlag ) == 1 {
134
190
return queue .ErrQueueShutdown
@@ -152,7 +208,15 @@ func (w *Worker) Queue(job core.TaskMessage) error {
152
208
return err
153
209
}
154
210
155
- // Request a new task
211
+ /*
212
+ Request retrieves a new task message from the queue.
213
+ It starts the consumer if not already started, waits for a message, and unmarshals it into a job.Message.
214
+ If no message is received within a timeout, it returns queue.ErrNoTaskInQueue.
215
+
216
+ Returns:
217
+ - core.TaskMessage: The received task message, or nil if none.
218
+ - error: Any error encountered, or queue.ErrNoTaskInQueue if no task is available.
219
+ */
156
220
func (w * Worker ) Request () (core.TaskMessage , error ) {
157
221
if err := w .startConsumer (); err != nil {
158
222
return nil , err
0 commit comments