@@ -20,8 +20,6 @@ import (
20
20
"go.uber.org/goleak"
21
21
)
22
22
23
- var host = "127.0.0.1"
24
-
25
23
func TestMain (m * testing.M ) {
26
24
goleak .VerifyTestMain (m )
27
25
}
@@ -86,8 +84,11 @@ func TestNSQDefaultFlow(t *testing.T) {
86
84
}
87
85
88
86
func TestNSQShutdown (t * testing.T ) {
87
+ ctx := context .Background ()
88
+ natsC , endpoint := setupNSQContainer (ctx , t )
89
+ defer testcontainers .CleanupContainer (t , natsC )
89
90
w := NewWorker (
90
- WithAddr (host + ":4150" ),
91
+ WithAddr (endpoint ),
91
92
WithTopic ("test2" ),
92
93
)
93
94
q , err := queue .NewQueue (
@@ -105,11 +106,14 @@ func TestNSQShutdown(t *testing.T) {
105
106
}
106
107
107
108
func TestNSQCustomFuncAndWait (t * testing.T ) {
109
+ ctx := context .Background ()
110
+ natsC , endpoint := setupNSQContainer (ctx , t )
111
+ defer testcontainers .CleanupContainer (t , natsC )
108
112
m := & mockMessage {
109
113
Message : "foo" ,
110
114
}
111
115
w := NewWorker (
112
- WithAddr (host + ":4150" ),
116
+ WithAddr (endpoint ),
113
117
WithTopic ("test3" ),
114
118
WithMaxInFlight (10 ),
115
119
WithRunFunc (func (ctx context.Context , m core.TaskMessage ) error {
@@ -134,11 +138,14 @@ func TestNSQCustomFuncAndWait(t *testing.T) {
134
138
}
135
139
136
140
func TestEnqueueJobAfterShutdown (t * testing.T ) {
141
+ ctx := context .Background ()
142
+ natsC , endpoint := setupNSQContainer (ctx , t )
143
+ defer testcontainers .CleanupContainer (t , natsC )
137
144
m := mockMessage {
138
145
Message : "foo" ,
139
146
}
140
147
w := NewWorker (
141
- WithAddr (host + ":4150" ),
148
+ WithAddr (endpoint ),
142
149
)
143
150
q , err := queue .NewQueue (
144
151
queue .WithWorker (w ),
@@ -156,11 +163,14 @@ func TestEnqueueJobAfterShutdown(t *testing.T) {
156
163
}
157
164
158
165
func TestJobReachTimeout (t * testing.T ) {
166
+ ctx := context .Background ()
167
+ natsC , endpoint := setupNSQContainer (ctx , t )
168
+ defer testcontainers .CleanupContainer (t , natsC )
159
169
m := mockMessage {
160
170
Message : "foo" ,
161
171
}
162
172
w := NewWorker (
163
- WithAddr (host + ":4150" ),
173
+ WithAddr (endpoint ),
164
174
WithTopic ("timeout" ),
165
175
WithMaxInFlight (2 ),
166
176
WithRunFunc (func (ctx context.Context , m core.TaskMessage ) error {
@@ -195,11 +205,14 @@ func TestJobReachTimeout(t *testing.T) {
195
205
}
196
206
197
207
func TestCancelJobAfterShutdown (t * testing.T ) {
208
+ ctx := context .Background ()
209
+ natsC , endpoint := setupNSQContainer (ctx , t )
210
+ defer testcontainers .CleanupContainer (t , natsC )
198
211
m := mockMessage {
199
212
Message : "test" ,
200
213
}
201
214
w := NewWorker (
202
- WithAddr (host + ":4150" ),
215
+ WithAddr (endpoint ),
203
216
WithTopic ("cancel" ),
204
217
WithLogger (queue .NewLogger ()),
205
218
WithRunFunc (func (ctx context.Context , m core.TaskMessage ) error {
@@ -234,11 +247,14 @@ func TestCancelJobAfterShutdown(t *testing.T) {
234
247
}
235
248
236
249
func TestGoroutineLeak (t * testing.T ) {
250
+ ctx := context .Background ()
251
+ natsC , endpoint := setupNSQContainer (ctx , t )
252
+ defer testcontainers .CleanupContainer (t , natsC )
237
253
m := mockMessage {
238
254
Message : "foo" ,
239
255
}
240
256
w := NewWorker (
241
- WithAddr (host + ":4150" ),
257
+ WithAddr (endpoint ),
242
258
WithTopic ("GoroutineLeak" ),
243
259
WithLogger (queue .NewEmptyLogger ()),
244
260
WithRunFunc (func (ctx context.Context , m core.TaskMessage ) error {
@@ -279,11 +295,14 @@ func TestGoroutineLeak(t *testing.T) {
279
295
}
280
296
281
297
func TestGoroutinePanic (t * testing.T ) {
298
+ ctx := context .Background ()
299
+ natsC , endpoint := setupNSQContainer (ctx , t )
300
+ defer testcontainers .CleanupContainer (t , natsC )
282
301
m := mockMessage {
283
302
Message : "foo" ,
284
303
}
285
304
w := NewWorker (
286
- WithAddr (host + ":4150" ),
305
+ WithAddr (endpoint ),
287
306
WithTopic ("GoroutinePanic" ),
288
307
WithRunFunc (func (ctx context.Context , m core.TaskMessage ) error {
289
308
panic ("missing something" )
@@ -305,11 +324,14 @@ func TestGoroutinePanic(t *testing.T) {
305
324
}
306
325
307
326
func TestNSQStatsinQueue (t * testing.T ) {
327
+ ctx := context .Background ()
328
+ natsC , endpoint := setupNSQContainer (ctx , t )
329
+ defer testcontainers .CleanupContainer (t , natsC )
308
330
m := mockMessage {
309
331
Message : "foo" ,
310
332
}
311
333
w := NewWorker (
312
- WithAddr (host + ":4150" ),
334
+ WithAddr (endpoint ),
313
335
WithTopic ("nsq_stats" ),
314
336
WithRunFunc (func (ctx context.Context , m core.TaskMessage ) error {
315
337
log .Println ("get message" )
@@ -334,11 +356,14 @@ func TestNSQStatsinQueue(t *testing.T) {
334
356
}
335
357
336
358
func TestNSQStatsInWorker (t * testing.T ) {
359
+ ctx := context .Background ()
360
+ natsC , endpoint := setupNSQContainer (ctx , t )
361
+ defer testcontainers .CleanupContainer (t , natsC )
337
362
m := mockMessage {
338
363
Message : "foo" ,
339
364
}
340
365
w := NewWorker (
341
- WithAddr (host + ":4150" ),
366
+ WithAddr (endpoint ),
342
367
WithTopic ("nsq_stats_queue" ),
343
368
)
344
369
0 commit comments