diff --git a/README.md b/README.md index 14bf6df8..c2d6425e 100644 --- a/README.md +++ b/README.md @@ -66,7 +66,7 @@ $ ./kafka-mongo-watcher -KAFKA_MONGO_WATCHER_REPLAY=true ... Tech HTTP server started {"facility":"kafka-mongo-watcher","version":"wip","addr":":8001","file":"/usr/local/Cellar/go/1.14/libexec/src/runtime/asm_amd64.s","line":1373} Connected to mongodb database {"facility":"kafka-mongo-watcher","version":"wip","uri":"mongodb://root:toor@127.0.0.1:27011,127.0.0.1:27012,127.0.0.1:27013/watcher?replicaSet=replicaset\u0026authSource=admin"} - Connected to kafka producer {"facility":"kafka-mongo-watcher","version":"wip","bootstrao-servers":"127.0.0.1:9092"} + Connected to kafka producer {"facility":"kafka-mongo-watcher","version":"wip","bootstrap-servers":"127.0.0.1:9092"} ... ``` diff --git a/cmd/watcher/main.go b/cmd/watcher/main.go index 08680c51..5a8e47ec 100644 --- a/cmd/watcher/main.go +++ b/cmd/watcher/main.go @@ -39,8 +39,13 @@ func handleExitSignal(ctx context.Context, cancel context.CancelFunc, container log.Info("Signal received: gracefully stopping application", logger.String("signal", signal.String())) cancel() + log.Info("Close kafka client") container.GetKafkaClient().Close() + + log.Info("Close mongo client") container.GetMongoConnection().Client().Disconnect(ctx) + + log.Info("Close tech server") container.GetTechServer().Close(ctx) }, os.Interrupt, syscall.SIGTERM) } diff --git a/internal/kafka/client.go b/internal/kafka/client.go index 134eeda3..0f2c1d7c 100644 --- a/internal/kafka/client.go +++ b/internal/kafka/client.go @@ -1,6 +1,8 @@ package kafka import ( + "sync" + "gopkg.in/confluentinc/confluent-kafka-go.v1/kafka" ) @@ -12,6 +14,8 @@ type Client interface { type client struct { producer KafkaProducer + isClosed bool + closeMtx sync.Mutex } // NewClient returns a basic kafka client @@ -55,9 +59,16 @@ func (c *client) Events() chan kafka.Event { // Close allows to close/disconnect the kafka client func (c *client) Close() { + defer c.closeMtx.Unlock() + c.closeMtx.Lock() + + if c.isClosed { + return + } + for wait := true; wait; wait = c.producer.Len() > 0 { // Wait for all events to be retrieved from Kafka library } - c.producer.Close() + c.isClosed = true } diff --git a/internal/mongo/watch_producer.go b/internal/mongo/watch_producer.go index 5267c81a..e6b9c214 100644 --- a/internal/mongo/watch_producer.go +++ b/internal/mongo/watch_producer.go @@ -2,6 +2,7 @@ package mongo import ( "context" + "fmt" "time" "github.com/gol4ng/logger" @@ -31,11 +32,11 @@ func (w *WatchProducer) GetProducer(o ...WatchOption) ChangeEventProducer { pipeline = append(customElements, pipeline...) } - cursor, err := w.watch(ctx, pipeline, config, config.resumeAfter, nil) + cursor, curErr := w.watch(ctx, pipeline, config, config.resumeAfter, nil) - if err != nil { - w.logger.Error("Mongo client: An error has occured while trying to watch collection", logger.String("collection", w.collection.Name()), logger.Error("error", err)) - return nil, err + if curErr != nil { + w.logger.Error("Mongo client: An error has occured while trying to watch collection", logger.String("collection", w.collection.Name()), logger.Error("error", curErr)) + return nil, curErr } var events = make(chan *ChangeEvent) @@ -45,18 +46,22 @@ func (w *WatchProducer) GetProducer(o ...WatchOption) ChangeEventProducer { for { select { case <-ctx.Done(): - w.logger.Info("Context canceled") + w.logger.Info("Context canceled", logger.Error("err", ctx.Err())) cursor.Close(ctx) return - case startAfter := <-w.sendEvents(ctx, cursor, events): - w.logger.Info("Mongo client : Retry to watch collection", logger.String("collection", w.collection.Name()), logger.Any("start_after", startAfter)) + default: + startAfter, err := w.sendEvents(ctx, cursor, events) cursor.Close(ctx) + if err == nil { + return + } + w.logger.Info("Mongo client : Retry to watch collection", logger.String("collection", w.collection.Name()), logger.Any("start_after", startAfter), logger.Error("error", err)) if config.maxRetries == 0 { return } - cursor, err = w.watch(ctx, pipeline, config, nil, startAfter) - if err != nil { - w.logger.Error("Mongo client : An error has occured while retrying to watch collection", logger.String("collection", w.collection.Name()), logger.Error("error", err)) + cursor, curErr = w.watch(ctx, pipeline, config, nil, startAfter) + if curErr != nil { + w.logger.Error("Mongo client : An error has occured while retrying to watch collection", logger.String("collection", w.collection.Name()), logger.Error("error", curErr)) return } } @@ -103,31 +108,24 @@ func (w *WatchProducer) watch(ctx context.Context, pipeline bson.A, config *Watc return } -func (w *WatchProducer) sendEvents(ctx context.Context, cursor StreamCursor, events chan *ChangeEvent) <-chan bson.Raw { - resumeToken := make(chan bson.Raw, 1) - - go func() { - defer close(resumeToken) - for cursor.Next(ctx) { - if cursor.ID() == 0 { - w.logger.Error("Mongo client: Cursor has been closed") - break - } - if err := cursor.Err(); err != nil { - w.logger.Error("Mongo client: Failed to watch collection", logger.Error("error", err)) - break - } - event := &ChangeEvent{} - if err := cursor.Decode(event); err != nil { - w.logger.Error("Mongo client: Unable to decode change event value from cursor", logger.Error("error", err)) - continue - } - events <- event +func (w *WatchProducer) sendEvents(ctx context.Context, cursor StreamCursor, events chan *ChangeEvent) (bson.Raw, error) { + var resumeToken bson.Raw + for cursor.Next(ctx) { + resumeToken = cursor.ResumeToken() + if cursor.ID() == 0 { + return resumeToken, fmt.Errorf("cursor has been closed") } - resumeToken <- cursor.ResumeToken() - }() - - return resumeToken + if err := cursor.Err(); err != nil { + return resumeToken, err + } + event := &ChangeEvent{} + if err := cursor.Decode(event); err != nil { + w.logger.Error("Mongo client: Unable to decode change event value from cursor", logger.Error("error", err)) + continue + } + events <- event + } + return resumeToken, nil } func NewWatchProducer(adapter CollectionAdapter, logger logger.LoggerInterface, customPipeline string) *WatchProducer { diff --git a/internal/mongo/watch_producer_test.go b/internal/mongo/watch_producer_test.go index 545a931c..60b298ce 100644 --- a/internal/mongo/watch_producer_test.go +++ b/internal/mongo/watch_producer_test.go @@ -186,6 +186,7 @@ func TestWatchProduceWhenCustomPipeline(t *testing.T) { mongoCursor.EXPECT().ID().Return(int64(1234)).AnyTimes() mongoCursor.EXPECT().Err().Return(nil).AnyTimes() mongoCursor.EXPECT().ResumeToken().Return(bson.Raw{}).AnyTimes() + mongoCursor.EXPECT().Close(ctx).Return(nil).AnyTimes() watcher := NewWatchProducer(mongoCollection, logger.NewNopLogger(), customPipeline) diff --git a/internal/service/kafka.go b/internal/service/kafka.go index dd7395f2..bf6fa9f5 100644 --- a/internal/service/kafka.go +++ b/internal/service/kafka.go @@ -17,7 +17,7 @@ func (container *Container) GetKafkaProducer() kafka.KafkaProducer { } log := container.GetLogger() - log.Info("Connected to kafka producer", logger.String("bootstrao-servers", container.Cfg.Kafka.BootstrapServers)) + log.Info("Connected to kafka producer", logger.String("bootstrap-servers", container.Cfg.Kafka.BootstrapServers)) container.kafkaProducer = producer }