Skip to content

Commit c6204e8

Browse files
author
Guy Baron
authored
Merge pull request #62 from wework/v1.x
Merge v1.x -> master
2 parents df00482 + d0bb819 commit c6204e8

File tree

3 files changed

+17
-12
lines changed

3 files changed

+17
-12
lines changed

gbus/bus.go

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -323,18 +323,20 @@ func (b *DefaultBus) Shutdown() (shutdwonErr error) {
323323
b.log().WithError(err).Error("could not stop worker")
324324
}
325325
}
326-
327326
b.Outgoing.shutdown()
328327
b.started = false
329-
330328
if b.IsTxnl {
329+
331330
err := b.Outbox.Stop()
331+
332332
if err != nil {
333333
b.log().WithError(err).Error("could not shutdown outbox")
334334
return err
335335
}
336-
b.TxProvider.Dispose()
336+
b.TxProvider.Dispose()
337+
337338
}
339+
338340
return nil
339341
}
340342

@@ -639,7 +641,7 @@ func (b *DefaultBus) sendImpl(sctx context.Context, tx *sql.Tx, toService, reply
639641
//send to the transactional outbox if the bus is transactional
640642
//otherwise send directly to amqp
641643
if b.IsTxnl && tx != nil {
642-
b.log().WithField("message_id", msg.MessageId).Info("sending message to outbox")
644+
b.log().WithField("message_id", msg.MessageId).Debug("sending message to outbox")
643645
saveErr := b.Outbox.Save(tx, exchange, key, msg)
644646
if saveErr != nil {
645647
log.WithError(saveErr).Error("failed to save to transactional outbox")

gbus/tx/mysql/txoutbox.go

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -88,7 +88,7 @@ func (outbox *TxOutbox) Start(amqpOut *gbus.AMQPOutbox) error {
8888

8989
//Stop forcess the transactional outbox to stop processing additional messages
9090
func (outbox *TxOutbox) Stop() error {
91-
outbox.exit <- true
91+
close(outbox.exit)
9292
return nil
9393
}
9494

@@ -189,7 +189,7 @@ func (outbox *TxOutbox) updateAckedRecord(deliveryTag uint64) error {
189189
outbox.log().WithError(txErr).WithField("delivery_tag", deliveryTag).Error("failed to create transaction for updating acked delivery tag")
190190
return txErr
191191
}
192-
outbox.log().WithField("delivery_tag", deliveryTag).Info("ack received for delivery tag")
192+
outbox.log().WithField("delivery_tag", deliveryTag).Debug("ack received for delivery tag")
193193

194194
outbox.gl.Lock()
195195
recID := outbox.recordsPendingConfirms[deliveryTag]
@@ -268,24 +268,26 @@ func (outbox *TxOutbox) sendMessages(recordSelector func(tx *sql.Tx) (*sql.Rows,
268268
outbox.log().WithError(decErr).Error("failed to decode amqp message from outbox record")
269269
continue
270270
}
271-
outbox.log().WithField("message_id", publishing.MessageId).Info("relay message")
272271

273272
//send the amqp message to rabbitmq
274273
if deliveryTag, postErr := outbox.amqpOutbox.Post(exchange, routingKey, publishing); postErr != nil {
275-
276274
outbox.log().WithError(postErr).
277275
WithFields(log.Fields{"message_name": publishing.Headers["x-msg-name"], "message_id": publishing.MessageId}).
278276
Error("failed to send amqp message")
279277
failedDeliveries = append(failedDeliveries, recID)
280278
} else {
279+
outbox.log().WithFields(log.Fields{"message_id": publishing.MessageId, "delivery_tag": deliveryTag}).Debug("relay message")
281280
successfulDeliveries[deliveryTag] = recID
282281
}
282+
283283
}
284284
err := rows.Close()
285285
if err != nil {
286286
outbox.log().WithError(err).Error("could not close Rows")
287287
}
288-
288+
if messagesSent := len(successfulDeliveries); messagesSent > 0 {
289+
outbox.log().WithField("messages_sent", len(successfulDeliveries)).Info("outbox relayed messages")
290+
}
289291
for deliveryTag, id := range successfulDeliveries {
290292
_, updateErr := tx.Exec("UPDATE "+getOutboxName(outbox.svcName)+" SET status=1, delivery_tag=?, relay_id=? WHERE rec_id=?", deliveryTag, outbox.ID, id)
291293
if updateErr != nil {

gbus/worker.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -179,28 +179,29 @@ func (worker *worker) resolveHandlers(isRPCreply bool, bm *BusMessage, delivery
179179
} else {
180180
worker.handlersLock.Lock()
181181
defer worker.handlersLock.Unlock()
182-
worker.log().WithFields(log.Fields{"number_of_handlers": len(worker.registrations)}).Info("found message handlers")
182+
183183
for _, registration := range worker.registrations {
184184
if registration.Matches(delivery.Exchange, delivery.RoutingKey, bm.PayloadFQN) {
185185
handlers = append(handlers, registration.Handler)
186186
}
187187
}
188188
}
189189

190+
worker.log().WithFields(log.Fields{"number_of_handlers": len(handlers)}).Info("found message handlers")
190191
return handlers
191192
}
192193

193194
func (worker *worker) ack(delivery amqp.Delivery) error {
194195
ack := func(attempts uint) error { return delivery.Ack(false /*multiple*/) }
195-
worker.log().WithField("message_id", delivery.MessageId).Info("acking message")
196+
worker.log().WithField("message_id", delivery.MessageId).Debug("acking message")
196197
err := retry.Retry(ack,
197198
strategy.Wait(100*time.Millisecond))
198199

199200
if err != nil {
200201
worker.log().WithError(err).Error("could not ack the message")
201202
worker.span.LogFields(slog.Error(err))
202203
} else {
203-
worker.log().WithField("message_id", delivery.MessageId).Info("message acked")
204+
worker.log().WithField("message_id", delivery.MessageId).Debug("message acked")
204205
}
205206

206207
return err

0 commit comments

Comments
 (0)