Skip to content

Commit c421f20

Browse files
committed
close dropped connection on balancer.Update()
1 parent 3074ad5 commit c421f20

File tree

4 files changed

+38
-3
lines changed

4 files changed

+38
-3
lines changed

internal/balancer/balancer.go

+13-3
Original file line numberDiff line numberDiff line change
@@ -127,10 +127,12 @@ func (b *Balancer) applyDiscoveredEndpoints(ctx context.Context, newest []endpoi
127127
)
128128
previous = b.connections().All()
129129
)
130+
131+
_, added, dropped := xslices.Diff(previous, newest, func(lhs, rhs endpoint.Endpoint) int {
132+
return strings.Compare(lhs.Address(), rhs.Address())
133+
})
134+
130135
defer func() {
131-
_, added, dropped := xslices.Diff(previous, newest, func(lhs, rhs endpoint.Endpoint) int {
132-
return strings.Compare(lhs.Address(), rhs.Address())
133-
})
134136
onDone(
135137
xslices.Transform(newest, func(t endpoint.Endpoint) trace.EndpointInfo { return t }),
136138
xslices.Transform(added, func(t endpoint.Endpoint) trace.EndpointInfo { return t }),
@@ -145,6 +147,14 @@ func (b *Balancer) applyDiscoveredEndpoints(ctx context.Context, newest []endpoi
145147
c.Endpoint().Touch()
146148
}
147149

150+
for _, e := range dropped {
151+
c := b.pool.GetIfPresent(e)
152+
if c != nil {
153+
b.pool.Ban(ctx, c, errEndpointNotDiscovered)
154+
_ = c.Close(ctx)
155+
}
156+
}
157+
148158
info := balancerConfig.Info{SelfLocation: localDC}
149159
state := newConnectionsState(connections, b.balancerConfig.Filter, info, b.balancerConfig.AllowFallback)
150160

internal/balancer/errors.go

+9
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
package balancer
2+
3+
import (
4+
"errors"
5+
)
6+
7+
var (
8+
errEndpointNotDiscovered = errors.New("endpoint is no longer discovered")
9+
)

internal/conn/conn.go

+2
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package conn
33
import (
44
"context"
55
"fmt"
6+
"github.com/ydb-platform/ydb-go-sdk/v3/internal/closer"
67
"sync"
78
"sync/atomic"
89
"time"
@@ -36,6 +37,7 @@ var (
3637

3738
type Conn interface {
3839
grpc.ClientConnInterface
40+
closer.Closer
3941

4042
Endpoint() endpoint.Endpoint
4143

internal/conn/pool.go

+14
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,20 @@ func (p *Pool) GrpcDialOptions() []grpc.DialOption {
4040
return p.dialOptions
4141
}
4242

43+
func (p *Pool) GetIfPresent(endpoint endpoint.Endpoint) Conn {
44+
var (
45+
address = endpoint.Address()
46+
cc *conn
47+
has bool
48+
)
49+
50+
if cc, has = p.conns.Get(address); has {
51+
return cc
52+
}
53+
54+
return nil
55+
}
56+
4357
func (p *Pool) Get(endpoint endpoint.Endpoint) Conn {
4458
var (
4559
address = endpoint.Address()

0 commit comments

Comments
 (0)