From 46bee6ab779979b09cdd052c2f9f921de83eb4c7 Mon Sep 17 00:00:00 2001 From: Jozef Kralik Date: Fri, 12 Jul 2019 12:21:06 +0200 Subject: [PATCH 1/2] use mongo evenstore with replaced mongo driver to DB --- cqrs/eventstore/mongodb/eventstore.go | 64 +++++++++++----------- cqrs/eventstore/mongodb/eventstore_test.go | 2 +- 2 files changed, 34 insertions(+), 32 deletions(-) diff --git a/cqrs/eventstore/mongodb/eventstore.go b/cqrs/eventstore/mongodb/eventstore.go index 8706aad5..6372103f 100644 --- a/cqrs/eventstore/mongodb/eventstore.go +++ b/cqrs/eventstore/mongodb/eventstore.go @@ -21,8 +21,10 @@ import ( "math/rand" "time" - "github.com/globalsign/mgo" - "github.com/globalsign/mgo/bson" + "go.mongodb.org/mongo-driver/bson" + "go.mongodb.org/mongo-driver/mongo" + "go.mongodb.org/mongo-driver/mongo/options" + "go.mongodb.org/mongo-driver/mongo/readpref" "github.com/go-ocf/cqrs/event" "github.com/go-ocf/cqrs/eventstore" @@ -39,17 +41,19 @@ func init() { // EventStore implements an EventStore for MongoDB. type EventStore struct { - es *cqrsMongodb.EventStore - session *mgo.Session - config Config + es *cqrsMongodb.EventStore + client *mongo.Client + config Config uniqueIdIsInitialized uint64 } type Config struct { - Host string `envconfig:"MONGO_HOST" default:localhost:27017"` - DatabaseName string `envconfig:"MONGO_DATABASE" default:"eventStore"` - BatchSize int `envconfig:"MONGO_BATCH_SIZE" default:"128"` + Host string `envconfig:"MONGO_HOST" default:localhost:27017"` + DatabaseName string `envconfig:"MONGO_DATABASE" default:"eventStore"` + BatchSize int `envconfig:"MONGO_BATCH_SIZE" default:"16"` + MaxPoolSize uint16 `envconfig:"MONGO_MAX_POOL_SIZE" default:"16"` + MaxConnIdleTime time.Duration `envconfig:"MONGO_MAX_CONN_IDLE_TIME" default:"240s"` } //String return string representation of Config @@ -60,22 +64,26 @@ func (c Config) String() string { //NewEventStore create a event store from configuration func NewEventStore(config Config, goroutinePoolGo eventstore.GoroutinePoolGoFunc) (*EventStore, error) { - session, err := mgo.Dial(config.Host) + ctx, cancel := context.WithTimeout(context.Background(), time.Second*5) + defer cancel() + + client, err := mongo.Connect(ctx, options.Client().ApplyURI("mongodb://"+config.Host).SetMaxPoolSize(config.MaxPoolSize).SetMaxConnIdleTime(config.MaxConnIdleTime)) if err != nil { - return nil, fmt.Errorf("cannot dial to DB: %v", err) + return nil, fmt.Errorf("could not dial database: %v", err) + } + err = client.Ping(ctx, readpref.Primary()) + if err != nil { + return nil, fmt.Errorf("could not dial database: %v", err) } - session.SetMode(mgo.Strong, true) - session.SetSafe(&mgo.Safe{W: 1}) - - es, err := cqrsMongodb.NewEventStoreWithSession(session, config.DatabaseName, "events", config.BatchSize, goroutinePoolGo, cqrsUtils.Marshal, cqrsUtils.Unmarshal, log.Debugf) + es, err := cqrsMongodb.NewEventStoreWithClient(ctx, client, config.DatabaseName, "events", config.BatchSize, goroutinePoolGo, cqrsUtils.Marshal, cqrsUtils.Unmarshal, log.Debugf) if err != nil { return nil, err } return &EventStore{ - es: es, - session: session, - config: config, + es: es, + client: client, + config: config, }, nil } @@ -98,13 +106,11 @@ func (s *EventStore) LoadFromSnapshot(ctx context.Context, queries []eventstore. // Clear clears the event storage. func (s *EventStore) Clear(ctx context.Context) error { err1 := s.es.Clear(ctx) - sess := s.session.Copy() - defer sess.Close() - err2 := sess.DB(s.es.DBName()).C(instanceIdsCollection).DropCollection() + err2 := s.client.Database(s.es.DBName()).Collection(instanceIdsCollection).Drop(ctx) if err1 != nil { return fmt.Errorf("cannot clear events: %v", err1) } - if err2 != nil && err2 != mgo.ErrNotFound { + if err2 != nil && err2 != mongo.ErrNoDocuments { return fmt.Errorf("cannot clear sequence number: %v", err2) } return nil @@ -117,8 +123,6 @@ type seqRecord struct { // GetInstanceId returns int64 that is unique func (s *EventStore) GetInstanceId(ctx context.Context, aggregateId string) (int64, error) { - sess := s.session.Copy() - defer sess.Close() var newInstanceId uint32 for { newInstanceId = rand.Uint32() @@ -128,8 +132,8 @@ func (s *EventStore) GetInstanceId(ctx context.Context, aggregateId string) (int InstanceId: int64(newInstanceId), } - if err := sess.DB(s.es.DBName()).C(instanceIdsCollection).Insert(r); err != nil { - if mgo.IsDup(err) { + if _, err := s.client.Database(s.es.DBName()).Collection(instanceIdsCollection).InsertOne(ctx, r); err != nil { + if cqrsMongodb.IsDup(err) { rand.Seed(time.Now().UTC().UnixNano()) } else { return -1, fmt.Errorf("cannot generate instance id: %v", err) @@ -143,15 +147,13 @@ func (s *EventStore) GetInstanceId(ctx context.Context, aggregateId string) (int } func (s *EventStore) RemoveInstanceId(ctx context.Context, instanceId int64) error { - sess := s.session.Copy() - defer sess.Close() - if err := sess.DB(s.es.DBName()).C(instanceIdsCollection).Remove(bson.M{"_id": instanceId}); err != nil { - return fmt.Errorf("cannot removce instance id: %v", err) + if _, err := s.client.Database(s.es.DBName()).Collection(instanceIdsCollection).DeleteOne(ctx, bson.M{"_id": instanceId}); err != nil { + return fmt.Errorf("cannot remove instance id: %v", err) } return nil } // Close closes the database session. -func (s *EventStore) Close() { - s.es.Close() +func (s *EventStore) Close(ctx context.Context) error { + return s.es.Close(ctx) } diff --git a/cqrs/eventstore/mongodb/eventstore_test.go b/cqrs/eventstore/mongodb/eventstore_test.go index 7be5ef10..e8c04346 100644 --- a/cqrs/eventstore/mongodb/eventstore_test.go +++ b/cqrs/eventstore/mongodb/eventstore_test.go @@ -37,7 +37,7 @@ func TestInstanceId(t *testing.T) { }, nil) defer func() { store.Clear(ctx) - store.Close() + store.Close(ctx) }() assert.NoError(t, err) From bd3feb0a1fc9358b6618c4fe036346efdb1a5479 Mon Sep 17 00:00:00 2001 From: Jozef Kralik Date: Fri, 12 Jul 2019 13:01:29 +0200 Subject: [PATCH 2/2] update Gopkg --- Gopkg.lock | 248 ++++++++++++++++++++++++++++++++++++++++++----------- Gopkg.toml | 4 +- 2 files changed, 199 insertions(+), 53 deletions(-) diff --git a/Gopkg.lock b/Gopkg.lock index 6dded071..9eaeb29a 100644 --- a/Gopkg.lock +++ b/Gopkg.lock @@ -10,12 +10,12 @@ version = "v1.4.0" [[projects]] - digest = "1:2ec153af6a806c3d63d4299f2549bcb29d75d9703097341be309a46db3481488" + digest = "1:b449dbaada891dc97f016bc3519bb1af0e1a4296828f7a560a0ba47ea75f20bb" name = "github.com/Shopify/sarama" packages = ["."] pruneopts = "UT" - revision = "ea9ab1c316850bee881a07bb2555ee8a685cd4b6" - version = "v1.22.1" + revision = "dde3ddda8b4b3a594690086725799ab1573bb895" + version = "v1.23.0" [[projects]] digest = "1:526d64d0a3ac6c24875724a9355895be56a21f89a5d3ab5ba88d91244269a7d8" @@ -59,22 +59,7 @@ [[projects]] branch = "master" - digest = "1:bac70cf9dfdf88b92c05ec8a69d7a97e8b7e72ac748373193ebfa73cbc2e0aca" - name = "github.com/globalsign/mgo" - packages = [ - ".", - "bson", - "internal/json", - "internal/sasl", - "internal/scram", - "txn", - ] - pruneopts = "UT" - revision = "eeefdecb41b842af6dc652aaea4026e8403e62df" - -[[projects]] - branch = "master" - digest = "1:a15ef16edc1c454e8688baabdf8772d017ebf43e33833817e6be0c742a08c30a" + digest = "1:47373ede16ae324cc9cc63976e8f177434e9ee0f796e4d5da007693273dbc2be" name = "github.com/go-ocf/cqrs" packages = [ ".", @@ -87,7 +72,7 @@ "protobuf/eventbus", ] pruneopts = "UT" - revision = "71a295af5428973e5c3a419108dcc86ca1f75c00" + revision = "7a358a5965098a5e9487ea38ffb09f0437588704" [[projects]] branch = "master" @@ -100,6 +85,14 @@ pruneopts = "UT" revision = "6251d1201ea55633381bcccde2f47303b3736a40" +[[projects]] + digest = "1:586ea76dbd0374d6fb649a91d70d652b7fe0ccffb8910a77468e7702e7901f3d" + name = "github.com/go-stack/stack" + packages = ["."] + pruneopts = "UT" + revision = "2fee6af1a9795aafbe0253a0cfbdf668e1fb8a9a" + version = "v1.8.0" + [[projects]] digest = "1:bed9d72d596f94e65fff37f4d6c01398074a6bb1c3f3ceff963516bd01db6ff5" name = "github.com/gofrs/uuid" @@ -121,7 +114,7 @@ version = "v1.2.1" [[projects]] - digest = "1:239c4c7fd2159585454003d9be7207167970194216193a8a210b8d29576f19c9" + digest = "1:f5ce1529abc1204444ec73779f44f94e2fa8fcdb7aca3c355b0c95947e4005c6" name = "github.com/golang/protobuf" packages = [ "proto", @@ -131,8 +124,8 @@ "ptypes/timestamp", ] pruneopts = "UT" - revision = "b5d812f8a3706043e23a9cd5babf2e5423744d30" - version = "v1.3.1" + revision = "6c65a5562fc06764971b7c5d05c76c75e84bdbf7" + version = "v1.3.2" [[projects]] branch = "master" @@ -143,7 +136,26 @@ revision = "2a8bb927dd31d8daada140a5d09578521ce5c36a" [[projects]] - digest = "1:589fb1a525a9596f15cf90064ad57cfad5de59b3579e91f272f470ab49051647" + digest = "1:f14364057165381ea296e49f8870a9ffce2b8a95e34d6ae06c759106aaef428c" + name = "github.com/hashicorp/go-uuid" + packages = ["."] + pruneopts = "UT" + revision = "4f571afc59f3043a65f8fe6bf46d887b10a01d43" + version = "v1.0.1" + +[[projects]] + digest = "1:ae221758bdddd57f5c76f4ee5e4110af32ee62583c46299094697f8f127e63da" + name = "github.com/jcmturner/gofork" + packages = [ + "encoding/asn1", + "x/crypto/pbkdf2", + ] + pruneopts = "UT" + revision = "dc7c13fece037a4a36e2b3c69db4991498d30692" + version = "v1.0.0" + +[[projects]] + digest = "1:823dc0f8e4cdad8d91e6aba4cdd985a035596a8aa02e9d2abf1bec347894c41f" name = "github.com/klauspost/compress" packages = [ "flate", @@ -151,8 +163,8 @@ "zlib", ] pruneopts = "UT" - revision = "05d6922103c7275c684bd5df6608b25577b10521" - version = "v1.7.0" + revision = "315059c39568ee89ae0c118bc6dc7306041543c3" + version = "v1.7.2" [[projects]] digest = "1:923c4d7194b42e054b2eb8a6c62824ac55e23ececc1c7e48d4da69c971c55954" @@ -175,12 +187,12 @@ version = "v1.7.2" [[projects]] - digest = "1:0b5d91120efc54504bc253fda90b08c4be88cd78a4023ef60019e95bb0cdc136" + digest = "1:237d85e4f5e91ac6cfc13bad508627570ff29efa28f523d41aac2d39ca15950f" name = "github.com/nats-io/nkeys" packages = ["."] pruneopts = "UT" - revision = "1546a3320a8f195a5b5c84aef8309377c2e411d5" - version = "v0.0.2" + revision = "0073b400419be3ffb022ef46b675805e92534a34" + version = "v0.1.0" [[projects]] digest = "1:599f3202ce0a754144ddc4be4c6df9c6ab27b1d722a63ede6b2e0c3a2cc338a8" @@ -191,15 +203,15 @@ version = "v1.0.1" [[projects]] - digest = "1:259f9b7645983a7a823318d78aa96dec68af8891f706493ac1ec04d819cb977c" + digest = "1:f690a0a27cefae695fa9587aa3ed23652e593be1d98b35f8184d10bccec30444" name = "github.com/pierrec/lz4" packages = [ ".", "internal/xxh32", ] pruneopts = "UT" - revision = "d705d4371bfccdf47f10e45584e896026c83616f" - version = "v2.2.3" + revision = "057d66e894a4e55853274a3bbbf7de02ba639e43" + version = "v2.2.4" [[projects]] digest = "1:0028cb19b2e4c3112225cd871870f2d9cf49b9b4276531f03438a88e94be86fe" @@ -211,11 +223,11 @@ [[projects]] branch = "master" - digest = "1:d38f81081a389f1466ec98192cf9115a82158854d6f01e1c23e2e7554b97db71" + digest = "1:267844804416a11470a2fbf68549b98959e22d77e0921e2349276958477f08a3" name = "github.com/rcrowley/go-metrics" packages = ["."] pruneopts = "UT" - revision = "3113b8401b8a98917cde58f8bbd42a1b1c03b1fd" + revision = "9beb055b7962d16947a14e1cd718098a2431e20e" [[projects]] digest = "1:5da8ce674952566deae4dbc23d07c85caafc6cfa815b0b3e03e41979cedb8750" @@ -229,12 +241,12 @@ version = "v1.3.0" [[projects]] - digest = "1:d0072748c62defde1ad99dde77f6ffce492a0e5aea9204077e497c7edfb86653" + digest = "1:5a1cf4e370bc86137b58da2ae065e76526d32b11f62a7665f36dbd5f41fa95ff" name = "github.com/ugorji/go" packages = ["codec"] pruneopts = "UT" - revision = "2adff0894ba3bc2eeb9f9aea45fefd49802e1a13" - version = "v1.1.4" + revision = "23ab95ef5dc3b70286760af84ce2327a2b64ed62" + version = "v1.1.7" [[projects]] digest = "1:c468422f334a6b46a19448ad59aaffdfc0a36b08fdcc1c749a0b29b6453d7e59" @@ -246,7 +258,7 @@ [[projects]] branch = "master" - digest = "1:ae1322db6af7b1244fbd2f6e25c4406bd16ce348f1b909da91b2a0eef9edff65" + digest = "1:a0b7634717eaa7c1e556caa592f48b6eb97334dc2e7181298261cd9e6672afc3" name = "github.com/valyala/fasthttp" packages = [ ".", @@ -254,7 +266,62 @@ "stackless", ] pruneopts = "UT" - revision = "9ba4cef1bac8ebc8fe71123bd20cfc681fdb78a8" + revision = "a0248ed3a1ce5bf65a1fe9734237590bfc80d8f3" + +[[projects]] + branch = "master" + digest = "1:40fdfd6ab85ca32b6935853bbba35935dcb1d796c8135efd85947566c76e662e" + name = "github.com/xdg/scram" + packages = ["."] + pruneopts = "UT" + revision = "7eeb5667e42c09cb51bf7b7c28aea8c56767da90" + +[[projects]] + branch = "master" + digest = "1:f5c1d04bc09c644c592b45b9f0bad4030521b1a7d11c7dadbb272d9439fa6e8e" + name = "github.com/xdg/stringprep" + packages = ["."] + pruneopts = "UT" + revision = "73f8eece6fdcd902c185bf651de50f3828bed5ed" + +[[projects]] + digest = "1:721dfafdcdef689834240d27526505237b230b7dea54df28ec299ab182cfbd1f" + name = "go.mongodb.org/mongo-driver" + packages = [ + "bson", + "bson/bsoncodec", + "bson/bsonrw", + "bson/bsontype", + "bson/primitive", + "event", + "internal", + "mongo", + "mongo/options", + "mongo/readconcern", + "mongo/readpref", + "mongo/writeconcern", + "tag", + "version", + "x/bsonx", + "x/bsonx/bsoncore", + "x/mongo/driver", + "x/mongo/driver/auth", + "x/mongo/driver/auth/internal/gssapi", + "x/mongo/driver/session", + "x/mongo/driver/topology", + "x/mongo/driver/uuid", + "x/network/address", + "x/network/command", + "x/network/compressor", + "x/network/connection", + "x/network/connstring", + "x/network/description", + "x/network/result", + "x/network/wiremessage", + ] + pruneopts = "UT" + revision = "0d1270edf53072da4da781b76d2e1db58831152f" + version = "v1.0.4" [[projects]] digest = "1:a5158647b553c61877aa9ae74f4015000294e47981e6b8b07525edcbb0747c81" @@ -289,18 +356,20 @@ [[projects]] branch = "master" - digest = "1:d5891c5bca9c62e5d394ca26491d2b710a1dc08cedeb0ca8f9ac4c3305120b02" + digest = "1:addbd8224d6adb3e0387ebdd1aa2aef4a0844c9b8cfde3d31b6b6c633d2cf2ce" name = "golang.org/x/crypto" packages = [ "ed25519", "ed25519/internal/edwards25519", + "md4", + "pbkdf2", ] pruneopts = "UT" - revision = "5c40567a22f818bd14a1ea7245dad9f8ef0691aa" + revision = "4def268fd1a49955bfb3dda92fe3db4f924f2285" [[projects]] branch = "master" - digest = "1:4ea677236b1c0ec4ff523ac8293022fcf0a8b43c3f5a29816955505fbce58155" + digest = "1:0ef8e3551a664181c9e8b0c65e40ac43a2b59987369a68460fc951a923086133" name = "golang.org/x/net" packages = [ "bpf", @@ -318,18 +387,26 @@ "trace", ] pruneopts = "UT" - revision = "d28f0bde5980168871434b95cfc858db9f2a7a99" + revision = "da137c7871d730100384dbcf36e6f8fa493aef5b" [[projects]] branch = "master" - digest = "1:3398992b8f4b00ce4a6a5f9770008cf66dcff8f128b51eeab8c1f657096cac8f" + digest = "1:382bb5a7fb4034db3b6a2d19e5a4a6bcf52f4750530603c01ca18a172fa3089b" + name = "golang.org/x/sync" + packages = ["semaphore"] + pruneopts = "UT" + revision = "112230192c580c3556b8cee6403af37a4fc5f28c" + +[[projects]] + branch = "master" + digest = "1:31d44814b45607afa77c6e8ef20ea8bacc1d1053fe084440c874f311732e7ec3" name = "golang.org/x/sys" packages = [ "unix", "windows", ] pruneopts = "UT" - revision = "15dcb6c0061f497a3f66e3ea034b629c6dd4d99e" + revision = "fae7ac547cb717d141c433a2a173315e216b64c4" [[projects]] digest = "1:8d8faad6b12a3a4c819a3f9618cb6ee1fa1cfc33253abeeea8b55336721e3405" @@ -362,10 +439,10 @@ name = "google.golang.org/genproto" packages = ["googleapis/rpc/status"] pruneopts = "UT" - revision = "a7e196e89fd3a3c4d103ca540bd5dac3a736e375" + revision = "3bdd9d9f5532d75d09efb230bd767d265245cfe5" [[projects]] - digest = "1:e8800ddadd6bce3bc0c5ffd7bc55dbdddc6e750956c10cc10271cade542fccbe" + digest = "1:cf01ae0753310464677058b125fa31e74fd943781782ada503180ad784fc83d3" name = "google.golang.org/grpc" packages = [ ".", @@ -397,21 +474,86 @@ "resolver", "resolver/dns", "resolver/passthrough", + "serviceconfig", "stats", "status", "tap", ] pruneopts = "UT" - revision = "501c41df7f472c740d0674ff27122f3f48c80ce7" - version = "v1.21.1" + revision = "1d89a3c832915b2314551c1d2a506874d62e53f7" + version = "v1.22.0" + +[[projects]] + digest = "1:c902038ee2d6f964d3b9f2c718126571410c5d81251cbab9fe58abd37803513c" + name = "gopkg.in/jcmturner/aescts.v1" + packages = ["."] + pruneopts = "UT" + revision = "f6abebb3171c4c1b1fea279cb7c7325020a26290" + version = "v1.0.1" + +[[projects]] + digest = "1:a1a3e185c03d79a7452d5d5b4c91be4cc433f55e6ed3a35233d852c966e39013" + name = "gopkg.in/jcmturner/dnsutils.v1" + packages = ["."] + pruneopts = "UT" + revision = "13eeb8d49ffb74d7a75784c35e4d900607a3943c" + version = "v1.0.1" + +[[projects]] + digest = "1:dc01a587d07be012625ba63df6d4224ae6d7a83e79bfebde6d987c10538d66dd" + name = "gopkg.in/jcmturner/gokrb5.v7" + packages = [ + "asn1tools", + "client", + "config", + "credentials", + "crypto", + "crypto/common", + "crypto/etype", + "crypto/rfc3961", + "crypto/rfc3962", + "crypto/rfc4757", + "crypto/rfc8009", + "gssapi", + "iana", + "iana/addrtype", + "iana/adtype", + "iana/asnAppTag", + "iana/chksumtype", + "iana/errorcode", + "iana/etypeID", + "iana/flags", + "iana/keyusage", + "iana/msgtype", + "iana/nametype", + "iana/patype", + "kadmin", + "keytab", + "krberror", + "messages", + "pac", + "types", + ] + pruneopts = "UT" + revision = "363118e62befa8a14ff01031c025026077fe5d6d" + version = "v7.3.0" + +[[projects]] + digest = "1:0f16d9c577198e3b8d3209f5a89aabe679525b2aba2a7548714e973035c0e232" + name = "gopkg.in/jcmturner/rpc.v1" + packages = [ + "mstypes", + "ndr", + ] + pruneopts = "UT" + revision = "99a8ce2fbf8b8087b6ed12a37c61b10f04070043" + version = "v1.1.0" [solve-meta] analyzer-name = "dep" analyzer-version = 1 input-imports = [ "github.com/Shopify/sarama", - "github.com/globalsign/mgo", - "github.com/globalsign/mgo/bson", "github.com/go-ocf/cqrs", "github.com/go-ocf/cqrs/event", "github.com/go-ocf/cqrs/eventbus", @@ -429,6 +571,10 @@ "github.com/ugorji/go/codec", "github.com/valyala/fasthttp", "github.com/valyala/fasthttp/fasthttputil", + "go.mongodb.org/mongo-driver/bson", + "go.mongodb.org/mongo-driver/mongo", + "go.mongodb.org/mongo-driver/mongo/options", + "go.mongodb.org/mongo-driver/mongo/readpref", "go.uber.org/zap", "google.golang.org/grpc", "google.golang.org/grpc/codes", diff --git a/Gopkg.toml b/Gopkg.toml index 9dbdb748..eb4d148f 100644 --- a/Gopkg.toml +++ b/Gopkg.toml @@ -30,8 +30,8 @@ version = "1.22.1" [[constraint]] - branch = "master" - name = "github.com/globalsign/mgo" + name = "go.mongodb.org/mongo-driver" + version = "1.0.4" [[constraint]] branch = "master"