Skip to content

Commit

Permalink
[patch] bugfix gateway high memory usage (#246)
Browse files Browse the repository at this point in the history
  • Loading branch information
Yusuke Kato authored Feb 23, 2020
1 parent 9a733ef commit 46710e5
Show file tree
Hide file tree
Showing 7 changed files with 179 additions and 119 deletions.
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,11 @@ go 1.13
replace (
github.com/boltdb/bolt => github.com/boltdb/bolt v1.3.1
github.com/go-sql-driver/mysql => github.com/go-sql-driver/mysql v1.5.1-0.20200218151620-3d8a0293423a
github.com/gocql/gocql => github.com/gocql/gocql v0.0.0-20200203083758-81b8263d9fe5
github.com/gocql/gocql => github.com/gocql/gocql v0.0.0-20200221113847-372a19b1a852
github.com/gogo/protobuf => github.com/gogo/protobuf v1.3.1
github.com/gophercloud/gophercloud => github.com/gophercloud/gophercloud v0.8.0
github.com/gorilla/mux => github.com/gorilla/mux v1.7.4
golang.org/x/crypto => golang.org/x/crypto v0.0.0-20200220183623-bac4c82f6975
golang.org/x/crypto => golang.org/x/crypto v0.0.0-20200221231518-2aa609cf4a9d
k8s.io/api => k8s.io/api v0.17.3
k8s.io/apiextensions-apiserver => k8s.io/apiextensions-apiserver v0.17.3
k8s.io/apimachinery => k8s.io/apimachinery v0.17.3
Expand Down
8 changes: 4 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -184,8 +184,8 @@ github.com/go-redis/redis/v7 v7.2.0/go.mod h1:JDNMw23GTyLNC4GZu9njt15ctBQVn7xjRf
github.com/go-sql-driver/mysql v1.5.1-0.20200218151620-3d8a0293423a h1:S0iuKUl+IaDNndrUoJYN1qCCsK1K1/XDaK9qlNXo7Dw=
github.com/go-sql-driver/mysql v1.5.1-0.20200218151620-3d8a0293423a/go.mod h1:DCzpHaOWr8IXmIStZouvnhqoel9Qv2LBy8hT2VhHyBg=
github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY=
github.com/gocql/gocql v0.0.0-20200203083758-81b8263d9fe5 h1:ZZVxQRCm4ewuoqqLBJ7LHpsk4OGx2PkyCsRKLq4oHgE=
github.com/gocql/gocql v0.0.0-20200203083758-81b8263d9fe5/go.mod h1:DL0ekTmBSTdlNF25Orwt/JMzqIq3EJ4MVa/J/uK64OY=
github.com/gocql/gocql v0.0.0-20200221113847-372a19b1a852 h1:2j3H2rpbxLvlA1xzFu8ijFzeLc7JPTL4rbnLiJ95KRQ=
github.com/gocql/gocql v0.0.0-20200221113847-372a19b1a852/go.mod h1:DL0ekTmBSTdlNF25Orwt/JMzqIq3EJ4MVa/J/uK64OY=
github.com/gocraft/dbr/v2 v2.7.0 h1:x+UnhSBYPFBBdtikLSMLQ9KPuquSUj4yBijsQAhhNZo=
github.com/gocraft/dbr/v2 v2.7.0/go.mod h1:wQdbxPBSloo2OlSedMxfNW0mgk0GXys9O1VFmQiwcx4=
github.com/gofrs/flock v0.7.1 h1:DP+LD/t0njgoPBvT5MJLeliUIVQR03hiKR6vezdwHlc=
Expand Down Expand Up @@ -473,8 +473,8 @@ go.uber.org/tools v0.0.0-20190618225709-2cfd321de3ee/go.mod h1:vJERXedbb3MVM5f9E
go.uber.org/zap v1.10.0/go.mod h1:vwi/ZaCAaUcBkycHslxD9B2zi4UTXhF60s6SWpuDF0Q=
go.uber.org/zap v1.14.0 h1:/pduUoebOeeJzTDFuoMgC6nRkiasr1sBCIEorly7m4o=
go.uber.org/zap v1.14.0/go.mod h1:zwrFLgMcdUuIBviXEYEH1YKNaOBnKXsx2IPda5bBwHM=
golang.org/x/crypto v0.0.0-20200220183623-bac4c82f6975 h1:/Tl7pH94bvbAAHBdZJT947M/+gp0+CqQXDtMRC0fseo=
golang.org/x/crypto v0.0.0-20200220183623-bac4c82f6975/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/crypto v0.0.0-20200221231518-2aa609cf4a9d h1:1ZiEyfaQIg3Qh0EoqpwAakHVhecoE5wlSg5GjnafJGw=
golang.org/x/crypto v0.0.0-20200221231518-2aa609cf4a9d/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/exp v0.0.0-20180321215751-8460e604b9de/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
golang.org/x/exp v0.0.0-20180807140117-3d87b88a115f/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
Expand Down
159 changes: 93 additions & 66 deletions internal/net/grpc/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,6 @@ func (g *gRPCClient) StartConnectionMonitor(ctx context.Context) (<-chan error,
}
return true
})

for _, addr := range reconnList {
if g.bo != nil {
_, err = g.bo.Do(ctx, func() (interface{}, error) {
Expand All @@ -162,19 +161,25 @@ func (g *gRPCClient) Range(ctx context.Context,
case <-ctx.Done():
return false
default:
if err := pool.Do(func(conn *ClientConn) (err error) {
if conn == nil {
return errors.ErrGRPCClientConnNotFound(addr)
}
if g.bo != nil {
_, err = g.bo.Do(ctx, func() (r interface{}, err error) {
err = f(ctx, addr, conn, g.copts...)
return
var err error
if g.bo != nil {
_, err = g.bo.Do(ctx, func() (r interface{}, err error) {
return nil, pool.Do(func(conn *ClientConn) (err error) {
if conn == nil {
return errors.ErrGRPCClientConnNotFound(addr)
}
return f(ctx, addr, conn, g.copts...)
})
return err
}
return f(ctx, addr, conn, g.copts...)
}); err != nil {
})
} else {
err = pool.Do(func(conn *ClientConn) (err error) {
if conn == nil {
return errors.ErrGRPCClientConnNotFound(addr)
}
return f(ctx, addr, conn, g.copts...)
})
}
if err != nil {
rerr = errors.Wrap(rerr, errors.ErrRPCCallFailed(addr, err).Error())
}
}
Expand All @@ -194,19 +199,24 @@ func (g *gRPCClient) RangeConcurrent(ctx context.Context,
case <-egctx.Done():
return nil
default:
if err = pool.Do(func(conn *ClientConn) (err error) {
if conn == nil {
return errors.ErrGRPCClientConnNotFound(addr)
}
if g.bo != nil {
_, err = g.bo.Do(egctx, func() (r interface{}, err error) {
err = f(egctx, addr, conn, g.copts...)
return
if g.bo != nil {
_, err = g.bo.Do(egctx, func() (r interface{}, err error) {
return nil, pool.Do(func(conn *ClientConn) (err error) {
if conn == nil {
return errors.ErrGRPCClientConnNotFound(addr)
}
return f(egctx, addr, conn, g.copts...)
})
return err
}
return f(egctx, addr, conn, g.copts...)
}); err != nil {
})
} else {
err = pool.Do(func(conn *ClientConn) (err error) {
if conn == nil {
return errors.ErrGRPCClientConnNotFound(addr)
}
return f(egctx, addr, conn, g.copts...)
})
}
if err != nil {
return errors.ErrRPCCallFailed(addr, err)
}
return nil
Expand All @@ -228,19 +238,24 @@ func (g *gRPCClient) OrderedRange(ctx context.Context,
default:
pool, ok := g.conns.Load(addr)
if ok {
if err = pool.Do(func(conn *ClientConn) (err error) {
if conn == nil {
return errors.ErrGRPCClientConnNotFound(addr)
}
if g.bo != nil {
_, err = g.bo.Do(ctx, func() (r interface{}, err error) {
err = f(ctx, addr, conn, g.copts...)
return
if g.bo != nil {
_, err = g.bo.Do(ctx, func() (r interface{}, err error) {
return nil, pool.Do(func(conn *ClientConn) (err error) {
if conn == nil {
return errors.ErrGRPCClientConnNotFound(addr)
}
return f(ctx, addr, conn, g.copts...)
})
return err
}
return f(ctx, addr, conn, g.copts...)
}); err != nil {
})
} else {
err = pool.Do(func(conn *ClientConn) (err error) {
if conn == nil {
return errors.ErrGRPCClientConnNotFound(addr)
}
return f(ctx, addr, conn, g.copts...)
})
}
if err != nil {
rerr = errors.Wrap(rerr, errors.ErrRPCCallFailed(addr, err).Error())
}
}
Expand All @@ -262,23 +277,28 @@ func (g *gRPCClient) OrderedRangeConcurrent(ctx context.Context,
}
eg.Go(safety.RecoverFunc(func() (err error) {
select {
case <-ctx.Done():
case <-egctx.Done():
return nil
default:
if err = pool.Do(func(conn *ClientConn) (err error) {
if conn == nil {
return errors.ErrGRPCClientConnNotFound(addr)
}
if g.bo != nil {
_, err = g.bo.Do(egctx, func() (r interface{}, err error) {
err = f(egctx, addr, conn, g.copts...)
return
if g.bo != nil {
_, err = g.bo.Do(egctx, func() (r interface{}, err error) {
return nil, pool.Do(func(conn *ClientConn) (err error) {
if conn == nil {
return errors.ErrGRPCClientConnNotFound(addr)
}
return f(egctx, addr, conn, g.copts...)
})
return nil
}
return f(egctx, addr, conn, g.copts...)
}); err != nil {
rerr = errors.Wrap(rerr, errors.ErrRPCCallFailed(addr, err).Error())
})
} else {
err = pool.Do(func(conn *ClientConn) (err error) {
if conn == nil {
return errors.ErrGRPCClientConnNotFound(addr)
}
return f(egctx, addr, conn, g.copts...)
})
}
if err != nil {
return errors.ErrRPCCallFailed(addr, err)
}
return nil
}
Expand All @@ -294,23 +314,30 @@ func (g *gRPCClient) Do(ctx context.Context, addr string,
if !ok {
return nil, errors.ErrGRPCClientConnNotFound(addr)
}
if err = pool.Do(func(conn *ClientConn) (err error) {
if conn == nil {
return errors.ErrGRPCClientConnNotFound(addr)
}
if g.bo != nil {
data, err = g.bo.Do(ctx, func() (r interface{}, err error) {
r, err = f(ctx, conn, g.copts...)
if err != nil {
return nil, err
if g.bo != nil {
data, err = g.bo.Do(ctx, func() (r interface{}, err error) {
err = pool.Do(func(conn *ClientConn) (err error) {
if conn == nil {
return errors.ErrGRPCClientConnNotFound(addr)
}
return r, nil
r, err = f(ctx, conn, g.copts...)
return err
})
} else {
if err != nil {
return nil, err
}
return r, err
})
} else {
err = pool.Do(func(conn *ClientConn) (err error) {
if conn == nil {
return errors.ErrGRPCClientConnNotFound(addr)
}
data, err = f(ctx, conn, g.copts...)
}
return err
}); err != nil {
return err
})
}
if err != nil {
return nil, errors.ErrRPCCallFailed(addr, err)
}
return
Expand All @@ -330,7 +357,7 @@ func (g *gRPCClient) Connect(ctx context.Context, addr string, dopts ...DialOpti
if pool.IsHealthy() {
return nil
}
pool, err := pool.Connect(ctx)
pool, err = pool.Connect(ctx)
if err != nil {
return err
}
Expand Down
58 changes: 35 additions & 23 deletions internal/net/grpc/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,21 +96,20 @@ func (c *ClientConnPool) Disconnect() (rerr error) {
if err != nil {
rerr = errors.Wrap(rerr, err.Error())
}
for {
conn := c.Get()
if conn == nil {
return
}
err = conn.Close()
if err != nil {
rerr = errors.Wrap(rerr, err.Error())
for i := uint64(0); i < atomic.LoadUint64(&c.length); i++ {
conn, _ := c.Get()
if conn != nil {
err = conn.Close()
if err != nil {
rerr = errors.Wrap(rerr, err.Error())
}
}
}
return
}

func (c *ClientConnPool) Connect(ctx context.Context) (cp *ClientConnPool, err error) {
if c.closing.Load().(bool) ||
atomic.LoadUint64(&c.length) > c.size {
if c.closing.Load().(bool) {
return c, nil
}

Expand All @@ -121,6 +120,10 @@ func (c *ClientConnPool) Connect(ctx context.Context) (cp *ClientConnPool, err e
}
}

if atomic.LoadUint64(&c.length) > c.size {
return c, nil
}

if c.host == localHost ||
c.host == localIPv4 {
for {
Expand Down Expand Up @@ -168,47 +171,56 @@ func (c *ClientConnPool) Connect(ctx context.Context) (cp *ClientConnPool, err e
return c, nil
}

func (c *ClientConnPool) Get() *ClientConn {
func (c *ClientConnPool) Get() (*ClientConn, bool) {
conn, ok := c.pool.Get().(*ClientConn)
if !ok {
return c.conn
return c.conn, true
}
atomic.AddUint64(&c.length, ^uint64(0))
if conn == nil || !isHealthy(conn) {
if c.closing.Load().(bool) {
return nil
return nil, false
}
return c.conn
return c.conn, true
}
return conn

if conn == c.conn {
return conn, true
}

return conn, false
}

func (c *ClientConnPool) Put(conn *ClientConn) error {
if conn != nil {
if c.closing.Load().(bool) {
return nil
}
if atomic.LoadUint64(&c.length) > c.size {
if c.closing.Load().(bool) || atomic.LoadUint64(&c.length) > c.size {
return conn.Close()
}
if conn == c.conn {
return nil
}
atomic.AddUint64(&c.length, 1)
c.pool.Put(conn)
}
return nil
}

func (c *ClientConnPool) Do(f func(conn *ClientConn) error) (err error) {
conn := c.Get()
conn, shared := c.Get()
err = f(conn)
c.Put(conn)
if !shared {
c.Put(conn)
}
return err
}

func (c *ClientConnPool) IsHealthy() bool {
for i := uint64(0); i < c.size; i++ {
conn := c.Get()
conn, shared := c.Get()
if conn != nil && isHealthy(conn) {
c.Put(conn)
if !shared {
c.Put(conn)
}
} else {
if conn != nil {
conn.Close()
Expand Down
12 changes: 8 additions & 4 deletions internal/net/grpc/pool_bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,9 +91,11 @@ func Benchmark_ConnPool(b *testing.B) {
}

for i := 0; i < b.N; i++ {
conn := pool.Get()
conn, shared := pool.Get()
do(b, conn)
pool.Put(conn)
if !shared {
pool.Put(conn)
}
}
}

Expand Down Expand Up @@ -124,9 +126,11 @@ func BenchmarkParallel_ConnPool(b *testing.B) {

b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
conn := pool.Get()
conn, shared := pool.Get()
do(b, conn)
pool.Put(conn)
if !shared {
pool.Put(conn)
}
}
})
}
Expand Down
Loading

0 comments on commit 46710e5

Please sign in to comment.