Skip to content

Commit 7b82531

Browse files
author
adi.yaskolka
committed
refactoring
1 parent e8cc1da commit 7b82531

File tree

1 file changed

+22
-22
lines changed

1 file changed

+22
-22
lines changed

Diff for: gbus/bus.go

+22-22
Original file line numberDiff line numberDiff line change
@@ -28,11 +28,11 @@ type DefaultBus struct {
2828
Outbox TxOutbox
2929
PrefetchCount uint
3030
AmqpConnStr string
31-
amqpConn *amqp.Connection
32-
outAmqpConn *amqp.Connection
31+
ingressConn *amqp.Connection
32+
egressConn *amqp.Connection
3333
workers []*worker
34-
AMQPChannel *amqp.Channel
35-
outAMQPChannel *amqp.Channel
34+
ingressChannel *amqp.Channel
35+
egressChannel *amqp.Channel
3636
serviceQueue amqp.Queue
3737
rpcQueue amqp.Queue
3838
SvcName string
@@ -84,7 +84,7 @@ func (b *DefaultBus) createRPCQueue() (amqp.Queue, error) {
8484
*/
8585
uid := xid.New().String()
8686
qName := b.SvcName + "_rpc_" + uid
87-
q, e := b.AMQPChannel.QueueDeclare(qName,
87+
q, e := b.ingressChannel.QueueDeclare(qName,
8888
false, /*durable*/
8989
true, /*autoDelete*/
9090
false, /*exclusive*/
@@ -98,7 +98,7 @@ func (b *DefaultBus) createServiceQueue() (amqp.Queue, error) {
9898
var q amqp.Queue
9999

100100
if b.PurgeOnStartup {
101-
msgsPurged, purgeError := b.AMQPChannel.QueueDelete(qName, false /*ifUnused*/, false /*ifEmpty*/, false /*noWait*/)
101+
msgsPurged, purgeError := b.ingressChannel.QueueDelete(qName, false /*ifUnused*/, false /*ifEmpty*/, false /*noWait*/)
102102
if purgeError != nil {
103103
b.Log().WithError(purgeError).WithField("deleted_messages", msgsPurged).Error("failed to purge queue")
104104
return q, purgeError
@@ -109,7 +109,7 @@ func (b *DefaultBus) createServiceQueue() (amqp.Queue, error) {
109109
if b.DLX != "" {
110110
args["x-dead-letter-exchange"] = b.DLX
111111
}
112-
q, e := b.AMQPChannel.QueueDeclare(qName,
112+
q, e := b.ingressChannel.QueueDeclare(qName,
113113
true, /*durable*/
114114
false, /*autoDelete*/
115115
false, /*exclusive*/
@@ -125,7 +125,7 @@ func (b *DefaultBus) createServiceQueue() (amqp.Queue, error) {
125125
func (b *DefaultBus) bindServiceQueue() error {
126126

127127
if b.deadletterHandler != nil && b.DLX != "" {
128-
err := b.AMQPChannel.ExchangeDeclare(b.DLX, /*name*/
128+
err := b.ingressChannel.ExchangeDeclare(b.DLX, /*name*/
129129
"fanout", /*kind*/
130130
true, /*durable*/
131131
false, /*autoDelete*/
@@ -145,7 +145,7 @@ func (b *DefaultBus) bindServiceQueue() error {
145145
for _, subscription := range b.DelayedSubscriptions {
146146
topic := subscription[0]
147147
exchange := subscription[1]
148-
e := b.AMQPChannel.ExchangeDeclare(exchange, /*name*/
148+
e := b.ingressChannel.ExchangeDeclare(exchange, /*name*/
149149
"topic", /*kind*/
150150
true, /*durable*/
151151
false, /*autoDelete*/
@@ -179,32 +179,32 @@ func (b *DefaultBus) Start() error {
179179

180180
var e error
181181
//create amqo connection and channel
182-
if b.amqpConn, e = b.connect(MaxRetryCount); e != nil {
182+
if b.ingressConn, e = b.connect(MaxRetryCount); e != nil {
183183
return e
184184
}
185-
if b.outAmqpConn, e = b.connect(MaxRetryCount); e != nil {
185+
if b.egressConn, e = b.connect(MaxRetryCount); e != nil {
186186
return e
187187
}
188188

189-
if b.AMQPChannel, e = b.createAMQPChannel(b.amqpConn); e != nil {
189+
if b.ingressChannel, e = b.createAMQPChannel(b.ingressConn); e != nil {
190190
return e
191191
}
192-
if b.outAMQPChannel, e = b.createAMQPChannel(b.outAmqpConn); e != nil {
192+
if b.egressChannel, e = b.createAMQPChannel(b.egressConn); e != nil {
193193
return e
194194
}
195195

196196
//register on failure notifications
197197
b.amqpErrors = make(chan *amqp.Error)
198198
b.amqpBlocks = make(chan amqp.Blocking)
199-
b.amqpConn.NotifyClose(b.amqpErrors)
200-
b.amqpConn.NotifyBlocked(b.amqpBlocks)
201-
b.outAmqpConn.NotifyClose(b.amqpErrors)
202-
b.outAmqpConn.NotifyBlocked(b.amqpBlocks)
203-
b.outAMQPChannel.NotifyClose(b.amqpErrors)
199+
b.ingressConn.NotifyClose(b.amqpErrors)
200+
b.ingressConn.NotifyBlocked(b.amqpBlocks)
201+
b.egressConn.NotifyClose(b.amqpErrors)
202+
b.egressConn.NotifyBlocked(b.amqpBlocks)
203+
b.egressChannel.NotifyClose(b.amqpErrors)
204204
//TODO:Figure out what should be done
205205

206206
//init the outbox that sends the messages to the amqp transport and handles publisher confirms
207-
if e := b.Outgoing.init(b.outAMQPChannel, b.Confirm, true); e != nil {
207+
if e := b.Outgoing.init(b.egressChannel, b.Confirm, true); e != nil {
208208
return e
209209
}
210210
/*
@@ -214,7 +214,7 @@ func (b *DefaultBus) Start() error {
214214
if b.IsTxnl {
215215

216216
var amqpChan *amqp.Channel
217-
if amqpChan, e = b.createAMQPChannel(b.outAmqpConn); e != nil {
217+
if amqpChan, e = b.createAMQPChannel(b.egressConn); e != nil {
218218
b.Log().WithError(e).Error("failed to create amqp channel for transactional outbox")
219219
return e
220220
}
@@ -275,7 +275,7 @@ func (b *DefaultBus) createBusWorkers(workerNum uint) ([]*worker, error) {
275275
workers := make([]*worker, 0)
276276
for i := uint(0); i < workerNum; i++ {
277277
//create a channel per worker as we can't share channels across go routines
278-
amqpChan, createChanErr := b.createAMQPChannel(b.amqpConn)
278+
amqpChan, createChanErr := b.createAMQPChannel(b.ingressConn)
279279
if createChanErr != nil {
280280
return nil, createChanErr
281281
}
@@ -687,7 +687,7 @@ func (b *DefaultBus) registerHandlerImpl(exchange, routingKey string, msg Messag
687687
}
688688

689689
func (b *DefaultBus) bindQueue(topic, exchange string) error {
690-
return b.AMQPChannel.QueueBind(b.serviceQueue.Name, topic, exchange, false /*noWait*/, nil /*args*/)
690+
return b.ingressChannel.QueueBind(b.serviceQueue.Name, topic, exchange, false /*noWait*/, nil /*args*/)
691691
}
692692

693693
type rpcPolicy struct {

0 commit comments

Comments
 (0)