Skip to content

Commit 5cf1e58

Browse files
author
adiweiss
authored
Merge pull request #97 from wework/separate_amqp_connection_between_ingress_and_egress
separate connection for egress
2 parents cfd7858 + 7b82531 commit 5cf1e58

File tree

1 file changed

+24
-18
lines changed

1 file changed

+24
-18
lines changed

gbus/bus.go

+24-18
Original file line numberDiff line numberDiff line change
@@ -28,10 +28,11 @@ type DefaultBus struct {
2828
Outbox TxOutbox
2929
PrefetchCount uint
3030
AmqpConnStr string
31-
amqpConn *amqp.Connection
31+
ingressConn *amqp.Connection
32+
egressConn *amqp.Connection
3233
workers []*worker
33-
AMQPChannel *amqp.Channel
34-
outAMQPChannel *amqp.Channel
34+
ingressChannel *amqp.Channel
35+
egressChannel *amqp.Channel
3536
serviceQueue amqp.Queue
3637
rpcQueue amqp.Queue
3738
SvcName string
@@ -83,7 +84,7 @@ func (b *DefaultBus) createRPCQueue() (amqp.Queue, error) {
8384
*/
8485
uid := xid.New().String()
8586
qName := b.SvcName + "_rpc_" + uid
86-
q, e := b.AMQPChannel.QueueDeclare(qName,
87+
q, e := b.ingressChannel.QueueDeclare(qName,
8788
false, /*durable*/
8889
true, /*autoDelete*/
8990
false, /*exclusive*/
@@ -97,7 +98,7 @@ func (b *DefaultBus) createServiceQueue() (amqp.Queue, error) {
9798
var q amqp.Queue
9899

99100
if b.PurgeOnStartup {
100-
msgsPurged, purgeError := b.AMQPChannel.QueueDelete(qName, false /*ifUnused*/, false /*ifEmpty*/, false /*noWait*/)
101+
msgsPurged, purgeError := b.ingressChannel.QueueDelete(qName, false /*ifUnused*/, false /*ifEmpty*/, false /*noWait*/)
101102
if purgeError != nil {
102103
b.Log().WithError(purgeError).WithField("deleted_messages", msgsPurged).Error("failed to purge queue")
103104
return q, purgeError
@@ -108,7 +109,7 @@ func (b *DefaultBus) createServiceQueue() (amqp.Queue, error) {
108109
if b.DLX != "" {
109110
args["x-dead-letter-exchange"] = b.DLX
110111
}
111-
q, e := b.AMQPChannel.QueueDeclare(qName,
112+
q, e := b.ingressChannel.QueueDeclare(qName,
112113
true, /*durable*/
113114
false, /*autoDelete*/
114115
false, /*exclusive*/
@@ -124,7 +125,7 @@ func (b *DefaultBus) createServiceQueue() (amqp.Queue, error) {
124125
func (b *DefaultBus) bindServiceQueue() error {
125126

126127
if b.deadletterHandler != nil && b.DLX != "" {
127-
err := b.AMQPChannel.ExchangeDeclare(b.DLX, /*name*/
128+
err := b.ingressChannel.ExchangeDeclare(b.DLX, /*name*/
128129
"fanout", /*kind*/
129130
true, /*durable*/
130131
false, /*autoDelete*/
@@ -144,7 +145,7 @@ func (b *DefaultBus) bindServiceQueue() error {
144145
for _, subscription := range b.DelayedSubscriptions {
145146
topic := subscription[0]
146147
exchange := subscription[1]
147-
e := b.AMQPChannel.ExchangeDeclare(exchange, /*name*/
148+
e := b.ingressChannel.ExchangeDeclare(exchange, /*name*/
148149
"topic", /*kind*/
149150
true, /*durable*/
150151
false, /*autoDelete*/
@@ -178,27 +179,32 @@ func (b *DefaultBus) Start() error {
178179

179180
var e error
180181
//create amqo connection and channel
181-
if b.amqpConn, e = b.connect(MaxRetryCount); e != nil {
182+
if b.ingressConn, e = b.connect(MaxRetryCount); e != nil {
183+
return e
184+
}
185+
if b.egressConn, e = b.connect(MaxRetryCount); e != nil {
182186
return e
183187
}
184188

185-
if b.AMQPChannel, e = b.createAMQPChannel(b.amqpConn); e != nil {
189+
if b.ingressChannel, e = b.createAMQPChannel(b.ingressConn); e != nil {
186190
return e
187191
}
188-
if b.outAMQPChannel, e = b.createAMQPChannel(b.amqpConn); e != nil {
192+
if b.egressChannel, e = b.createAMQPChannel(b.egressConn); e != nil {
189193
return e
190194
}
191195

192196
//register on failure notifications
193197
b.amqpErrors = make(chan *amqp.Error)
194198
b.amqpBlocks = make(chan amqp.Blocking)
195-
b.amqpConn.NotifyClose(b.amqpErrors)
196-
b.amqpConn.NotifyBlocked(b.amqpBlocks)
197-
b.outAMQPChannel.NotifyClose(b.amqpErrors)
199+
b.ingressConn.NotifyClose(b.amqpErrors)
200+
b.ingressConn.NotifyBlocked(b.amqpBlocks)
201+
b.egressConn.NotifyClose(b.amqpErrors)
202+
b.egressConn.NotifyBlocked(b.amqpBlocks)
203+
b.egressChannel.NotifyClose(b.amqpErrors)
198204
//TODO:Figure out what should be done
199205

200206
//init the outbox that sends the messages to the amqp transport and handles publisher confirms
201-
if e := b.Outgoing.init(b.outAMQPChannel, b.Confirm, true); e != nil {
207+
if e := b.Outgoing.init(b.egressChannel, b.Confirm, true); e != nil {
202208
return e
203209
}
204210
/*
@@ -208,7 +214,7 @@ func (b *DefaultBus) Start() error {
208214
if b.IsTxnl {
209215

210216
var amqpChan *amqp.Channel
211-
if amqpChan, e = b.createAMQPChannel(b.amqpConn); e != nil {
217+
if amqpChan, e = b.createAMQPChannel(b.egressConn); e != nil {
212218
b.Log().WithError(e).Error("failed to create amqp channel for transactional outbox")
213219
return e
214220
}
@@ -269,7 +275,7 @@ func (b *DefaultBus) createBusWorkers(workerNum uint) ([]*worker, error) {
269275
workers := make([]*worker, 0)
270276
for i := uint(0); i < workerNum; i++ {
271277
//create a channel per worker as we can't share channels across go routines
272-
amqpChan, createChanErr := b.createAMQPChannel(b.amqpConn)
278+
amqpChan, createChanErr := b.createAMQPChannel(b.ingressConn)
273279
if createChanErr != nil {
274280
return nil, createChanErr
275281
}
@@ -681,7 +687,7 @@ func (b *DefaultBus) registerHandlerImpl(exchange, routingKey string, msg Messag
681687
}
682688

683689
func (b *DefaultBus) bindQueue(topic, exchange string) error {
684-
return b.AMQPChannel.QueueBind(b.serviceQueue.Name, topic, exchange, false /*noWait*/, nil /*args*/)
690+
return b.ingressChannel.QueueBind(b.serviceQueue.Name, topic, exchange, false /*noWait*/, nil /*args*/)
685691
}
686692

687693
type rpcPolicy struct {

0 commit comments

Comments
 (0)