Skip to content

Commit df00482

Browse files
author
Guy Baron
authored
Merge pull request #27 from wework/cleaner-code
Cleaner code
2 parents 01f5150 + 4972c6a commit df00482

File tree

9 files changed

+51
-45
lines changed

9 files changed

+51
-45
lines changed

gbus/abstractions.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,13 @@ import (
88
"github.com/streadway/amqp"
99
)
1010

11+
type Semantics string
12+
13+
const (
14+
CMD Semantics = "cmd"
15+
EVT Semantics = "evt"
16+
)
17+
1118
//Bus interface provides the majority of functionality to Send, Reply and Publish messages to the Bus
1219
type Bus interface {
1320
HandlerRegister

gbus/bus.go

Lines changed: 22 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,8 @@ var (
6666

6767
//MaxRetryCount defines the max times a retry can run
6868
MaxRetryCount uint = 3
69-
rpcHeaderName = "x-grabbit-msg-rpc-id"
69+
//RpcHeaderName used to define the header in grabbit for RPC
70+
RpcHeaderName = "x-grabbit-msg-rpc-id"
7071
)
7172

7273
func (b *DefaultBus) createRPCQueue() (amqp.Queue, error) {
@@ -149,13 +150,13 @@ func (b *DefaultBus) bindServiceQueue() error {
149150
if e != nil {
150151
b.log().WithError(e).WithField("exchange", exchange).Error("failed to declare exchange")
151152
return e
152-
} else {
153-
e = b.bindQueue(topic, exchange)
154-
if e != nil {
155-
b.log().WithError(e).WithFields(log.Fields{"topic": topic, "exchange": exchange}).Error("failed to bind topic to exchange")
156-
return e
157-
}
158153
}
154+
e = b.bindQueue(topic, exchange)
155+
if e != nil {
156+
b.log().WithError(e).WithFields(log.Fields{"topic": topic, "exchange": exchange}).Error("failed to bind topic to exchange")
157+
return e
158+
}
159+
159160
}
160161
return nil
161162
}
@@ -173,7 +174,7 @@ func (b *DefaultBus) Start() error {
173174

174175
var e error
175176
//create amqo connection and channel
176-
if b.amqpConn, e = b.connect(int(MaxRetryCount)); e != nil {
177+
if b.amqpConn, e = b.connect(MaxRetryCount); e != nil {
177178
return e
178179
}
179180

@@ -436,7 +437,7 @@ func (b *DefaultBus) RPC(ctx context.Context, service string, request, reply *Bu
436437
b.RPCHandlers[rpcID] = handler
437438
//we do not defer this as we do not want b.RPCHandlers to be locked until a reply returns
438439
b.RPCLock.Unlock()
439-
request.Semantics = "cmd"
440+
request.Semantics = CMD
440441
rpc := rpcPolicy{
441442
rpcID: rpcID}
442443

@@ -467,7 +468,7 @@ func (b *DefaultBus) publishWithTx(ctx context.Context, ambientTx *sql.Tx, excha
467468
if !b.started {
468469
return errors.New("bus not strated or already shutdown, make sure you call bus.Start() before sending messages")
469470
}
470-
message.Semantics = "evt"
471+
message.Semantics = EVT
471472
publish := func(tx *sql.Tx) error {
472473
return b.sendImpl(ctx, tx, "", b.SvcName, exchange, topic, message, policies...)
473474
}
@@ -478,7 +479,7 @@ func (b *DefaultBus) sendWithTx(ctx context.Context, ambientTx *sql.Tx, toServic
478479
if !b.started {
479480
return errors.New("bus not strated or already shutdown, make sure you call bus.Start() before sending messages")
480481
}
481-
message.Semantics = "cmd"
482+
message.Semantics = CMD
482483
send := func(tx *sql.Tx) error {
483484
return b.sendImpl(ctx, tx, toService, b.SvcName, "", "", message, policies...)
484485
}
@@ -543,20 +544,15 @@ func (b *DefaultBus) RegisterSaga(saga Saga, conf ...SagaConfFn) error {
543544

544545
}
545546

546-
func (b *DefaultBus) connect(retryCount int) (*amqp.Connection, error) {
547+
func (b *DefaultBus) connect(retryCount uint) (*amqp.Connection, error) {
548+
var conn *amqp.Connection
549+
err := b.SafeWithRetries(func() error {
550+
var err error
551+
conn, err = amqp.Dial(b.AmqpConnStr)
552+
return err
553+
}, retryCount)
554+
return conn, err
547555

548-
connected := false
549-
attempts := uint(0)
550-
var lastErr error
551-
for !connected && attempts < MaxRetryCount {
552-
conn, e := amqp.Dial(b.AmqpConnStr)
553-
if e == nil {
554-
return conn, e
555-
}
556-
lastErr = e
557-
attempts++
558-
}
559-
return nil, lastErr
560556
}
561557

562558
//
@@ -633,7 +629,7 @@ func (b *DefaultBus) sendImpl(sctx context.Context, tx *sql.Tx, toService, reply
633629

634630
key := ""
635631

636-
if message.Semantics == "cmd" {
632+
if message.Semantics == CMD {
637633
key = toService
638634
} else {
639635
key = topic
@@ -695,5 +691,5 @@ type rpcPolicy struct {
695691
}
696692

697693
func (p rpcPolicy) Apply(publishing *amqp.Publishing) {
698-
publishing.Headers[rpcHeaderName] = p.rpcID
694+
publishing.Headers[RpcHeaderName] = p.rpcID
699695
}

gbus/messages.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ type BusMessage struct {
1212
CorrelationID string
1313
SagaID string
1414
SagaCorrelationID string
15-
Semantics string /*cmd or evt*/
15+
Semantics Semantics /*cmd or evt*/
1616
Payload Message
1717
PayloadFQN string
1818
RPCID string
@@ -68,7 +68,7 @@ func (bm *BusMessage) GetTraceLog() (fields []log.Field) {
6868
log.String("SagaID", bm.SagaID),
6969
log.String("CorrelationID", bm.CorrelationID),
7070
log.String("SagaCorrelationID", bm.SagaCorrelationID),
71-
log.String("Semantics", bm.Semantics),
71+
log.String("Semantics", string(bm.Semantics)),
7272
log.String("RPCID", bm.RPCID),
7373
}
7474
}

gbus/saga/glue.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -163,7 +163,7 @@ func (imsm *Glue) handler(invocation gbus.Invocation, message *gbus.BusMessage)
163163

164164
return imsm.completeOrUpdateSaga(invocation.Tx(), instance, message)
165165

166-
} else if message.Semantics == "cmd" {
166+
} else if message.Semantics == gbus.CMD {
167167
e := fmt.Errorf("Warning:Command or Reply message with no saga reference received. message will be dropped.\nmessage as of type:%v", reflect.TypeOf(message).Name())
168168
return e
169169
} else {
@@ -207,7 +207,7 @@ func (imsm *Glue) completeOrUpdateSaga(tx *sql.Tx, instance *Instance, lastMessa
207207
}
208208

209209
func (imsm *Glue) registerMessage(message gbus.Message) error {
210-
//only register once on each message so we will not duplicate invokations
210+
//only register once on each message so we will not duplicate invocations
211211
if _, exists := imsm.alreadyRegistred[message.SchemaName()]; exists {
212212
return nil
213213
}

gbus/saga/invocation.go

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -49,17 +49,20 @@ func (si *sagaInvocation) Ctx() context.Context {
4949
return si.ctx
5050
}
5151

52-
func (si *sagaInvocation) Send(ctx context.Context, toService string, command *gbus.BusMessage, policies ...gbus.MessagePolicy) error {
52+
func (si *sagaInvocation) Send(ctx context.Context, toService string,
53+
command *gbus.BusMessage, policies ...gbus.MessagePolicy) error {
5354
si.setCorrelationIDs(command, false)
5455
return si.decoratedBus.Send(ctx, toService, command, policies...)
5556
}
5657

57-
func (si *sagaInvocation) Publish(ctx context.Context, exchange, topic string, event *gbus.BusMessage, policies ...gbus.MessagePolicy) error {
58+
func (si *sagaInvocation) Publish(ctx context.Context, exchange, topic string,
59+
event *gbus.BusMessage, policies ...gbus.MessagePolicy) error {
5860
si.setCorrelationIDs(event, true)
5961
return si.decoratedBus.Publish(ctx, exchange, topic, event, policies...)
6062
}
6163

62-
func (si *sagaInvocation) RPC(ctx context.Context, service string, request, reply *gbus.BusMessage, timeout time.Duration) (*gbus.BusMessage, error) {
64+
func (si *sagaInvocation) RPC(ctx context.Context, service string, request,
65+
reply *gbus.BusMessage, timeout time.Duration) (*gbus.BusMessage, error) {
6366
return si.decoratedBus.RPC(ctx, service, request, reply, timeout)
6467
}
6568

gbus/serialization/avro.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -167,7 +167,7 @@ func (as *Avro) RegisterAvroMessageFromFile(schemaName, schemaPath, namespace st
167167
}
168168

169169
//RegisterAvroMessage registers a schema to a topic and binds it to an object (obj)
170-
func (as *Avro) RegisterAvroMessage(schemaName, namespace, schema string, obj AvroMessageGenerated, deserializer avroDeserializer) (err error) {
170+
func (as *Avro) RegisterAvroMessage(schemaName, namespace, schema string, obj gbus.Message, deserializer avroDeserializer) (err error) {
171171
as.lock.Lock()
172172
defer as.lock.Unlock()
173173
if _, ok := as.registeredSchemas[obj.SchemaName()]; !ok {

gbus/tx/mysql/txoutbox.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -119,7 +119,7 @@ func (outbox *TxOutbox) Save(tx *sql.Tx, exchange, routingKey string, amqpMessag
119119

120120
func (outbox *TxOutbox) purge(tx *sql.Tx) error {
121121

122-
purgeSQL := `DELETE FROM ` + getOutboxName(outbox.svcName)
122+
purgeSQL := fmt.Sprintf("DELETE FROM %s", getOutboxName(outbox.svcName))
123123
_, err := tx.Exec(purgeSQL)
124124
return err
125125
}
@@ -224,8 +224,8 @@ func (outbox *TxOutbox) getMessageRecords(tx *sql.Tx) (*sql.Rows, error) {
224224
}
225225

226226
func (outbox *TxOutbox) scavengeOrphanedRecords(tx *sql.Tx) (*sql.Rows, error) {
227-
selectSQL := "SELECT rec_id, exchange, routing_key, publishing FROM " + getOutboxName(outbox.svcName) + " WHERE status = 1 ORDER BY rec_id ASC LIMIT " + strconv.Itoa(maxPageSize) + " FOR UPDATE SKIP LOCKED"
228-
return tx.Query(selectSQL)
227+
selectSQL := "SELECT rec_id, exchange, routing_key, publishing FROM " + getOutboxName(outbox.svcName) + " WHERE status = 1 ORDER BY rec_id ASC LIMIT ? FOR UPDATE SKIP LOCKED"
228+
return tx.Query(selectSQL, strconv.Itoa(maxPageSize))
229229
}
230230

231231
func (outbox *TxOutbox) sendMessages(recordSelector func(tx *sql.Tx) (*sql.Rows, error)) error {
@@ -322,7 +322,7 @@ func (outbox *TxOutbox) ensureSchema(tx *sql.Tx, svcName string) error {
322322

323323
if schemaExists {
324324
/*
325-
The follwoing performs an alter schema to accommodate for breaking change introduced in commit 6a9f5df
325+
The following performs an alter schema to accommodate for breaking change introduced in commit 6a9f5df
326326
so that earlier consumers of grabbit will not break once the upgrade to the 1.0.0 release.
327327
Once a proper DB migration stratagy will be in place and implemented (post 1.0.0) the following code
328328
will be deleted.

gbus/tx/sagastore.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -115,7 +115,7 @@ func (store *SagaStore) RegisterSagaType(saga gbus.Saga) {
115115
//DeleteSaga implements interface method store.DeleteSaga
116116
func (store *SagaStore) DeleteSaga(tx *sql.Tx, instance *saga.Instance) error {
117117
tblName := store.GetSagatableName()
118-
deleteSQL := `DELETE FROM ` + tblName + ` WHERE saga_id=` + store.ParamsMarkers[0] + ``
118+
deleteSQL := `DELETE FROM ` + tblName + ` WHERE saga_id= ?`
119119
_, err := tx.Exec(deleteSQL, instance.ID)
120120
return err
121121
}
@@ -154,8 +154,7 @@ func (store *SagaStore) GetSagaByID(tx *sql.Tx, sagaID string) (*saga.Instance,
154154
func (store *SagaStore) SaveNewSaga(tx *sql.Tx, sagaType reflect.Type, newInstance *saga.Instance) (err error) {
155155
store.RegisterSagaType(newInstance.UnderlyingInstance)
156156
tblName := store.GetSagatableName()
157-
insertSQL := `INSERT INTO ` + tblName + ` (saga_id, saga_type, saga_data, version)
158-
VALUES (` + store.ParamsMarkers[0] + `, ` + store.ParamsMarkers[1] + `, ` + store.ParamsMarkers[2] + `, ` + store.ParamsMarkers[3] + `)`
157+
insertSQL := `INSERT INTO ` + tblName + ` (saga_id, saga_type, saga_data, version) VALUES (?, ?, ?, ?)`
159158

160159
var buf []byte
161160
if buf, err = store.serilizeSaga(newInstance); err != nil {
@@ -174,7 +173,8 @@ func (store *SagaStore) SaveNewSaga(tx *sql.Tx, sagaType reflect.Type, newInstan
174173
func (store *SagaStore) Purge() error {
175174
tx := store.NewTx()
176175
store.log().WithField("saga_table", store.GetSagatableName()).Info("Purging saga table")
177-
results, err := tx.Exec("DELETE FROM " + store.GetSagatableName())
176+
deleteSQL := fmt.Sprintf("DELETE FROM %s", store.GetSagatableName())
177+
results, err := tx.Exec(deleteSQL)
178178
if err != nil {
179179
store.log().WithError(err).Error("failed to purge saga table")
180180
return err

gbus/worker.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -137,9 +137,9 @@ func (worker *worker) extractBusMessage(delivery amqp.Delivery) (*BusMessage, er
137137
bm.ID = delivery.MessageId
138138
bm.CorrelationID = delivery.CorrelationId
139139
if delivery.Exchange != "" {
140-
bm.Semantics = "evt"
140+
bm.Semantics = EVT
141141
} else {
142-
bm.Semantics = "cmd"
142+
bm.Semantics = CMD
143143
}
144144
if bm.PayloadFQN == "" || bm.Semantics == "" {
145145
//TODO: Log poision pill message
@@ -160,7 +160,7 @@ func (worker *worker) extractBusMessage(delivery amqp.Delivery) (*BusMessage, er
160160
func (worker *worker) resolveHandlers(isRPCreply bool, bm *BusMessage, delivery amqp.Delivery) []MessageHandler {
161161
handlers := make([]MessageHandler, 0)
162162
if isRPCreply {
163-
rpcID, rpcHeaderFound := delivery.Headers[rpcHeaderName].(string)
163+
rpcID, rpcHeaderFound := delivery.Headers[RpcHeaderName].(string)
164164
if !rpcHeaderFound {
165165
worker.log().Warn("rpc message received but no rpc header found...rejecting message")
166166
return handlers

0 commit comments

Comments
 (0)