-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathoptions.go
262 lines (217 loc) · 5.81 KB
/
options.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
package rabbitmq
import (
"context"
"github.com/golang-queue/queue"
"github.com/golang-queue/queue/core"
)
/*
Package rabbitmq provides configuration options and helper functions
for setting up and customizing RabbitMQ workers and queues in the golang-queue system.
This file defines the available exchange types, the options struct, and a set of functional
options for flexible configuration.
*/
/*
Exchange type names recognised by this package.

Each constant is an untyped string constant holding the lowercase type name.
isVaildExchange accepts exactly these four values, and newOptions uses
ExchangeDirect as the default exchange type.
*/
const (
// ExchangeDirect names the direct exchange type ("direct").
ExchangeDirect = "direct"
// ExchangeFanout names the fanout exchange type ("fanout").
ExchangeFanout = "fanout"
// ExchangeTopic names the topic exchange type ("topic").
ExchangeTopic = "topic"
// ExchangeHeaders names the headers exchange type ("headers").
ExchangeHeaders = "headers"
)
/*
isVaildExchange reports whether name is one of the exchange types declared in
this package (ExchangeDirect, ExchangeFanout, ExchangeTopic, ExchangeHeaders).

The comparison is an exact string match, so differently-cased or padded
spellings such as "Direct" or " topic" are rejected.

Parameters:
  - name: The exchange type name to check.

Returns:
  - bool: true when name equals one of the four supported type names,
    false for anything else (including the empty string).
*/
func isVaildExchange(name string) bool {
	return name == ExchangeDirect ||
		name == ExchangeFanout ||
		name == ExchangeTopic ||
		name == ExchangeHeaders
}
/*
Option is a function that mutates an options value in place.

Every With* constructor in this file returns an Option that sets a single
field. newOptions calls each supplied Option, in the order given, on a pointer
to its default configuration, so options can be freely combined and a later
Option overrides an earlier one that sets the same field.
*/
type Option func(*options)
/*
options is the unexported configuration record for a RabbitMQ worker or queue.

Values are built by newOptions, which fills in defaults and then lets each
Option overwrite individual fields. The fields are grouped below by concern:
connection, exchange/routing/queue, consumer, and task handling/logging.
*/
type options struct {
	// Connection.
	addr string // AMQP server URI

	// Exchange, routing and queue.
	exchangeName string // Durable AMQP exchange name
	exchangeType string // Exchange Types: Direct, Fanout, Topic and Headers
	routingKey   string // AMQP routing key
	queue        string // Name of the queue to use

	// Consumer.
	tag     string // Consumer tag for identification
	autoAck bool   // Whether automatic message acknowledgment is enabled

	// runFunc is the function executed for each task.
	runFunc func(context.Context, core.TaskMessage) error

	// logger is the logger instance used for logging.
	logger queue.Logger
}
/*
WithAddr returns an Option that records the AMQP server URI.

When this option is not supplied, newOptions uses
"amqp://guest:guest@localhost:5672/".

Parameters:
  - addr: The AMQP URI to connect to; it is stored as given.

Returns:
  - Option: Functional option that overwrites the addr field.
*/
func WithAddr(addr string) Option {
	return func(o *options) { o.addr = addr }
}
/*
WithExchangeName returns an Option that records the name of the AMQP exchange.

In AMQP 0-9-1, an exchange is the entity messages are sent to; it takes each
message and routes it into zero or more queues. When this option is not
supplied, newOptions uses "test-exchange".

Parameters:
  - val: The exchange name; it is stored as given.

Returns:
  - Option: Functional option that overwrites the exchangeName field.
*/
func WithExchangeName(val string) Option {
	return func(o *options) { o.exchangeName = val }
}
/*
WithExchangeType returns an Option that records the type of the AMQP exchange.

The option itself performs no validation. newOptions checks the final value
with isVaildExchange after all options have run and reports an unsupported
type through the configured logger's Fatal method. When this option is not
supplied, newOptions uses ExchangeDirect.

The exchange type, together with rules called bindings, determines the routing
algorithm. AMQP 0-9-1 brokers provide four exchange types, listed here with
their pre-declared exchange names:
  - Direct exchange: (empty string) and amq.direct
  - Fanout exchange: amq.fanout
  - Topic exchange: amq.topic
  - Headers exchange: amq.match (and amq.headers in RabbitMQ)

Parameters:
  - val: The exchange type (direct, fanout, topic, headers).

Returns:
  - Option: Functional option that overwrites the exchangeType field.
*/
func WithExchangeType(val string) Option {
	return func(o *options) { o.exchangeType = val }
}
/*
WithRoutingKey returns an Option that records the AMQP routing key.

When this option is not supplied, newOptions uses "test-key".

Parameters:
  - val: The routing key; it is stored as given.

Returns:
  - Option: Functional option that overwrites the routingKey field.
*/
func WithRoutingKey(val string) Option {
	return func(o *options) { o.routingKey = val }
}
/*
WithTag returns an Option that records the consumer tag used to identify the
worker.

When this option is not supplied, newOptions uses "golang-queue".

Parameters:
  - val: The consumer tag; it is stored as given.

Returns:
  - Option: Functional option that overwrites the tag field.
*/
func WithTag(val string) Option {
	return func(o *options) { o.tag = val }
}
/*
WithAutoAck returns an Option that switches automatic message acknowledgment
on or off.

When this option is not supplied, newOptions leaves auto-ack disabled (false).

Parameters:
  - val: true to enable auto-ack, false to disable it.

Returns:
  - Option: Functional option that overwrites the autoAck field.
*/
func WithAutoAck(val bool) Option {
	return func(o *options) { o.autoAck = val }
}
/*
WithQueue returns an Option that records the name of the queue to use.

When this option is not supplied, newOptions uses "golang-queue".

Parameters:
  - val: The queue name; it is stored as given.

Returns:
  - Option: Functional option that overwrites the queue field.
*/
func WithQueue(val string) Option {
	return func(o *options) { o.queue = val }
}
/*
WithRunFunc returns an Option that records the function to execute for each
task.

When this option is not supplied, newOptions installs a function that ignores
its arguments and returns nil. The value is stored as given; a nil fn is not
rejected here.

Parameters:
  - fn: The function to run for each task message.

Returns:
  - Option: Functional option that overwrites the runFunc field.
*/
func WithRunFunc(fn func(context.Context, core.TaskMessage) error) Option {
	return func(o *options) { o.runFunc = fn }
}
/*
WithLogger returns an Option that records a custom logger for the worker or
queue.

When this option is not supplied, newOptions uses queue.NewLogger(). The value
is stored as given; a nil logger is not rejected here.

Parameters:
  - l: The logger instance.

Returns:
  - Option: Functional option that overwrites the logger field.
*/
func WithLogger(l queue.Logger) Option {
	return func(o *options) { o.logger = l }
}
/*
newOptions builds an options value from the package defaults and then applies
opts in the order given, so a later Option overrides an earlier one that sets
the same field.

Defaults:
  - addr: "amqp://guest:guest@localhost:5672/"
  - queue: "golang-queue"
  - tag: "golang-queue"
  - exchangeName: "test-exchange"
  - exchangeType: ExchangeDirect
  - routingKey: "test-key"
  - logger: queue.NewLogger()
  - autoAck: false
  - runFunc: a function that ignores its arguments and returns nil

After all options have been applied, the exchange type is checked with
isVaildExchange. An unsupported type is reported through the configured
logger's Fatal method; whether that call returns depends on the logger
implementation.

Parameters:
  - opts: Variadic list of Option functions to customize the configuration.
    Nil entries are skipped.

Returns:
  - options: The resulting configuration, returned by value.
*/
func newOptions(opts ...Option) options {
	cfg := options{
		addr:         "amqp://guest:guest@localhost:5672/",
		queue:        "golang-queue",
		tag:          "golang-queue",
		exchangeName: "test-exchange",
		exchangeType: ExchangeDirect,
		routingKey:   "test-key",
		logger:       queue.NewLogger(),
		autoAck:      false,
		runFunc: func(context.Context, core.TaskMessage) error {
			return nil
		},
	}

	// Apply each provided option on top of the defaults.
	for _, opt := range opts {
		// Calling a nil func value panics; ignore nil entries so a
		// conditionally-built option list cannot crash construction.
		if opt == nil {
			continue
		}
		opt(&cfg)
	}

	// Validate only after every option has run, so the final value is checked.
	if !isVaildExchange(cfg.exchangeType) {
		cfg.logger.Fatal("invalid exchange type: ", cfg.exchangeType)
	}

	return cfg
}