Skip to content

Commit 3927c7d

Browse files
committed
refactor: register logic for route registry
Resolves: #468
1 parent 5ee987a commit 3927c7d

File tree

4 files changed

+75
-45
lines changed

4 files changed

+75
-45
lines changed

src/code.cloudfoundry.org/gorouter/proxy/round_tripper/proxy_round_tripper_test.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -418,11 +418,11 @@ var _ = Describe("ProxyRoundTripper", func() {
418418
if transport.RoundTripCallCount() == 1 {
419419
endpoint := endpointFor(4)
420420
updated := routePool.Put(endpoint)
421-
Expect(updated).To(Equal(route.UPDATED))
421+
Expect(updated).To(Equal(route.REFRESHED))
422422

423423
endpoint = endpointFor(5)
424424
updated = routePool.Put(endpoint)
425-
Expect(updated).To(Equal(route.UPDATED))
425+
Expect(updated).To(Equal(route.REFRESHED))
426426
}
427427

428428
return nil, &net.OpError{Op: "dial", Err: errors.New("connection refused")}

src/code.cloudfoundry.org/gorouter/registry/registry.go

+13-10
Original file line numberDiff line numberDiff line change
@@ -88,29 +88,32 @@ func (r *RouteRegistry) Register(uri route.Uri, endpoint *route.Endpoint) {
8888
return
8989
}
9090

91-
endpointAdded := r.register(uri, endpoint)
91+
poolPutResult := r.register(uri, endpoint)
9292

93-
r.reporter.CaptureRegistryMessage(endpoint, endpointAdded.String())
93+
r.reporter.CaptureRegistryMessage(endpoint, poolPutResult.String())
9494

95-
if endpointAdded == route.ADDED && !endpoint.UpdatedAt.IsZero() {
95+
if poolPutResult == route.ADDED && !endpoint.UpdatedAt.IsZero() {
9696
r.reporter.CaptureRouteRegistrationLatency(time.Since(endpoint.UpdatedAt))
9797
}
9898

99-
switch endpointAdded {
99+
switch poolPutResult {
100100
case route.ADDED:
101101
if r.logger.Enabled(context.Background(), slog.LevelInfo) {
102102
r.logger.Info("endpoint-registered", buildSlogAttrs(uri, endpoint)...)
103103
}
104104
case route.UPDATED:
105-
if r.logger.Enabled(context.Background(), slog.LevelDebug) {
106-
r.logger.Debug("endpoint-registered", buildSlogAttrs(uri, endpoint)...)
105+
if r.logger.Enabled(context.Background(), slog.LevelInfo) {
106+
r.logger.Info("endpoint-registered", buildSlogAttrs(uri, endpoint)...)
107107
}
108-
default:
108+
case route.UNMODIFIED:
109109
if r.logger.Enabled(context.Background(), slog.LevelDebug) {
110110
r.logger.Debug("endpoint-not-registered", buildSlogAttrs(uri, endpoint)...)
111111
}
112+
case route.REFRESHED:
113+
if r.logger.Enabled(context.Background(), slog.LevelDebug) {
114+
r.logger.Debug("endpoint-refreshed", buildSlogAttrs(uri, endpoint)...)
115+
}
112116
}
113-
114117
}
115118

116119
func (r *RouteRegistry) register(uri route.Uri, endpoint *route.Endpoint) route.PoolPutResult {
@@ -132,11 +135,11 @@ func (r *RouteRegistry) register(uri route.Uri, endpoint *route.Endpoint) route.
132135
endpoint.StaleThreshold = r.dropletStaleThreshold
133136
}
134137

135-
endpointAdded := pool.Put(endpoint)
138+
poolPutResult := pool.Put(endpoint)
136139
// Overwrites the load balancing algorithm of a pool by that of a specified endpoint, if that is valid.
137140
r.SetTimeOfLastUpdate(t)
138141

139-
return endpointAdded
142+
return poolPutResult
140143
}
141144

142145
// insertRouteKey acquires a write lock, inserts the route key into the registry and releases the write lock.

src/code.cloudfoundry.org/gorouter/route/pool.go

+53-26
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,8 @@ func (p PoolPutResult) String() string {
3131
return "updated"
3232
case ADDED:
3333
return "added"
34+
case REFRESHED:
35+
return "refreshed"
3436
default:
3537
panic("invalid PoolPutResult")
3638
}
@@ -40,6 +42,7 @@ const (
4042
UNMODIFIED = PoolPutResult(iota)
4143
UPDATED
4244
ADDED
45+
REFRESHED
4346
)
4447

4548
func NewCounter(initial int64) *Counter {
@@ -277,35 +280,58 @@ func (p *EndpointPool) Put(endpoint *Endpoint) PoolPutResult {
277280
p.Lock()
278281
defer p.Unlock()
279282

280-
var result PoolPutResult
283+
var equal bool
281284
e, found := p.index[endpoint.CanonicalAddr()]
282285
if found {
283-
result = UPDATED
284-
if !e.endpoint.Equal(endpoint) {
285-
e.Lock()
286-
defer e.Unlock()
286+
// Only calculate equal once, it's expensive.
287+
equal = e.endpoint.Equal(endpoint)
288+
}
287289

288-
if !e.endpoint.ModificationTag.SucceededBy(&endpoint.ModificationTag) {
289-
return UNMODIFIED
290-
}
290+
switch {
291+
case found && equal:
292+
// This is the most common case. The endpoint has not changed but was simply re-announced
293+
// to ensure gorouter is still aware of it.
294+
e.updated = time.Now()
295+
p.Update()
291296

292-
oldEndpoint := e.endpoint
293-
e.endpoint = endpoint
297+
return REFRESHED
294298

295-
if oldEndpoint.PrivateInstanceId != endpoint.PrivateInstanceId {
296-
delete(p.index, oldEndpoint.PrivateInstanceId)
297-
p.index[endpoint.PrivateInstanceId] = e
298-
}
299+
case found && !e.endpoint.ModificationTag.SucceededBy(&endpoint.ModificationTag):
300+
// This exists to protect against flapping when a route receives a change (e.g. a new
301+
// route-service URL) and messages for the old and new config are still floating around.
302+
return UNMODIFIED
299303

300-
if oldEndpoint.ServerCertDomainSAN == endpoint.ServerCertDomainSAN {
301-
endpoint.SetRoundTripper(oldEndpoint.RoundTripper())
302-
}
304+
case found && !equal:
305+
// The same endpoint was announced with different data, replace the old endpoint with the
306+
// new one.
307+
e.Lock()
308+
defer e.Unlock()
309+
310+
oldEndpoint := e.endpoint
311+
e.endpoint = endpoint
312+
313+
if oldEndpoint.PrivateInstanceId != endpoint.PrivateInstanceId {
314+
delete(p.index, oldEndpoint.PrivateInstanceId)
315+
p.index[endpoint.PrivateInstanceId] = e
316+
}
317+
318+
if oldEndpoint.ServerCertDomainSAN == endpoint.ServerCertDomainSAN {
319+
endpoint.SetRoundTripper(oldEndpoint.RoundTripper())
303320
}
304-
} else {
305-
result = ADDED
321+
322+
p.RouteSvcUrl = e.endpoint.RouteServiceUrl
323+
p.setPoolLoadBalancingAlgorithm(e.endpoint)
324+
e.updated = time.Now()
325+
p.Update()
326+
327+
return UPDATED
328+
329+
case !found:
330+
// New endpoint.
306331
e = &endpointElem{
307332
endpoint: endpoint,
308333
index: len(p.endpoints),
334+
updated: time.Now(),
309335
maxConnsPerBackend: p.maxConnsPerBackend,
310336
}
311337

@@ -314,14 +340,15 @@ func (p *EndpointPool) Put(endpoint *Endpoint) PoolPutResult {
314340
p.index[endpoint.CanonicalAddr()] = e
315341
p.index[endpoint.PrivateInstanceId] = e
316342

317-
}
318-
p.RouteSvcUrl = e.endpoint.RouteServiceUrl
319-
p.setPoolLoadBalancingAlgorithm(e.endpoint)
320-
e.updated = time.Now()
321-
// set the update time of the pool
322-
p.Update()
343+
p.RouteSvcUrl = e.endpoint.RouteServiceUrl
344+
p.setPoolLoadBalancingAlgorithm(e.endpoint)
345+
p.Update()
346+
347+
return ADDED
323348

324-
return result
349+
default:
350+
panic("quantum state discovered")
351+
}
325352
}
326353

327354
func (p *EndpointPool) RouteServiceUrl() string {

src/code.cloudfoundry.org/gorouter/route/pool_test.go

+7-7
Original file line numberDiff line numberDiff line change
@@ -152,7 +152,7 @@ var _ = Describe("EndpointPool", func() {
152152
pool.MarkUpdated(time.Now().Add(-(10 * time.Minute)))
153153

154154
b := pool.Put(endpoint)
155-
Expect(b).To(Equal(route.UPDATED))
155+
Expect(b).To(Equal(route.REFRESHED))
156156

157157
prunedEndpoints := pool.PruneEndpoints()
158158
Expect(prunedEndpoints).To(BeEmpty())
@@ -163,7 +163,7 @@ var _ = Describe("EndpointPool", func() {
163163
endpoint2 := route.NewEndpoint(&route.EndpointOpts{Host: "1.2.3.4", Port: 5678})
164164

165165
pool.Put(endpoint1)
166-
Expect(pool.Put(endpoint2)).To(Equal(route.UPDATED))
166+
Expect(pool.Put(endpoint2)).To(Equal(route.REFRESHED))
167167
})
168168

169169
Context("with modification tags", func() {
@@ -189,7 +189,7 @@ var _ = Describe("EndpointPool", func() {
189189
BeforeEach(func() {
190190
modTag2.Increment()
191191
endpoint := route.NewEndpoint(&route.EndpointOpts{Host: "1.2.3.4", Port: 5678, ModificationTag: modTag2})
192-
pool.Put(endpoint)
192+
Expect(pool.Put(endpoint)).To(Equal(route.UPDATED))
193193
})
194194

195195
It("doesnt update an endpoint", func() {
@@ -387,7 +387,9 @@ var _ = Describe("EndpointPool", func() {
387387
Context("when any endpoint updates its route_service_url", func() {
388388
It("returns the route_service_url most recently updated in the pool", func() {
389389
endpointRS1 := route.NewEndpoint(&route.EndpointOpts{Host: "host-1", Port: 1234, RouteServiceUrl: "first-url"})
390+
endpointRS1Updated := route.NewEndpoint(&route.EndpointOpts{Host: "host-1", Port: 1234, RouteServiceUrl: "third-url"})
390391
endpointRS2 := route.NewEndpoint(&route.EndpointOpts{Host: "host-2", Port: 2234, RouteServiceUrl: "second-url"})
392+
endpointRS2Updated := route.NewEndpoint(&route.EndpointOpts{Host: "host-2", Port: 2234, RouteServiceUrl: "fourth-url"})
391393
b := pool.Put(endpointRS1)
392394
Expect(b).To(Equal(route.ADDED))
393395

@@ -399,14 +401,12 @@ var _ = Describe("EndpointPool", func() {
399401
url = pool.RouteServiceUrl()
400402
Expect(url).To(Equal("second-url"))
401403

402-
endpointRS1.RouteServiceUrl = "third-url"
403-
b = pool.Put(endpointRS1)
404+
b = pool.Put(endpointRS1Updated)
404405
Expect(b).To(Equal(route.UPDATED))
405406
url = pool.RouteServiceUrl()
406407
Expect(url).To(Equal("third-url"))
407408

408-
endpointRS2.RouteServiceUrl = "fourth-url"
409-
b = pool.Put(endpointRS2)
409+
b = pool.Put(endpointRS2Updated)
410410
Expect(b).To(Equal(route.UPDATED))
411411
url = pool.RouteServiceUrl()
412412
Expect(url).To(Equal("fourth-url"))

0 commit comments

Comments
 (0)