Skip to content

Commit 7f7e41c

Browse files
author
Yusuke Kato
authored
[patch] bugfix memory leak (#251)
* [patch] bugfix memory leak Signed-off-by: kpango <[email protected]> * fix Signed-off-by: kpango <[email protected]> * fix Signed-off-by: kpango <[email protected]> * fix Signed-off-by: kpango <[email protected]> * fix Signed-off-by: kpango <[email protected]> * fix Signed-off-by: kpango <[email protected]> * fix Signed-off-by: kpango <[email protected]> * [patch] bugfix memory leak Signed-off-by: kpango <[email protected]> * fix Signed-off-by: kpango <[email protected]> * fix Signed-off-by: kpango <[email protected]> * fix Signed-off-by: kpango <[email protected]> * fix Signed-off-by: kpango <[email protected]> * fix Signed-off-by: kpango <[email protected]> * fix Signed-off-by: kpango <[email protected]> * fix Signed-off-by: kpango <[email protected]> * fix Signed-off-by: kpango <[email protected]> * fix Signed-off-by: kpango <[email protected]> * fix Signed-off-by: kpango <[email protected]> * fix Signed-off-by: kpango <[email protected]> * fix Signed-off-by: kpango <[email protected]> * fix Signed-off-by: kpango <[email protected]>
1 parent 6b283ea commit 7f7e41c

File tree

7 files changed

+87
-28
lines changed

7 files changed

+87
-28
lines changed

Makefile

+7
Original file line numberDiff line numberDiff line change
@@ -164,6 +164,13 @@ update: \
164164
license \
165165
update/goimports
166166

167+
168+
.PHONY: format
169+
## format Go code
170+
format: \
171+
license \
172+
update/goimports
173+
167174
.PHONY: update/goimports
168175
## run goimports for all go files
169176
update/goimports:

internal/client/discoverer/discover.go

+18-16
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,7 @@ func (c *client) Start(ctx context.Context) (<-chan error, error) {
8484
}
8585
c.addrs.Store(addrs)
8686

87+
var aech <-chan error
8788
if c.autoconn {
8889
c.client = grpc.New(
8990
append(
@@ -92,6 +93,13 @@ func (c *client) Start(ctx context.Context) (<-chan error, error) {
9293
grpc.WithErrGroup(c.eg),
9394
)...,
9495
)
96+
if c.client != nil {
97+
aech, err = c.client.StartConnectionMonitor(ctx)
98+
if err != nil {
99+
close(ech)
100+
return nil, err
101+
}
102+
}
95103
}
96104

97105
err = c.discover(ctx, ech)
@@ -100,15 +108,6 @@ func (c *client) Start(ctx context.Context) (<-chan error, error) {
100108
return nil, errors.Wrap(c.dscClient.Close(), err.Error())
101109
}
102110

103-
var aech <-chan error
104-
if c.autoconn && c.client != nil {
105-
aech, err = c.client.StartConnectionMonitor(ctx)
106-
if err != nil {
107-
close(ech)
108-
return nil, err
109-
}
110-
}
111-
112111
c.eg.Go(safety.RecoverFunc(func() (err error) {
113112
defer close(ech)
114113
fch := make(chan struct{}, 1)
@@ -235,10 +234,10 @@ func (c *client) discover(ctx context.Context, ech chan<- error) (err error) {
235234
Node: c.nodeName,
236235
}, copts...)
237236
if err != nil {
237+
log.Warn("failed to call discoverer.Node API")
238238
return nil, errors.ErrRPCCallFailed(c.dscAddr, err)
239239
}
240240
var wg sync.WaitGroup
241-
cond := sync.NewCond(new(sync.Mutex))
242241
cctx, cancel := context.WithCancel(ctx)
243242
pch := make(chan string, len(nodes.GetNodes()))
244243
for _, n := range nodes.GetNodes() {
@@ -251,19 +250,20 @@ func (c *client) discover(ctx context.Context, ech chan<- error) (err error) {
251250
wg.Add(1)
252251
c.eg.Go(safety.RecoverFunc(func() (err error) {
253252
defer wg.Done()
254-
cond.L.Lock()
255-
cond.Wait()
256-
cond.L.Unlock()
253+
log.Debug("processing node name = %s", node.GetName())
257254
for _, pod := range node.GetPods().GetPods() {
258255
select {
259256
case <-cctx.Done():
257+
log.Debug("exit pods loop by context")
260258
return nil
261259
default:
260+
log.Debug("%#v", pod)
262261
if pod != nil && pod.GetIp() != "" {
262+
log.Debug("processing pod name = %s", pod.GetName())
263263
addr := fmt.Sprintf("%s:%d", pod.GetIp(), c.port)
264264
if err = c.connect(ctx, addr); err != nil {
265265
err = errors.ErrAddrCouldNotDiscover(err, addr)
266-
log.Debug(err)
266+
log.Warn(err)
267267
ech <- err
268268
err = nil
269269
} else {
@@ -275,6 +275,7 @@ func (c *client) discover(ctx context.Context, ech chan<- error) (err error) {
275275
}
276276
}
277277
}
278+
log.Debug("finished node = " + node.GetName())
278279
return nil
279280
}))
280281
}
@@ -285,23 +286,24 @@ func (c *client) discover(ctx context.Context, ech chan<- error) (err error) {
285286
cancel()
286287
return nil
287288
}))
288-
cond.Broadcast()
289289
for {
290290
select {
291291
case <-cctx.Done():
292292
if len(connected) == 0 {
293+
log.Warn("connected addr is zero")
293294
return nil, errors.ErrAddrCouldNotDiscover(err, c.dns)
294295
}
295296
if c.onDiscover != nil {
296297
return nil, c.onDiscover(ctx, c, connected)
297298
}
298299
return nil, nil
299300
case addr := <-pch:
301+
log.Debug("connected addr = " + addr)
300302
connected = append(connected, addr)
301303
}
302304
}
303305
}); err != nil {
304-
log.Warn("failed to discover addrs from discoverer API, trying to discover from dns...")
306+
log.Warn("failed to discover addrs from discoverer API, trying to discover from dns..., %v", err)
305307
connected, err = c.dnsDiscovery(ctx, ech)
306308
if err != nil {
307309
return err

internal/net/grpc/pool.go

+18-9
Original file line numberDiff line numberDiff line change
@@ -75,12 +75,17 @@ func NewPool(ctx context.Context, addr string, size uint64, dopts ...DialOption)
7575
if cp.conn != nil && isHealthy(cp.conn) {
7676
return cp.conn
7777
}
78+
log.Warn("establishing new connection to " + cp.addr)
7879
conn, err := grpc.DialContext(ctx, cp.addr, cp.dopts...)
7980
if err != nil {
8081
log.Error(err)
8182
return nil
8283
}
83-
return conn
84+
if cp.conn != nil {
85+
cp.conn.Close()
86+
}
87+
cp.conn = conn
88+
return cp.conn
8489
},
8590
}
8691
return cp.Connect(ctx)
@@ -117,6 +122,8 @@ func (c *ClientConnPool) Connect(ctx context.Context) (cp *ClientConnPool, err e
117122
conn, err := grpc.DialContext(ctx, c.addr, c.dopts...)
118123
if err == nil {
119124
c.conn = conn
125+
} else {
126+
log.Debug(err)
120127
}
121128
}
122129

@@ -126,28 +133,28 @@ func (c *ClientConnPool) Connect(ctx context.Context) (cp *ClientConnPool, err e
126133

127134
if c.host == localHost ||
128135
c.host == localIPv4 {
129-
for {
130-
if atomic.LoadUint64(&c.length) > c.size {
131-
return c, nil
132-
}
136+
for atomic.LoadUint64(&c.length) > c.size {
133137
conn, err := grpc.DialContext(ctx, localIPv4+":"+c.port, c.dopts...)
134138
if err == nil {
135139
c.Put(conn)
140+
} else {
141+
log.Debug(err)
136142
}
137143
}
144+
return c, nil
138145
}
139146

140147
ips, err := net.DefaultResolver.LookupIPAddr(ctx, c.host)
141148
if err != nil {
142-
for {
143-
if atomic.LoadUint64(&c.length) > c.size {
144-
return c, nil
145-
}
149+
for atomic.LoadUint64(&c.length) > c.size {
146150
conn, err := grpc.DialContext(ctx, c.addr, c.dopts...)
147151
if err == nil {
148152
c.Put(conn)
153+
} else {
154+
log.Debug(err)
149155
}
150156
}
157+
return c, nil
151158
}
152159

153160
if uint64(len(ips)) < c.size {
@@ -163,6 +170,8 @@ func (c *ClientConnPool) Connect(ctx context.Context) (cp *ClientConnPool, err e
163170
conn, err := grpc.DialContext(ctx, ip.String()+":"+c.port, c.dopts...)
164171
if err == nil {
165172
c.Put(conn)
173+
} else {
174+
log.Debug(err)
166175
}
167176
if atomic.LoadUint64(&c.length) > c.size {
168177
return c, nil

internal/safety/safety.go

+4-1
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ import (
2121
"runtime"
2222

2323
"github.com/vdaas/vald/internal/errors"
24+
"github.com/vdaas/vald/internal/info"
2425
"github.com/vdaas/vald/internal/log"
2526
)
2627

@@ -40,7 +41,9 @@ func RecoverFunc(fn func() error) func() error {
4041
default:
4142
err = errors.ErrPanicRecovered(err, x)
4243
}
43-
log.Error(err)
44+
if err != nil {
45+
log.Error(err, info.Get())
46+
}
4447
}
4548
}()
4649
err = fn()

pkg/discoverer/k8s/handler/grpc/handler.go

+3
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ import (
2424
"github.com/vdaas/vald/apis/grpc/discoverer"
2525
"github.com/vdaas/vald/apis/grpc/payload"
2626
"github.com/vdaas/vald/internal/info"
27+
"github.com/vdaas/vald/internal/log"
2728
"github.com/vdaas/vald/internal/net/grpc/status"
2829
"github.com/vdaas/vald/pkg/discoverer/k8s/service"
2930
)
@@ -44,6 +45,7 @@ func New(opts ...Option) discoverer.DiscovererServer {
4445
func (s *server) Pods(ctx context.Context, req *payload.Discoverer_Request) (*payload.Info_Pods, error) {
4546
pods, err := s.dsc.GetPods(req)
4647
if err != nil {
48+
log.Error(err)
4749
return nil, status.WrapWithNotFound(fmt.Sprintf("Pods API request %#v pods not found", req), err, info.Get())
4850
}
4951
return pods, nil
@@ -52,6 +54,7 @@ func (s *server) Pods(ctx context.Context, req *payload.Discoverer_Request) (*pa
5254
func (s *server) Nodes(ctx context.Context, req *payload.Discoverer_Request) (*payload.Info_Nodes, error) {
5355
nodes, err := s.dsc.GetNodes(req)
5456
if err != nil {
57+
log.Error(err)
5558
return nil, status.WrapWithNotFound(fmt.Sprintf("Nodes API request %#v nodes not found", req), err, info.Get())
5659
}
5760
return nodes, nil

pkg/discoverer/k8s/service/discover.go

+33-1
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,13 @@ func New(opts ...Option) (dsc Discoverer, err error) {
8585
for name, metrics := range nodes {
8686
d.nodeMetrics.Store(name, metrics)
8787
}
88+
d.nodeMetrics.Range(func(name string, _ mnode.Node) bool {
89+
_, ok := nodes[name]
90+
if !ok {
91+
d.nodeMetrics.Delete(name)
92+
}
93+
return true
94+
})
8895
}),
8996
)),
9097
k8s.WithResourceController(mpod.New(
@@ -97,6 +104,13 @@ func New(opts ...Option) (dsc Discoverer, err error) {
97104
for name, pods := range podList {
98105
d.podMetrics.Store(name, pods)
99106
}
107+
d.podMetrics.Range(func(name string, _ mpod.Pod) bool {
108+
_, ok := podList[name]
109+
if !ok {
110+
d.podMetrics.Delete(name)
111+
}
112+
return true
113+
})
100114
}),
101115
)),
102116
k8s.WithResourceController(pod.New(
@@ -112,6 +126,13 @@ func New(opts ...Option) (dsc Discoverer, err error) {
112126
}
113127
d.pods.Store(name, pods)
114128
}
129+
d.pods.Range(func(name string, _ []pod.Pod) bool {
130+
_, ok := podList[name]
131+
if !ok {
132+
d.pods.Delete(name)
133+
}
134+
return true
135+
})
115136
}),
116137
)),
117138
k8s.WithResourceController(node.New(
@@ -121,9 +142,18 @@ func New(opts ...Option) (dsc Discoverer, err error) {
121142
}),
122143
node.WithOnReconcileFunc(func(nodes []node.Node) {
123144
log.Debugf("node resource reconciled\t%#v", nodes)
145+
nm := make(map[string]struct{}, len(nodes))
124146
for _, n := range nodes {
147+
nm[n.Name] = struct{}{}
125148
d.nodes.Store(n.Name, n)
126149
}
150+
d.nodes.Range(func(name string, _ node.Node) bool {
151+
_, ok := nm[name]
152+
if !ok {
153+
d.nodes.Delete(name)
154+
}
155+
return true
156+
})
127157
}),
128158
)),
129159
)
@@ -373,7 +403,9 @@ func (d *discoverer) GetPods(req *payload.Discoverer_Request) (pods *payload.Inf
373403
}
374404
}
375405
for i := range pods.GetPods() {
376-
pods.Pods[i].Node.Pods = nil
406+
if pods.Pods[i].Node != nil {
407+
pods.Pods[i].Node.Pods = nil
408+
}
377409
}
378410
return pods, nil
379411
}

pkg/discoverer/k8s/service/option.go

+4-1
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ type Option func(d *discoverer) error
2828

2929
var (
3030
defaultOpts = []Option{
31-
WithDiscoverDuration("500ms"),
31+
WithDiscoverDuration("2s"),
3232
WithErrGroup(errgroup.Get()),
3333
}
3434
)
@@ -53,6 +53,9 @@ func WithNamespace(ns string) Option {
5353

5454
func WithDiscoverDuration(dur string) Option {
5555
return func(d *discoverer) error {
56+
if dur == "" {
57+
return nil
58+
}
5659
pd, err := timeutil.Parse(dur)
5760
if err != nil {
5861
pd = time.Second

0 commit comments

Comments
 (0)