From 46710e5b1649104a2c07c5b9c78580ad29469ceb Mon Sep 17 00:00:00 2001 From: Yusuke Kato Date: Sun, 23 Feb 2020 20:39:54 +0900 Subject: [PATCH] [patch] bugfix gateway high memory usage (#246) --- go.mod | 4 +- go.sum | 8 +- internal/net/grpc/client.go | 159 ++++++++++++++++----------- internal/net/grpc/pool.go | 58 ++++++---- internal/net/grpc/pool_bench_test.go | 12 +- internal/net/tcp/dialer.go | 2 - pkg/gateway/vald/service/gateway.go | 55 ++++++--- 7 files changed, 179 insertions(+), 119 deletions(-) diff --git a/go.mod b/go.mod index 3fb26dff9a..99e4df96b9 100644 --- a/go.mod +++ b/go.mod @@ -5,11 +5,11 @@ go 1.13 replace ( github.com/boltdb/bolt => github.com/boltdb/bolt v1.3.1 github.com/go-sql-driver/mysql => github.com/go-sql-driver/mysql v1.5.1-0.20200218151620-3d8a0293423a - github.com/gocql/gocql => github.com/gocql/gocql v0.0.0-20200203083758-81b8263d9fe5 + github.com/gocql/gocql => github.com/gocql/gocql v0.0.0-20200221113847-372a19b1a852 github.com/gogo/protobuf => github.com/gogo/protobuf v1.3.1 github.com/gophercloud/gophercloud => github.com/gophercloud/gophercloud v0.8.0 github.com/gorilla/mux => github.com/gorilla/mux v1.7.4 - golang.org/x/crypto => golang.org/x/crypto v0.0.0-20200220183623-bac4c82f6975 + golang.org/x/crypto => golang.org/x/crypto v0.0.0-20200221231518-2aa609cf4a9d k8s.io/api => k8s.io/api v0.17.3 k8s.io/apiextensions-apiserver => k8s.io/apiextensions-apiserver v0.17.3 k8s.io/apimachinery => k8s.io/apimachinery v0.17.3 diff --git a/go.sum b/go.sum index 69a3379ee8..827e0b4548 100644 --- a/go.sum +++ b/go.sum @@ -184,8 +184,8 @@ github.com/go-redis/redis/v7 v7.2.0/go.mod h1:JDNMw23GTyLNC4GZu9njt15ctBQVn7xjRf github.com/go-sql-driver/mysql v1.5.1-0.20200218151620-3d8a0293423a h1:S0iuKUl+IaDNndrUoJYN1qCCsK1K1/XDaK9qlNXo7Dw= github.com/go-sql-driver/mysql v1.5.1-0.20200218151620-3d8a0293423a/go.mod h1:DCzpHaOWr8IXmIStZouvnhqoel9Qv2LBy8hT2VhHyBg= github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY= -github.com/gocql/gocql v0.0.0-20200203083758-81b8263d9fe5 h1:ZZVxQRCm4ewuoqqLBJ7LHpsk4OGx2PkyCsRKLq4oHgE= -github.com/gocql/gocql v0.0.0-20200203083758-81b8263d9fe5/go.mod h1:DL0ekTmBSTdlNF25Orwt/JMzqIq3EJ4MVa/J/uK64OY= +github.com/gocql/gocql v0.0.0-20200221113847-372a19b1a852 h1:2j3H2rpbxLvlA1xzFu8ijFzeLc7JPTL4rbnLiJ95KRQ= +github.com/gocql/gocql v0.0.0-20200221113847-372a19b1a852/go.mod h1:DL0ekTmBSTdlNF25Orwt/JMzqIq3EJ4MVa/J/uK64OY= github.com/gocraft/dbr/v2 v2.7.0 h1:x+UnhSBYPFBBdtikLSMLQ9KPuquSUj4yBijsQAhhNZo= github.com/gocraft/dbr/v2 v2.7.0/go.mod h1:wQdbxPBSloo2OlSedMxfNW0mgk0GXys9O1VFmQiwcx4= github.com/gofrs/flock v0.7.1 h1:DP+LD/t0njgoPBvT5MJLeliUIVQR03hiKR6vezdwHlc= @@ -473,8 +473,8 @@ go.uber.org/tools v0.0.0-20190618225709-2cfd321de3ee/go.mod h1:vJERXedbb3MVM5f9E go.uber.org/zap v1.10.0/go.mod h1:vwi/ZaCAaUcBkycHslxD9B2zi4UTXhF60s6SWpuDF0Q= go.uber.org/zap v1.14.0 h1:/pduUoebOeeJzTDFuoMgC6nRkiasr1sBCIEorly7m4o= go.uber.org/zap v1.14.0/go.mod h1:zwrFLgMcdUuIBviXEYEH1YKNaOBnKXsx2IPda5bBwHM= -golang.org/x/crypto v0.0.0-20200220183623-bac4c82f6975 h1:/Tl7pH94bvbAAHBdZJT947M/+gp0+CqQXDtMRC0fseo= -golang.org/x/crypto v0.0.0-20200220183623-bac4c82f6975/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= +golang.org/x/crypto v0.0.0-20200221231518-2aa609cf4a9d h1:1ZiEyfaQIg3Qh0EoqpwAakHVhecoE5wlSg5GjnafJGw= +golang.org/x/crypto v0.0.0-20200221231518-2aa609cf4a9d/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/exp v0.0.0-20180321215751-8460e604b9de/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20180807140117-3d87b88a115f/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= diff --git a/internal/net/grpc/client.go b/internal/net/grpc/client.go index 2a15671d2f..15eed11023 100644 --- a/internal/net/grpc/client.go +++ b/internal/net/grpc/client.go @@ -135,7 +135,6 @@ func (g *gRPCClient) StartConnectionMonitor(ctx context.Context) (<-chan error, } return true }) - for _, addr := range reconnList { if g.bo != nil { _, err = g.bo.Do(ctx, func() (interface{}, error) { @@ -162,19 +161,25 @@ func (g *gRPCClient) Range(ctx context.Context, case <-ctx.Done(): return false default: - if err := pool.Do(func(conn *ClientConn) (err error) { - if conn == nil { - return errors.ErrGRPCClientConnNotFound(addr) - } - if g.bo != nil { - _, err = g.bo.Do(ctx, func() (r interface{}, err error) { - err = f(ctx, addr, conn, g.copts...) - return + var err error + if g.bo != nil { + _, err = g.bo.Do(ctx, func() (r interface{}, err error) { + return nil, pool.Do(func(conn *ClientConn) (err error) { + if conn == nil { + return errors.ErrGRPCClientConnNotFound(addr) + } + return f(ctx, addr, conn, g.copts...) }) - return err - } - return f(ctx, addr, conn, g.copts...) - }); err != nil { + }) + } else { + err = pool.Do(func(conn *ClientConn) (err error) { + if conn == nil { + return errors.ErrGRPCClientConnNotFound(addr) + } + return f(ctx, addr, conn, g.copts...) + }) + } + if err != nil { rerr = errors.Wrap(rerr, errors.ErrRPCCallFailed(addr, err).Error()) } } @@ -194,19 +199,24 @@ func (g *gRPCClient) RangeConcurrent(ctx context.Context, case <-egctx.Done(): return nil default: - if err = pool.Do(func(conn *ClientConn) (err error) { - if conn == nil { - return errors.ErrGRPCClientConnNotFound(addr) - } - if g.bo != nil { - _, err = g.bo.Do(egctx, func() (r interface{}, err error) { - err = f(egctx, addr, conn, g.copts...) - return + if g.bo != nil { + _, err = g.bo.Do(egctx, func() (r interface{}, err error) { + return nil, pool.Do(func(conn *ClientConn) (err error) { + if conn == nil { + return errors.ErrGRPCClientConnNotFound(addr) + } + return f(egctx, addr, conn, g.copts...) }) - return err - } - return f(egctx, addr, conn, g.copts...) - }); err != nil { + }) + } else { + err = pool.Do(func(conn *ClientConn) (err error) { + if conn == nil { + return errors.ErrGRPCClientConnNotFound(addr) + } + return f(egctx, addr, conn, g.copts...) + }) + } + if err != nil { return errors.ErrRPCCallFailed(addr, err) } return nil @@ -228,19 +238,24 @@ func (g *gRPCClient) OrderedRange(ctx context.Context, default: pool, ok := g.conns.Load(addr) if ok { - if err = pool.Do(func(conn *ClientConn) (err error) { - if conn == nil { - return errors.ErrGRPCClientConnNotFound(addr) - } - if g.bo != nil { - _, err = g.bo.Do(ctx, func() (r interface{}, err error) { - err = f(ctx, addr, conn, g.copts...) - return + if g.bo != nil { + _, err = g.bo.Do(ctx, func() (r interface{}, err error) { + return nil, pool.Do(func(conn *ClientConn) (err error) { + if conn == nil { + return errors.ErrGRPCClientConnNotFound(addr) + } + return f(ctx, addr, conn, g.copts...) }) - return err - } - return f(ctx, addr, conn, g.copts...) - }); err != nil { + }) + } else { + err = pool.Do(func(conn *ClientConn) (err error) { + if conn == nil { + return errors.ErrGRPCClientConnNotFound(addr) + } + return f(ctx, addr, conn, g.copts...) + }) + } + if err != nil { rerr = errors.Wrap(rerr, errors.ErrRPCCallFailed(addr, err).Error()) } } @@ -262,23 +277,28 @@ func (g *gRPCClient) OrderedRangeConcurrent(ctx context.Context, } eg.Go(safety.RecoverFunc(func() (err error) { select { - case <-ctx.Done(): + case <-egctx.Done(): return nil default: - if err = pool.Do(func(conn *ClientConn) (err error) { - if conn == nil { - return errors.ErrGRPCClientConnNotFound(addr) - } - if g.bo != nil { - _, err = g.bo.Do(egctx, func() (r interface{}, err error) { - err = f(egctx, addr, conn, g.copts...) - return + if g.bo != nil { + _, err = g.bo.Do(egctx, func() (r interface{}, err error) { + return nil, pool.Do(func(conn *ClientConn) (err error) { + if conn == nil { + return errors.ErrGRPCClientConnNotFound(addr) + } + return f(egctx, addr, conn, g.copts...) }) - return nil - } - return f(egctx, addr, conn, g.copts...) - }); err != nil { - rerr = errors.Wrap(rerr, errors.ErrRPCCallFailed(addr, err).Error()) + }) + } else { + err = pool.Do(func(conn *ClientConn) (err error) { + if conn == nil { + return errors.ErrGRPCClientConnNotFound(addr) + } + return f(egctx, addr, conn, g.copts...) + }) + } + if err != nil { + return errors.ErrRPCCallFailed(addr, err) } return nil } @@ -294,23 +314,30 @@ func (g *gRPCClient) Do(ctx context.Context, addr string, if !ok { return nil, errors.ErrGRPCClientConnNotFound(addr) } - if err = pool.Do(func(conn *ClientConn) (err error) { - if conn == nil { - return errors.ErrGRPCClientConnNotFound(addr) - } - if g.bo != nil { - data, err = g.bo.Do(ctx, func() (r interface{}, err error) { - r, err = f(ctx, conn, g.copts...) - if err != nil { - return nil, err + if g.bo != nil { + data, err = g.bo.Do(ctx, func() (r interface{}, err error) { + err = pool.Do(func(conn *ClientConn) (err error) { + if conn == nil { + return errors.ErrGRPCClientConnNotFound(addr) } - return r, nil + r, err = f(ctx, conn, g.copts...) + return err }) - } else { + if err != nil { + return nil, err + } + return r, err + }) + } else { + err = pool.Do(func(conn *ClientConn) (err error) { + if conn == nil { + return errors.ErrGRPCClientConnNotFound(addr) + } data, err = f(ctx, conn, g.copts...) - } - return err - }); err != nil { + return err + }) + } + if err != nil { return nil, errors.ErrRPCCallFailed(addr, err) } return @@ -330,7 +357,7 @@ func (g *gRPCClient) Connect(ctx context.Context, addr string, dopts ...DialOpti if pool.IsHealthy() { return nil } - pool, err := pool.Connect(ctx) + pool, err = pool.Connect(ctx) if err != nil { return err } diff --git a/internal/net/grpc/pool.go b/internal/net/grpc/pool.go index f56a5ca519..63a82094dc 100644 --- a/internal/net/grpc/pool.go +++ b/internal/net/grpc/pool.go @@ -96,21 +96,20 @@ func (c *ClientConnPool) Disconnect() (rerr error) { if err != nil { rerr = errors.Wrap(rerr, err.Error()) } - for { - conn := c.Get() - if conn == nil { - return - } - err = conn.Close() - if err != nil { - rerr = errors.Wrap(rerr, err.Error()) + for i := uint64(0); i < atomic.LoadUint64(&c.length); i++ { + conn, _ := c.Get() + if conn != nil { + err = conn.Close() + if err != nil { + rerr = errors.Wrap(rerr, err.Error()) + } } } + return } func (c *ClientConnPool) Connect(ctx context.Context) (cp *ClientConnPool, err error) { - if c.closing.Load().(bool) || - atomic.LoadUint64(&c.length) > c.size { + if c.closing.Load().(bool) { return c, nil } @@ -121,6 +120,10 @@ func (c *ClientConnPool) Connect(ctx context.Context) (cp *ClientConnPool, err e } } + if atomic.LoadUint64(&c.length) > c.size { + return c, nil + } + if c.host == localHost || c.host == localIPv4 { for { @@ -168,29 +171,34 @@ func (c *ClientConnPool) Connect(ctx context.Context) (cp *ClientConnPool, err e return c, nil } -func (c *ClientConnPool) Get() *ClientConn { +func (c *ClientConnPool) Get() (*ClientConn, bool) { conn, ok := c.pool.Get().(*ClientConn) if !ok { - return c.conn + return c.conn, true } atomic.AddUint64(&c.length, ^uint64(0)) if conn == nil || !isHealthy(conn) { if c.closing.Load().(bool) { - return nil + return nil, false } - return c.conn + return c.conn, true } - return conn + + if conn == c.conn { + return conn, true + } + + return conn, false } func (c *ClientConnPool) Put(conn *ClientConn) error { if conn != nil { - if c.closing.Load().(bool) { - return nil - } - if atomic.LoadUint64(&c.length) > c.size { + if c.closing.Load().(bool) || atomic.LoadUint64(&c.length) > c.size { return conn.Close() } + if conn == c.conn { + return nil + } atomic.AddUint64(&c.length, 1) c.pool.Put(conn) } @@ -198,17 +206,21 @@ func (c *ClientConnPool) Put(conn *ClientConn) error { } func (c *ClientConnPool) Do(f func(conn *ClientConn) error) (err error) { - conn := c.Get() + conn, shared := c.Get() err = f(conn) - c.Put(conn) + if !shared { + c.Put(conn) + } return err } func (c *ClientConnPool) IsHealthy() bool { for i := uint64(0); i < c.size; i++ { - conn := c.Get() + conn, shared := c.Get() if conn != nil && isHealthy(conn) { - c.Put(conn) + if !shared { + c.Put(conn) + } } else { if conn != nil { conn.Close() diff --git a/internal/net/grpc/pool_bench_test.go b/internal/net/grpc/pool_bench_test.go index 423035d613..70c91eb3e0 100644 --- a/internal/net/grpc/pool_bench_test.go +++ b/internal/net/grpc/pool_bench_test.go @@ -91,9 +91,11 @@ func Benchmark_ConnPool(b *testing.B) { } for i := 0; i < b.N; i++ { - conn := pool.Get() + conn, shared := pool.Get() do(b, conn) - pool.Put(conn) + if !shared { + pool.Put(conn) + } } } @@ -124,9 +126,11 @@ func BenchmarkParallel_ConnPool(b *testing.B) { b.RunParallel(func(pb *testing.PB) { for pb.Next() { - conn := pool.Get() + conn, shared := pool.Get() do(b, conn) - pool.Put(conn) + if !shared { + pool.Put(conn) + } } }) } diff --git a/internal/net/tcp/dialer.go b/internal/net/tcp/dialer.go index 4fa56e2629..0a663cd681 100644 --- a/internal/net/tcp/dialer.go +++ b/internal/net/tcp/dialer.go @@ -29,8 +29,6 @@ import ( "github.com/vdaas/vald/internal/safety" ) -// type Dialer func(ctx context.Context, network, addr string) (net.Conn, error) - type Dialer interface { GetDialer() func(ctx context.Context, network, addr string) (net.Conn, error) StartDialerCache(ctx context.Context) diff --git a/pkg/gateway/vald/service/gateway.go b/pkg/gateway/vald/service/gateway.go index 8553dd38dd..9889adb258 100644 --- a/pkg/gateway/vald/service/gateway.go +++ b/pkg/gateway/vald/service/gateway.go @@ -20,7 +20,6 @@ package service import ( "context" "fmt" - "math" "net" "reflect" "sync" @@ -192,30 +191,50 @@ func (g *gateway) discover(ctx context.Context, ech chan<- error) (err error) { if err != nil { return nil, err } - for i := 0; i < (math.MaxInt32); i++ { - visited := false - for i, node := range nodes.GetNodes() { - select { - case <-ctx.Done(): - return nil, ctx.Err() - default: - if node != nil && node.GetPods() != nil { - pods := node.GetPods().GetPods() - if i < len(pods) { - addrs = append(addrs, fmt.Sprintf("%s:%d", pods[i].GetIp(), g.agentPort)) - if !visited { - visited = true + var wg sync.WaitGroup + c := sync.NewCond(new(sync.Mutex)) + cctx, cancel := context.WithCancel(ctx) + pch := make(chan string, len(nodes.GetNodes())) + for _, n := range nodes.GetNodes() { + select { + case <-cctx.Done(): + return nil, cctx.Err() + default: + node := n + wg.Add(1) + g.eg.Go(safety.RecoverFunc(func() error { + defer wg.Done() + if node != nil && node.GetPods() != nil && node.GetPods().GetPods() != nil { + c.L.Lock() + c.Wait() + c.L.Unlock() + for _, pod := range node.GetPods().GetPods() { + select { + case <-cctx.Done(): + return nil + default: + pch <- fmt.Sprintf("%s:%d", pod.GetIp(), g.agentPort) } - break } } - } + return nil + })) } - if !visited { + } + g.eg.Go(safety.RecoverFunc(func() error { + wg.Wait() + cancel() + return nil + })) + c.Broadcast() + for { + select { + case <-cctx.Done(): return nil, nil + case addr := <-pch: + addrs = append(addrs, addr) } } - return nil, nil }) if err != nil { log.Warn("failed to discover agents from discoverer API, trying to discover from dns...")