Skip to content

Commit 253ae5d

Browse files
author
Guy Baron
authored
V1.x master merge conflicts (#128)
* add handler metrics to bus and saga (#101) * add handler metrics to bus and saga + tests * fix build * add 0 to the default buckets to catch fast message handling * PR correction - changed latency to summary(removed bucket configuration), add registration for saga handlers * PR correction - getting logger as a param * PR correction - new line in eof * PR corrections message handler + sync.map + latency as summary * add rejected messages metric * dead letter handler should reject messages on failures and rollbacks and ack on commit success (#105) * dead letter handler should reject messages on failures and rollbacks * dead letter handler should reject messages on failures and rollbacks * dead letter handler should reject messages on failures and rollbacks * dead letter handler should reject messages on failures and rollbacks * return an error from the saga store when deleting a saga if saga can not (#110) be found In order to deal with concurrent deletes of the sage saga instance we would wan't to indicate that deleting the saga failed if the saga is not stored so callers can take proper action * Persisted timeouts (#107) * decouple transaction manager from glue * moved timeout manager to gbus/tx package * initial commit in order to support persisted timeouts * first working version of a mysql persisted timeout manager * fixing ci lint errors * refactored ensure schema of timeout manager * cleanup timeout manager when bs shuts down * fixing formatting issues * changed logging level from Info to Debug when inserting a new timeout * resusing timeouts tablename (PR review) * renamed AcceptTimeoutFunction to SetTimeoutFunction on the TimeoutManager interface (PR review) * refactored glue to implement the Logged inetrface and use the GLogged helper struct * locking timeout record before executing timeout In order to prevent having a timeout beeing executed twice due to two concurrent grabbit instances running the same service a lock (FOR UPDATE) has been placed on the timeout record in the scope of the executing transaction * Commiting the select transaction when querying for pending timeouts * feat(timeout:lock): This is done in order to reduce contention and allow for parallel processing in case of multiple grabbit instances * Enable returning a message back from the dead to the queue (#112) * enable sending raw messages * enable sending raw messages * enable sending raw messages * enable sending raw messages * enable sending raw messages * enable sending raw messages * enable sending raw messages * enable sending raw messages * enable sending raw messages * enable sending raw messages * enable sending raw messages * enable sending raw messages * enable sending raw messages * return to q * return to q * return to q * return to q * return dead to q * allow no retries * test - resend dead to queue * test - resend dead to queue * test - resend dead to queue * test - resend dead to queue - fixes after cr * test - resend dead to queue - fixes after cr * test - resend dead to queue - fixes after cr * added metric report on saga timeout (#114) 1) added reporting saga timeouts to the glue component 2) fixed mysql timeoutmanager error when trying to clear a timeout * Added documentation for grabbit metrics (#117) * added initial documentation for grabbit metrics * including metrics section in readme.md * fixing goreportcard issues (#118) * removed logging a warning when worker message channel returns an error (#116) * corrected saga metrics name and added to metrics documentation (#119) * corrected saga metrics name and added documentatio * corrected saga metric name * corrected typos * removed non transactional bus mode (#120) * remove fields * remove fields * go fmt and go lint error fixes to improve goreportcard (#126) * go fmt on some files * go fmt * added comments on exported types * cunsume the messages channel via ranging over the channel to prevent (#125) empty delivreies * Migrations functionality (#111) * implement migrations * implement migrations * implement migrations * implement migrations * implement migrations * migrations * migrations * migrations * migrations * migrations * migrations * migrations * fix tests error * add migrations * migrations - timeout table migration * test - resend dead to queue - fixes after cr * migraration to grabbit (use forked migrator) * remove fields * remove fields * remove fields * remove fields * touch
1 parent 31d9fc5 commit 253ae5d

20 files changed

+191
-287
lines changed

gbus/builder/builder.go

+3
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,9 @@ func (builder *defaultBuilder) Build(svcName string) gbus.Bus {
7777
panic(err)
7878
}
7979
gb.TxProvider = mysqltx
80+
81+
mysql.EnsureSchema(mysqltx.Database, gb.SvcName)
82+
8083
//TODO move purge logic into the NewSagaStore factory method
8184
sagaStore = mysql.NewSagaStore(gb.SvcName, mysqltx)
8285
if builder.purgeOnStartup {

gbus/bus.go

+4-12
Original file line numberDiff line numberDiff line change
@@ -168,14 +168,6 @@ func (b *DefaultBus) bindServiceQueue() error {
168168
return nil
169169
}
170170

171-
func (b *DefaultBus) createAMQPChannel(conn *amqp.Connection) (*amqp.Channel, error) {
172-
channel, e := conn.Channel()
173-
if e != nil {
174-
return nil, e
175-
}
176-
return channel, nil
177-
}
178-
179171
//Start implements GBus.Start()
180172
func (b *DefaultBus) Start() error {
181173

@@ -188,10 +180,10 @@ func (b *DefaultBus) Start() error {
188180
return e
189181
}
190182

191-
if b.ingressChannel, e = b.createAMQPChannel(b.ingressConn); e != nil {
183+
if b.ingressChannel, e = b.ingressConn.Channel(); e != nil {
192184
return e
193185
}
194-
if b.egressChannel, e = b.createAMQPChannel(b.egressConn); e != nil {
186+
if b.egressChannel, e = b.egressConn.Channel(); e != nil {
195187
return e
196188
}
197189

@@ -209,7 +201,7 @@ func (b *DefaultBus) Start() error {
209201
TODO://the design is crap and needs to be refactored
210202
*/
211203
var amqpChan *amqp.Channel
212-
if amqpChan, e = b.createAMQPChannel(b.egressConn); e != nil {
204+
if amqpChan, e = b.egressConn.Channel(); e != nil {
213205
b.Log().WithError(e).Error("failed to create amqp channel for transactional outbox")
214206
return e
215207
}
@@ -272,7 +264,7 @@ func (b *DefaultBus) createBusWorkers(workerNum uint) ([]*worker, error) {
272264
workers := make([]*worker, 0)
273265
for i := uint(0); i < workerNum; i++ {
274266
//create a channel per worker as we can't share channels across go routines
275-
amqpChan, createChanErr := b.createAMQPChannel(b.ingressConn)
267+
amqpChan, createChanErr := b.ingressConn.Channel()
276268
if createChanErr != nil {
277269
return nil, createChanErr
278270
}

gbus/invocation.go

+9
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ func (dfi *defaultInvocationContext) Log() logrus.FieldLogger {
3232
return dfi.Glogged.Log().WithFields(logrus.Fields{"routing_key": dfi.routingKey, "message_id": dfi.inboundMsg.ID})
3333
}
3434

35+
//Reply implements the Invocation.Reply signature
3536
func (dfi *defaultInvocationContext) Reply(ctx context.Context, replyMessage *BusMessage) error {
3637
if dfi.inboundMsg != nil {
3738
replyMessage.CorrelationID = dfi.inboundMsg.ID
@@ -51,13 +52,15 @@ func (dfi *defaultInvocationContext) Reply(ctx context.Context, replyMessage *Bu
5152
return err
5253
}
5354

55+
//Send implements the Invocation.Send signature
5456
func (dfi *defaultInvocationContext) Send(ctx context.Context, toService string, command *BusMessage, policies ...MessagePolicy) error {
5557
if dfi.tx != nil {
5658
return dfi.bus.sendWithTx(ctx, dfi.tx, toService, command, policies...)
5759
}
5860
return dfi.bus.Send(ctx, toService, command, policies...)
5961
}
6062

63+
//Publish implements the Invocation.Publish signature
6164
func (dfi *defaultInvocationContext) Publish(ctx context.Context, exchange, topic string, event *BusMessage, policies ...MessagePolicy) error {
6265

6366
if dfi.tx != nil {
@@ -66,26 +69,32 @@ func (dfi *defaultInvocationContext) Publish(ctx context.Context, exchange, topi
6669
return dfi.bus.Publish(ctx, exchange, topic, event, policies...)
6770
}
6871

72+
//RPC implements the Invocation.RPC signature
6973
func (dfi *defaultInvocationContext) RPC(ctx context.Context, service string, request, reply *BusMessage, timeout time.Duration) (*BusMessage, error) {
7074
return dfi.bus.RPC(ctx, service, request, reply, timeout)
7175
}
7276

77+
//Bus implements the Invocation.Bus signature
7378
func (dfi *defaultInvocationContext) Bus() Messaging {
7479
return dfi
7580
}
7681

82+
//Tx implements the Invocation.Tx signature
7783
func (dfi *defaultInvocationContext) Tx() *sql.Tx {
7884
return dfi.tx
7985
}
8086

87+
//Ctx implements the Invocation.Ctx signature
8188
func (dfi *defaultInvocationContext) Ctx() context.Context {
8289
return dfi.ctx
8390
}
8491

92+
//Routing implements the Invocation.Routing signature
8593
func (dfi *defaultInvocationContext) Routing() (exchange, routingKey string) {
8694
return dfi.exchange, dfi.routingKey
8795
}
8896

97+
//DeliveryInfo implements the Invocation.DeliveryInfo signature
8998
func (dfi *defaultInvocationContext) DeliveryInfo() DeliveryInfo {
9099
return dfi.deliveryInfo
91100
}

gbus/message_handler.go

+1
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import (
99
//MessageHandler signature for all command handlers
1010
type MessageHandler func(invocation Invocation, message *BusMessage) error
1111

12+
//Name is a helper function returning the runtime name of the function bound to an instance of the MessageHandler type
1213
func (mg MessageHandler) Name() string {
1314
funName := runtime.FuncForPC(reflect.ValueOf(mg).Pointer()).Name()
1415
splits := strings.Split(funName, ".")

gbus/metrics/handler_metrics.go

+12-11
Original file line numberDiff line numberDiff line change
@@ -2,10 +2,11 @@ package metrics
22

33
import (
44
"fmt"
5+
"sync"
6+
57
"github.com/prometheus/client_golang/prometheus"
6-
"github.com/prometheus/client_model/go"
8+
io_prometheus_client "github.com/prometheus/client_model/go"
79
"github.com/sirupsen/logrus"
8-
"sync"
910
)
1011

1112
var (
@@ -20,7 +21,7 @@ const (
2021
grabbitPrefix = "grabbit"
2122
)
2223

23-
type HandlerMetrics struct {
24+
type handlerMetrics struct {
2425
result *prometheus.CounterVec
2526
latency prometheus.Summary
2627
}
@@ -62,17 +63,17 @@ func RunHandlerWithMetric(handleMessage func() error, handlerName string, logger
6263
return err
6364
}
6465

65-
func GetHandlerMetrics(handlerName string) *HandlerMetrics {
66+
func GetHandlerMetrics(handlerName string) *handlerMetrics {
6667
entry, ok := handlerMetricsByHandlerName.Load(handlerName)
6768
if ok {
68-
return entry.(*HandlerMetrics)
69+
return entry.(*handlerMetrics)
6970
}
7071

7172
return nil
7273
}
7374

74-
func newHandlerMetrics(handlerName string) *HandlerMetrics {
75-
return &HandlerMetrics{
75+
func newHandlerMetrics(handlerName string) *handlerMetrics {
76+
return &handlerMetrics{
7677
result: prometheus.NewCounterVec(
7778
prometheus.CounterOpts{
7879
Namespace: grabbitPrefix,
@@ -98,15 +99,15 @@ func trackTime(functionToTrack func() error, observer prometheus.Observer) error
9899
return functionToTrack()
99100
}
100101

101-
func (hm *HandlerMetrics) GetSuccessCount() (float64, error) {
102+
func (hm *handlerMetrics) GetSuccessCount() (float64, error) {
102103
return hm.getLabeledCounterValue(success)
103104
}
104105

105-
func (hm *HandlerMetrics) GetFailureCount() (float64, error) {
106+
func (hm *handlerMetrics) GetFailureCount() (float64, error) {
106107
return hm.getLabeledCounterValue(failure)
107108
}
108109

109-
func (hm *HandlerMetrics) GetLatencySampleCount() (*uint64, error) {
110+
func (hm *handlerMetrics) GetLatencySampleCount() (*uint64, error) {
110111
m := &io_prometheus_client.Metric{}
111112
err := hm.latency.Write(m)
112113
if err != nil {
@@ -116,7 +117,7 @@ func (hm *HandlerMetrics) GetLatencySampleCount() (*uint64, error) {
116117
return m.GetSummary().SampleCount, nil
117118
}
118119

119-
func (hm *HandlerMetrics) getLabeledCounterValue(label string) (float64, error) {
120+
func (hm *handlerMetrics) getLabeledCounterValue(label string) (float64, error) {
120121
m := &io_prometheus_client.Metric{}
121122
err := hm.result.WithLabelValues(label).Write(m)
122123

gbus/metrics/message_metrics.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -32,4 +32,4 @@ func newRejectedMessagesCounter() prometheus.Counter {
3232
Name: "rejected_messages",
3333
Help: "counting the rejected messages",
3434
})
35-
}
35+
}

gbus/metrics/saga_metrics.go

+2
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,10 @@ import (
66
io_prometheus_client "github.com/prometheus/client_model/go"
77
)
88

9+
//SagaTimeoutCounter is the prometheus counter counting timed out saga instances
910
var SagaTimeoutCounter = newSagaTimeoutCounter()
1011

12+
//GetSagaTimeoutCounterValue gets the counter value of timed out sagas reported to prometheus
1113
func GetSagaTimeoutCounterValue() (float64, error) {
1214
m := &io_prometheus_client.Metric{}
1315
err := SagaTimeoutCounter.Write(m)

gbus/outbox.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -50,9 +50,9 @@ func (out *AMQPOutbox) init(amqp *amqp.Channel, confirm, resendOnNack bool) erro
5050
return nil
5151
}
5252

53+
//Shutdown stops the outbox
5354
func (out *AMQPOutbox) Shutdown() {
5455
close(out.stop)
55-
5656
}
5757

5858
//Post implements Outbox.Send

gbus/serialization/avro.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,7 @@ func (as *Avro) Encode(obj gbus.Message) (msg []byte, err error) {
8787
tobj, ok := obj.(AvroMessageGenerated)
8888
if !ok {
8989
err := fmt.Errorf("could not convert obj to AvroMessageGenerated")
90-
logrus.WithError(err).WithField("obj", obj).Error("could not convert object")
90+
logrus.WithError(err).Error("could not convert object")
9191
return nil, err
9292
}
9393

gbus/serialization/proto.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,7 @@ func (as *Proto) Decode(buffer []byte, schemaName string) (msg gbus.Message, err
7575
msg, ok = tmsg.(gbus.Message)
7676
if !ok {
7777
err = fmt.Errorf("could not cast obj to gbus.Message")
78-
as.logger.WithError(err).WithField("msg", tmsg).Errorf("could not cast %v to gbus.Message", tmsg)
78+
as.logger.WithError(err).Errorf("could not cast %v to gbus.Message", tmsg)
7979
return nil, err
8080
}
8181

gbus/tx/mysql/migrations.go

+103
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,103 @@
1+
package mysql
2+
3+
import (
4+
"database/sql"
5+
6+
"fmt"
7+
"github.com/lopezator/migrator"
8+
"github.com/wework/grabbit/gbus/tx"
9+
)
10+
11+
//SagaStoreTableMigration creates the service saga store table
12+
func SagaStoreTableMigration(svcName string) *migrator.Migration {
13+
tblName := tx.GetSagatableName(svcName)
14+
15+
createTableQuery := `CREATE TABLE IF NOT EXISTS ` + tblName + ` (
16+
rec_id INT PRIMARY KEY AUTO_INCREMENT,
17+
saga_id VARCHAR(255) UNIQUE NOT NULL,
18+
saga_type VARCHAR(255) NOT NULL,
19+
saga_data LONGBLOB NOT NULL,
20+
version integer NOT NULL DEFAULT 0,
21+
last_update timestamp DEFAULT NOW(),
22+
INDEX ` + tblName + `_sagatype_idx (saga_type))`
23+
24+
return &migrator.Migration{
25+
Name: "create saga store table",
26+
Func: func(tx *sql.Tx) error {
27+
if _, err := tx.Exec(createTableQuery); err != nil {
28+
return err
29+
}
30+
return nil
31+
},
32+
}
33+
}
34+
35+
//OutboxMigrations creates service outbox table
36+
func OutboxMigrations(svcName string) *migrator.Migration {
37+
38+
query := `CREATE TABLE IF NOT EXISTS ` + getOutboxName(svcName) + ` (
39+
rec_id int NOT NULL AUTO_INCREMENT,
40+
message_id varchar(50) NOT NULL UNIQUE,
41+
message_type varchar(50) NOT NULL,
42+
exchange varchar(50) NOT NULL,
43+
routing_key varchar(50) NOT NULL,
44+
publishing longblob NOT NULL,
45+
status int(11) NOT NULL,
46+
relay_id varchar(50) NULL,
47+
delivery_tag bigint(20) NOT NULL,
48+
delivery_attempts int NOT NULL DEFAULT 0,
49+
insert_date timestamp DEFAULT CURRENT_TIMESTAMP,
50+
PRIMARY KEY(rec_id),
51+
INDEX status_delivery (rec_id, status, delivery_attempts))`
52+
53+
return &migrator.Migration{
54+
Name: "create outbox table",
55+
Func: func(tx *sql.Tx) error {
56+
if _, err := tx.Exec(query); err != nil {
57+
return err
58+
}
59+
return nil
60+
},
61+
}
62+
}
63+
64+
//TimoutTableMigration creates the service timeout table, where timeouts are persisted
65+
func TimoutTableMigration(svcName string) *migrator.Migration {
66+
tblName := GetTimeoutsTableName(svcName)
67+
68+
createTableQuery := `CREATE TABLE IF NOT EXISTS ` + tblName + ` (
69+
rec_id INT PRIMARY KEY AUTO_INCREMENT,
70+
saga_id VARCHAR(255) UNIQUE NOT NULL,
71+
timeout DATETIME NOT NULL,
72+
INDEX (timeout),
73+
INDEX (saga_id)
74+
)`
75+
76+
return &migrator.Migration{
77+
Name: "create timeout table",
78+
Func: func(tx *sql.Tx) error {
79+
if _, err := tx.Exec(createTableQuery); err != nil {
80+
return err
81+
}
82+
return nil
83+
},
84+
}
85+
}
86+
87+
//EnsureSchema implements Grabbit's migrations strategy
88+
func EnsureSchema(db *sql.DB, svcName string) {
89+
migrationsTable := fmt.Sprintf("grabbitMigrations_%s", svcName)
90+
91+
migrate, err := migrator.New(migrator.TableName(migrationsTable), migrator.Migrations(
92+
OutboxMigrations(svcName),
93+
SagaStoreTableMigration(svcName),
94+
TimoutTableMigration(svcName),
95+
))
96+
if err != nil {
97+
panic(err)
98+
}
99+
err = migrate.Migrate(db)
100+
if err != nil {
101+
panic(err)
102+
}
103+
}

0 commit comments

Comments
 (0)