Skip to content

Commit 50a317f

Browse files
authored
Merge pull request #486 from jkh52/frontend-name
proxy-server: pure rename (frontends vs. established).
2 parents 9911f55 + bfc2d19 commit 50a317f

File tree

2 files changed

+80
-81
lines changed

2 files changed

+80
-81
lines changed

pkg/server/server.go

Lines changed: 42 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -199,10 +199,10 @@ type ProxyServer struct {
199199
// ready but there is no healthy connection.
200200
Readiness ReadinessManager
201201

202-
// fmu protects frontends.
202+
// fmu protects established.
203203
fmu sync.RWMutex
204204
// conn = Frontend[agentID][connID]
205-
frontends map[string]map[int64]*ProxyClientConnection
205+
established map[string]map[int64]*ProxyClientConnection
206206

207207
PendingDial *PendingDialManager
208208

@@ -333,102 +333,102 @@ func (s *ProxyServer) removeBackend(agentID string, conn agent.AgentService_Conn
333333
}
334334
}
335335

336-
func (s *ProxyServer) addFrontend(agentID string, connID int64, p *ProxyClientConnection) {
336+
func (s *ProxyServer) addEstablished(agentID string, connID int64, p *ProxyClientConnection) {
337337
s.fmu.Lock()
338338
defer s.fmu.Unlock()
339-
if _, ok := s.frontends[agentID]; !ok {
340-
s.frontends[agentID] = make(map[int64]*ProxyClientConnection)
339+
if _, ok := s.established[agentID]; !ok {
340+
s.established[agentID] = make(map[int64]*ProxyClientConnection)
341341
}
342-
s.frontends[agentID][connID] = p
342+
s.established[agentID][connID] = p
343343

344-
metrics.Metrics.SetEstablishedConnCount(s.getCount(s.frontends))
344+
metrics.Metrics.SetEstablishedConnCount(s.getCount(s.established))
345345
}
346346

347-
func (s *ProxyServer) removeFrontend(agentID string, connID int64) *ProxyClientConnection {
347+
func (s *ProxyServer) removeEstablished(agentID string, connID int64) *ProxyClientConnection {
348348
var ret *ProxyClientConnection
349349
s.fmu.Lock()
350350
defer s.fmu.Unlock()
351-
conns, ok := s.frontends[agentID]
351+
conns, ok := s.established[agentID]
352352
if !ok {
353353
return nil
354354
}
355355
if ret, ok = conns[connID]; !ok {
356356
return nil
357357
}
358-
delete(s.frontends[agentID], connID)
359-
if len(s.frontends[agentID]) == 0 {
360-
delete(s.frontends, agentID)
358+
delete(s.established[agentID], connID)
359+
if len(s.established[agentID]) == 0 {
360+
delete(s.established, agentID)
361361
}
362-
metrics.Metrics.SetEstablishedConnCount(s.getCount(s.frontends))
362+
metrics.Metrics.SetEstablishedConnCount(s.getCount(s.established))
363363
return ret
364364
}
365365

366366
func (s *ProxyServer) getFrontend(agentID string, connID int64) (*ProxyClientConnection, error) {
367367
s.fmu.RLock()
368368
defer s.fmu.RUnlock()
369-
conns, ok := s.frontends[agentID]
369+
conns, ok := s.established[agentID]
370370
if !ok {
371-
return nil, fmt.Errorf("can't find agentID %s in the frontends", agentID)
371+
return nil, fmt.Errorf("can't find agentID %s in the established", agentID)
372372
}
373373
conn, ok := conns[connID]
374374
if !ok {
375-
return nil, fmt.Errorf("can't find connID %d in the frontends[%s]", connID, agentID)
375+
return nil, fmt.Errorf("can't find connID %d in the established[%s]", connID, agentID)
376376
}
377377
return conn, nil
378378
}
379379

380-
func (s *ProxyServer) removeFrontendsForBackendConn(agentID string, backend Backend) ([]*ProxyClientConnection, error) {
380+
func (s *ProxyServer) removeEstablishedForBackendConn(agentID string, backend Backend) ([]*ProxyClientConnection, error) {
381381
var ret []*ProxyClientConnection
382382
if backend == nil {
383383
return ret, nil
384384
}
385385
s.fmu.Lock()
386386
defer s.fmu.Unlock()
387-
frontends, ok := s.frontends[agentID]
387+
established, ok := s.established[agentID]
388388
if !ok {
389-
return nil, fmt.Errorf("can't find agentID %s in the frontends", agentID)
389+
return nil, fmt.Errorf("can't find agentID %s in the established", agentID)
390390
}
391-
for _, frontend := range frontends {
391+
for _, frontend := range established {
392392
if frontend.backend == backend {
393-
delete(s.frontends, agentID)
393+
delete(s.established, agentID)
394394
ret = append(ret, frontend)
395395
}
396396
}
397397

398-
metrics.Metrics.SetEstablishedConnCount(s.getCount(s.frontends))
398+
metrics.Metrics.SetEstablishedConnCount(s.getCount(s.established))
399399
return ret, nil
400400
}
401401

402-
func (s *ProxyServer) getCount(frontends map[string](map[int64]*ProxyClientConnection)) int {
402+
func (s *ProxyServer) getCount(established map[string](map[int64]*ProxyClientConnection)) int {
403403
count := 0
404-
for _, frontend := range frontends {
404+
for _, frontend := range established {
405405
count = count + len(frontend)
406406
}
407407
return count
408408
}
409409

410-
// removeForStream removes and returns all established ProxyClientConnection associated with a given
410+
// removeEstablishedForStream removes and returns all established ProxyClientConnection associated with a given
411411
// Proxy gRPC connection (expected to be at most 1 while konnectivity-client API gives single-use
412412
// tunnels).
413-
func (s *ProxyServer) removeFrontendsForStream(streamUID string) []*ProxyClientConnection {
413+
func (s *ProxyServer) removeEstablishedForStream(streamUID string) []*ProxyClientConnection {
414414
var ret []*ProxyClientConnection
415415
if streamUID == "" {
416416
return ret
417417
}
418418
s.fmu.Lock()
419419
defer s.fmu.Unlock()
420-
for agentID, frontends := range s.frontends {
421-
for connID, frontend := range frontends {
420+
for agentID, established := range s.established {
421+
for connID, frontend := range established {
422422
if frontend.frontend == nil {
423423
continue
424424
}
425425
if frontend.frontend.streamUID == streamUID {
426-
delete(frontends, connID)
426+
delete(established, connID)
427427
ret = append(ret, frontend)
428428
}
429429
}
430-
if len(frontends) == 0 {
431-
delete(s.frontends, agentID)
430+
if len(established) == 0 {
431+
delete(s.established, agentID)
432432
}
433433
}
434434
return ret
@@ -451,7 +451,7 @@ func NewProxyServer(serverID string, proxyStrategies []ProxyStrategy, serverCoun
451451
}
452452

453453
return &ProxyServer{
454-
frontends: make(map[string](map[int64]*ProxyClientConnection)),
454+
established: make(map[string](map[int64]*ProxyClientConnection)),
455455
PendingDial: NewPendingDialManager(),
456456
serverID: serverID,
457457
serverCount: serverCount,
@@ -493,7 +493,7 @@ func (s *ProxyServer) Proxy(stream client.ProxyService_ProxyServer) error {
493493
// TODO: add agent support to handle this
494494
s.sendBackendDialClose(p.backend, p.dialID, "frontend stream shutdown")
495495
}
496-
for _, f := range s.removeFrontendsForStream(streamUID) {
496+
for _, f := range s.removeEstablishedForStream(streamUID) {
497497
klog.V(2).InfoS("frontend stream shutdown, cleaning frontend", "connectionID", f.connectID, "dialID", f.dialID)
498498
s.sendBackendClose(f.backend, f.connectID, f.dialID, "frontend stream shutdown")
499499
}
@@ -891,19 +891,18 @@ func (s *ProxyServer) serveRecvBackend(backend Backend, agentID string, recvCh <
891891
}()
892892

893893
defer func() {
894-
// Close all connected frontends when the agent connection is closed
895-
// TODO(#126): Frontends in PendingDial state that have not been added to the
896-
// list of frontends should also be closed.
897-
frontends, err := s.removeFrontendsForBackendConn(agentID, backend)
894+
// Close all established connections when the agent connection is closed
895+
// TODO(#126): connections in PendingDial state should also be closed.
896+
established, err := s.removeEstablishedForBackendConn(agentID, backend)
898897
if err != nil {
899898
return
900899
}
901-
if len(frontends) > 0 {
902-
klog.V(2).InfoS("Close frontends connected to agent",
903-
"count", len(frontends), "agentID", agentID)
900+
if len(established) > 0 {
901+
klog.V(2).InfoS("Close established connections to agent",
902+
"count", len(established), "agentID", agentID)
904903
}
905904

906-
for _, frontend := range frontends {
905+
for _, frontend := range established {
907906
pkt := &client.Packet{
908907
Type: client.PacketType_CLOSE_RSP,
909908
Payload: &client.Packet_CloseResponse{
@@ -958,7 +957,7 @@ func (s *ProxyServer) serveRecvBackend(backend Backend, agentID string, recvCh <
958957
frontend.connectID = resp.ConnectID
959958
frontend.agentID = agentID
960959
// TODO: this connection may be cleaned on serveRecvFrontend exit, make it independent.
961-
s.addFrontend(agentID, resp.ConnectID, frontend)
960+
s.addEstablished(agentID, resp.ConnectID, frontend)
962961
close(frontend.connected)
963962
metrics.Metrics.ObserveDialLatency(time.Since(frontend.start))
964963
klog.V(3).InfoS("Proxy connection established",
@@ -1014,7 +1013,7 @@ func (s *ProxyServer) serveRecvBackend(backend Backend, agentID string, recvCh <
10141013
case client.PacketType_CLOSE_RSP:
10151014
resp := pkt.GetCloseResponse()
10161015
klog.V(5).InfoS("Received CLOSE_RSP", "agentID", agentID, "connectionID", resp.ConnectID)
1017-
frontend := s.removeFrontend(agentID, resp.ConnectID)
1016+
frontend := s.removeEstablished(agentID, resp.ConnectID)
10181017
if frontend == nil {
10191018
// assuming it is already closed, just log it
10201019
klog.V(2).InfoS("could not get frontend client for closing", "agentID", agentID, "connectionID", resp.ConnectID)

pkg/server/server_test.go

Lines changed: 38 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -224,22 +224,22 @@ func TestAddRemoveFrontends(t *testing.T) {
224224
agent3ConnID1 := new(ProxyClientConnection)
225225

226226
p := NewProxyServer("", []ProxyStrategy{ProxyStrategyDefault}, 1, nil)
227-
p.addFrontend("agent1", int64(1), agent1ConnID1)
228-
p.removeFrontend("agent1", int64(1))
227+
p.addEstablished("agent1", int64(1), agent1ConnID1)
228+
p.removeEstablished("agent1", int64(1))
229229
expectedFrontends := make(map[string]map[int64]*ProxyClientConnection)
230-
if e, a := expectedFrontends, p.frontends; !reflect.DeepEqual(e, a) {
230+
if e, a := expectedFrontends, p.established; !reflect.DeepEqual(e, a) {
231231
t.Errorf("expected %v, got %v", e, a)
232232
}
233233

234234
p = NewProxyServer("", []ProxyStrategy{ProxyStrategyDefault}, 1, nil)
235-
p.addFrontend("agent1", int64(1), agent1ConnID1)
236-
p.addFrontend("agent1", int64(2), agent1ConnID2)
237-
p.addFrontend("agent2", int64(1), agent2ConnID1)
238-
p.addFrontend("agent2", int64(2), agent2ConnID2)
239-
p.addFrontend("agent3", int64(1), agent3ConnID1)
240-
p.removeFrontend("agent2", int64(1))
241-
p.removeFrontend("agent2", int64(2))
242-
p.removeFrontend("agent1", int64(1))
235+
p.addEstablished("agent1", int64(1), agent1ConnID1)
236+
p.addEstablished("agent1", int64(2), agent1ConnID2)
237+
p.addEstablished("agent2", int64(1), agent2ConnID1)
238+
p.addEstablished("agent2", int64(2), agent2ConnID2)
239+
p.addEstablished("agent3", int64(1), agent3ConnID1)
240+
p.removeEstablished("agent2", int64(1))
241+
p.removeEstablished("agent2", int64(2))
242+
p.removeEstablished("agent1", int64(1))
243243
expectedFrontends = map[string]map[int64]*ProxyClientConnection{
244244
"agent1": {
245245
int64(2): agent1ConnID2,
@@ -248,7 +248,7 @@ func TestAddRemoveFrontends(t *testing.T) {
248248
int64(1): agent3ConnID1,
249249
},
250250
}
251-
if e, a := expectedFrontends, p.frontends; !reflect.DeepEqual(e, a) {
251+
if e, a := expectedFrontends, p.established; !reflect.DeepEqual(e, a) {
252252
t.Errorf("expected %v, got %v", e, a)
253253
}
254254
}
@@ -263,29 +263,29 @@ func TestEstablishedConnsMetric(t *testing.T) {
263263
agent3ConnID1 := new(ProxyClientConnection)
264264

265265
p := NewProxyServer("", []ProxyStrategy{ProxyStrategyDefault}, 1, nil)
266-
p.addFrontend("agent1", int64(1), agent1ConnID1)
266+
p.addEstablished("agent1", int64(1), agent1ConnID1)
267267
assertEstablishedConnsMetric(t, 1)
268-
p.addFrontend("agent1", int64(2), agent1ConnID2)
268+
p.addEstablished("agent1", int64(2), agent1ConnID2)
269269
assertEstablishedConnsMetric(t, 2)
270-
p.addFrontend("agent2", int64(1), agent2ConnID1)
270+
p.addEstablished("agent2", int64(1), agent2ConnID1)
271271
assertEstablishedConnsMetric(t, 3)
272-
p.addFrontend("agent2", int64(2), agent2ConnID2)
272+
p.addEstablished("agent2", int64(2), agent2ConnID2)
273273
assertEstablishedConnsMetric(t, 4)
274-
p.addFrontend("agent3", int64(1), agent3ConnID1)
274+
p.addEstablished("agent3", int64(1), agent3ConnID1)
275275
assertEstablishedConnsMetric(t, 5)
276-
p.removeFrontend("agent2", int64(1))
276+
p.removeEstablished("agent2", int64(1))
277277
assertEstablishedConnsMetric(t, 4)
278-
p.removeFrontend("agent2", int64(2))
278+
p.removeEstablished("agent2", int64(2))
279279
assertEstablishedConnsMetric(t, 3)
280-
p.removeFrontend("agent1", int64(1))
280+
p.removeEstablished("agent1", int64(1))
281281
assertEstablishedConnsMetric(t, 2)
282-
p.removeFrontend("agent1", int64(2))
282+
p.removeEstablished("agent1", int64(2))
283283
assertEstablishedConnsMetric(t, 1)
284-
p.removeFrontend("agent3", int64(1))
284+
p.removeEstablished("agent3", int64(1))
285285
assertEstablishedConnsMetric(t, 0)
286286
}
287287

288-
func TestRemoveFrontendsForBackendConn(t *testing.T) {
288+
func TestRemoveEstablishedForBackendConn(t *testing.T) {
289289
backend1 := &backend{}
290290
backend2 := &backend{}
291291
backend3 := &backend{}
@@ -295,12 +295,12 @@ func TestRemoveFrontendsForBackendConn(t *testing.T) {
295295
agent2ConnID2 := &ProxyClientConnection{backend: backend2}
296296
agent3ConnID1 := &ProxyClientConnection{backend: backend3}
297297
p := NewProxyServer("", []ProxyStrategy{ProxyStrategyDefault}, 1, nil)
298-
p.addFrontend("agent1", int64(1), agent1ConnID1)
299-
p.addFrontend("agent1", int64(2), agent1ConnID2)
300-
p.addFrontend("agent2", int64(1), agent2ConnID1)
301-
p.addFrontend("agent2", int64(2), agent2ConnID2)
302-
p.addFrontend("agent3", int64(1), agent3ConnID1)
303-
p.removeFrontendsForBackendConn("agent2", backend2)
298+
p.addEstablished("agent1", int64(1), agent1ConnID1)
299+
p.addEstablished("agent1", int64(2), agent1ConnID2)
300+
p.addEstablished("agent2", int64(1), agent2ConnID1)
301+
p.addEstablished("agent2", int64(2), agent2ConnID2)
302+
p.addEstablished("agent3", int64(1), agent3ConnID1)
303+
p.removeEstablishedForBackendConn("agent2", backend2)
304304
expectedFrontends := map[string]map[int64]*ProxyClientConnection{
305305
"agent1": {
306306
int64(1): agent1ConnID1,
@@ -310,12 +310,12 @@ func TestRemoveFrontendsForBackendConn(t *testing.T) {
310310
int64(1): agent3ConnID1,
311311
},
312312
}
313-
if e, a := expectedFrontends, p.frontends; !reflect.DeepEqual(e, a) {
313+
if e, a := expectedFrontends, p.established; !reflect.DeepEqual(e, a) {
314314
t.Errorf("expected %v, got %v", e, a)
315315
}
316316
}
317317

318-
func TestRemoveFrontendsForStream(t *testing.T) {
318+
func TestRemoveEstablishedForStream(t *testing.T) {
319319
streamUID := "target-uuid"
320320
backend1 := &backend{}
321321
backend2 := &backend{}
@@ -326,12 +326,12 @@ func TestRemoveFrontendsForStream(t *testing.T) {
326326
agent2ConnID2 := &ProxyClientConnection{backend: backend2}
327327
agent3ConnID1 := &ProxyClientConnection{backend: backend3, frontend: &GrpcFrontend{streamUID: streamUID}}
328328
p := NewProxyServer("", []ProxyStrategy{ProxyStrategyDefault}, 1, nil)
329-
p.addFrontend("agent1", int64(1), agent1ConnID1)
330-
p.addFrontend("agent1", int64(2), agent1ConnID2)
331-
p.addFrontend("agent2", int64(1), agent2ConnID1)
332-
p.addFrontend("agent2", int64(2), agent2ConnID2)
333-
p.addFrontend("agent3", int64(1), agent3ConnID1)
334-
p.removeFrontendsForStream(streamUID)
329+
p.addEstablished("agent1", int64(1), agent1ConnID1)
330+
p.addEstablished("agent1", int64(2), agent1ConnID2)
331+
p.addEstablished("agent2", int64(1), agent2ConnID1)
332+
p.addEstablished("agent2", int64(2), agent2ConnID2)
333+
p.addEstablished("agent3", int64(1), agent3ConnID1)
334+
p.removeEstablishedForStream(streamUID)
335335
expectedFrontends := map[string]map[int64]*ProxyClientConnection{
336336
"agent1": {
337337
int64(2): agent1ConnID2,
@@ -340,7 +340,7 @@ func TestRemoveFrontendsForStream(t *testing.T) {
340340
int64(2): agent2ConnID2,
341341
},
342342
}
343-
if e, a := expectedFrontends, p.frontends; !reflect.DeepEqual(e, a) {
343+
if e, a := expectedFrontends, p.established; !reflect.DeepEqual(e, a) {
344344
t.Errorf("expected %v, got %v", e, a)
345345
}
346346
}

0 commit comments

Comments
 (0)