@@ -3,8 +3,10 @@ package tests
3
3
import (
4
4
"context"
5
5
"database/sql"
6
+ "errors"
6
7
"fmt"
7
8
"reflect"
9
+ "sync"
8
10
"testing"
9
11
"time"
10
12
@@ -209,28 +211,34 @@ func TestRPC(t *testing.T) {
209
211
}
210
212
211
213
func TestDeadlettering (t * testing.T ) {
214
+
215
+ var waitgroup sync.WaitGroup
216
+ waitgroup .Add (2 )
212
217
poision := gbus .NewBusMessage (PoisionMessage {})
213
218
service1 := createBusWithOptions (testSvc1 , "grabbit-dead" , true , true )
214
219
deadletterSvc := createBusWithOptions ("deadletterSvc" , "grabbit-dead" , true , true )
215
- proceed := make ( chan bool )
216
- handler := func (tx * sql.Tx , poision amqp.Delivery ) error {
217
- proceed <- true
220
+
221
+ deadMessageHandler := func (tx * sql.Tx , poision amqp.Delivery ) error {
222
+ waitgroup . Done ()
218
223
return nil
219
224
}
220
225
221
- deadletterSvc .HandleDeadletter (handler )
226
+ faultyHandler := func (invocation gbus.Invocation , message * gbus.BusMessage ) error {
227
+ return errors .New ("fail" )
228
+ }
229
+
230
+ deadletterSvc .HandleDeadletter (deadMessageHandler )
231
+ service1 .HandleMessage (Command1 {}, faultyHandler )
222
232
223
233
deadletterSvc .Start ()
224
234
defer deadletterSvc .Shutdown ()
225
235
service1 .Start ()
226
236
defer service1 .Shutdown ()
227
237
228
- e := service1 .Send (context .Background (), testSvc1 , poision )
229
- if e != nil {
230
- log .Printf ("send error: %v" , e )
231
- }
238
+ service1 .Send (context .Background (), testSvc1 , poision )
239
+ service1 .Send (context .Background (), testSvc1 , gbus .NewBusMessage (Command1 {}))
232
240
233
- <- proceed
241
+ waitgroup . Wait ()
234
242
}
235
243
236
244
func TestRegistrationAfterBusStarts (t * testing.T ) {
0 commit comments