Skip to content

Commit 3ec1d59

Browse files
authored
Merge pull request iqiyi#937 from ywc689/fix-hc-agent-racing
Fix backend update racing problem with dpvs-agent and healthcheck.
2 parents ef96bbf + fad525d commit 3ec1d59

34 files changed

+933
-376
lines changed

Diff for: src/VERSION

+5-35
Original file line numberDiff line numberDiff line change
@@ -1,44 +1,14 @@
11
#!/bin/sh
22
# program: dpvs
3-
# Dec 19, 2023 #
3+
# Mar 12, 2024 #
44
##
5-
# Features
6-
# - New tool: **dpvs-agent**, a management daemon tool for dpvs based on OpenAPI.
7-
# - New tool: **healthcheck**, a service health check daemon tool cooperating with dpvs-agent.
8-
# - Dpvs: Develop **passive health check** methods for tcp and bidirectional udp backends.
9-
# - Dpvs: Add supports for **Proxy Protocol** with both v1 and v2 versions.
10-
# - Dpvs: Add supports for extended statistics of ethernet devices.
11-
# - Dpvs: Add configuration file and dpip supports for allmulticast setting switch.
12-
# - Build: Transfer all build configurations to a top-level file `config.mk`.
13-
# - Containerization: Draft a Dockerfile and a tutorial document to build and run dpvs in container.
14-
#
155
# Bugfixes
16-
# - Dpvs: Protect toa from source address spoofing attack and increase success ratio for source address delivery via toa.
17-
# - Dpvs: Adjust tcp window scale in outbound direction for synproxy to improve throughput in bulk upload cases.
18-
# - Dpvs: Fix timer inaccuracy problem when timing over 524s.
19-
# - Dpvs: Fix the crash problem caused by ether address list buffer overflow.
20-
# - Dpvs: Fix the crash problem caused by dividing by zero when bonding slaves attempt to send packets out.
21-
# - Dpvs: Fix the crash problem caused by inconsistent data structures of `dp_vs_dest_compat` between dpvs and keepalived.
22-
# - Dpvs: Correct ipo option length for judgement of branching to standalone uoa.
23-
# - Dpvs: Inhibit setting multicast ether address from slave lcores.
24-
# - Dpvs: Fix service flag conflicts of synproxy and expire-quiescent.
25-
# - Dpvs: Fix the chaos use of flag, flags and fwdmode in dest and service structures.
26-
# - Dpvs: Fix service flush function not usable problem.
27-
# - Dpvs: Fix invalid port problem when getting verbose information of netif devices.
28-
# - Dpvs: Use atomic operation to generate packet id for ipv4 header.
29-
# - Dpvs: Remove fragile implementations of strategy routing for snat.
30-
# - Dpvs: Remove the stale config item "ipc_msg/unix_domain".
31-
# - Keepalived: Do not delete and re-add vs/rs to eliminate service disturbances at reload.
32-
# - Keepalived: Fix a crash problem caused by missing definition of allowlist/denylist config items.
33-
# - Ipvsadm: Add `conn-timeout` configuration option for service.
34-
# - Ipvsadm: Fix the ambiguous use of '-Y' configuration option.
35-
# - Ipvsadm: Fix icmpv6 configuration option `-1` lost problem.
36-
# - Ipvsadm: Update help text, including supported schedulers, laddr and allow/deny ip list.
37-
# - Dpip: Fix line break problem in help message.
38-
# - Uoa: Enable ipv6 with a macro for uoa example server.
6+
# - tools: Fix concurrency problem between dpvs-agent and healthcheck in editing realserver.
7+
# - tools/dpvs-agent: Add the snapshot cache.
8+
# - tools/healthcheck: Fix occasionally arising bad icmp checksum problem for udp and udpping checkers.
399
#
4010

4111
export VERSION=1.9
42-
export RELEASE=6
12+
export RELEASE=7
4313

4414
echo $VERSION-$RELEASE

Diff for: src/ipvs/ip_vs_dest.c

+4
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,10 @@ static void __dp_vs_dest_update(struct dp_vs_service *svc,
7474
int conn_flags;
7575

7676
rte_atomic16_set(&dest->weight, udest->weight);
77+
if (udest->flags & DPVS_DEST_F_INHIBITED)
78+
dp_vs_dest_set_inhibited(dest);
79+
else
80+
dp_vs_dest_clear_inhibited(dest);
7781
conn_flags = udest->conn_flags | DPVS_CONN_F_INACTIVE;
7882
dest->fwdmode = udest->fwdmode;
7983
rte_atomic16_set(&dest->conn_flags, conn_flags);

Diff for: tools/dpvs-agent/cmd/dpvs-agent-server/local_init.go

+6-5
Original file line numberDiff line numberDiff line change
@@ -32,15 +32,16 @@ func (agent *DpvsAgentServer) LocalLoad(cp *pool.ConnPool, parentLogger hclog.Lo
3232
logger = parentLogger.Named("LoadConfigFile")
3333
}
3434

35-
snapshot := settings.ShareSnapshot()
36-
if err := snapshot.LoadFrom(settings.LocalConfigFile(), logger); err != nil {
35+
nodeSnap := settings.ShareSnapshot()
36+
if err := nodeSnap.LoadFrom(settings.LocalConfigFile(), logger); err != nil {
3737
return err
3838
}
3939

40-
announcePort := snapshot.NodeSpec.AnnouncePort
41-
laddrs := snapshot.NodeSpec.Laddrs
40+
announcePort := nodeSnap.NodeSpec.AnnouncePort
41+
laddrs := nodeSnap.NodeSpec.Laddrs
4242

43-
for _, service := range snapshot.Services {
43+
for _, snap := range nodeSnap.Snapshot {
44+
service := snap.Service
4445
// 1> ipvsadm -A vip:port -s wrr
4546
vs := types.NewVirtualServerSpec()
4647
vs.SetAddr(service.Addr)

Diff for: tools/dpvs-agent/cmd/ipvs/delete_vs_vip_port.go

+9-2
Original file line numberDiff line numberDiff line change
@@ -45,14 +45,21 @@ func (h *delVsItem) Handle(params apiVs.DeleteVsVipPortParams) middleware.Respon
4545
return apiVs.NewDeleteVsVipPortFailure()
4646
}
4747

48+
shareSnapshot := settings.ShareSnapshot()
49+
snapshot := shareSnapshot.SnapshotGet(params.VipPort)
50+
if snapshot != nil {
51+
snapshot.Lock()
52+
defer snapshot.Unlock()
53+
}
54+
4855
result := vs.Del(h.connPool, h.logger)
4956
switch result {
5057
case types.EDPVS_OK:
51-
settings.ShareSnapshot().ServiceDel(params.VipPort)
58+
shareSnapshot.ServiceDel(params.VipPort)
5259
h.logger.Info("Del virtual server success.", "VipPort", params.VipPort)
5360
return apiVs.NewDeleteVsVipPortOK()
5461
case types.EDPVS_NOTEXIST:
55-
settings.ShareSnapshot().ServiceDel(params.VipPort)
62+
shareSnapshot.ServiceDel(params.VipPort)
5663
h.logger.Warn("Del a not exist virtual server done.", "VipPort", params.VipPort, "result", result.String())
5764
return apiVs.NewDeleteVsVipPortNotFound()
5865
default:

Diff for: tools/dpvs-agent/cmd/ipvs/get_vs.go

+20-37
Original file line numberDiff line numberDiff line change
@@ -40,18 +40,28 @@ func NewGetVs(cp *pool.ConnPool, parentLogger hclog.Logger) *getVs {
4040
}
4141

4242
func (h *getVs) Handle(params apiVs.GetVsParams) middleware.Responder {
43+
shareSnapshot := settings.ShareSnapshot()
44+
if params.Healthcheck != nil && !*params.Healthcheck {
45+
return apiVs.NewGetVsOK().WithPayload(shareSnapshot.GetModels(h.logger))
46+
}
47+
48+
// if params.Snapshot != nil && *params.Snapshot {
49+
// shareSnapshot.DumpTo(settings.LocalConfigFile(), h.logger)
50+
// }
51+
4352
front := types.NewVirtualServerFront()
4453
vss, err := front.Get(h.connPool, h.logger)
4554
if err != nil {
4655
h.logger.Error("Get virtual server list failed.", "Error", err.Error())
47-
// FIXME: Invalid
48-
return apiVs.NewGetVsOK()
56+
return apiVs.NewGetVsNoContent()
4957
}
5058

51-
shareSnapshot := settings.ShareSnapshot()
59+
vsModels := models.VirtualServerList{
60+
Items: make([]*models.VirtualServerSpecExpand, len(vss)),
61+
}
5262

5363
h.logger.Info("Get all virtual server done.", "vss", vss)
54-
for _, vs := range vss {
64+
for i, vs := range vss {
5565
front := types.NewRealServerFront()
5666

5767
err := front.ParseVipPortProto(vs.ID())
@@ -69,45 +79,18 @@ func (h *getVs) Handle(params apiVs.GetVsParams) middleware.Responder {
6979

7080
h.logger.Info("Get real server list of virtual server success.", "ID", vs.ID(), "rss", rss)
7181

72-
vsModel := vs.GetModel()
73-
vsStats := (*types.ServerStats)(vsModel.Stats)
74-
vsModel.RSs = new(models.RealServerExpandList)
75-
vsModel.RSs.Items = make([]*models.RealServerSpecExpand, len(rss))
82+
vsModels.Items[i] = vs.GetModel()
83+
vsStats := (*types.ServerStats)(vsModels.Items[i].Stats)
84+
vsModels.Items[i].RSs = new(models.RealServerExpandList)
85+
vsModels.Items[i].RSs.Items = make([]*models.RealServerSpecExpand, len(rss))
7686

7787
for j, rs := range rss {
7888
rsModel := rs.GetModel()
7989
rsStats := (*types.ServerStats)(rsModel.Stats)
80-
vsModel.RSs.Items[j] = rsModel
90+
vsModels.Items[i].RSs.Items[j] = rsModel
8191
vsStats.Increase(rsStats)
8292
}
83-
84-
if shareSnapshot.NodeSpec.Laddrs == nil {
85-
laddr := types.NewLocalAddrFront()
86-
if err := laddr.ParseVipPortProto(vs.ID()); err != nil {
87-
// FIXME: Invalid
88-
return apiVs.NewGetVsOK()
89-
}
90-
91-
laddrs, err := laddr.Get(h.connPool, h.logger)
92-
if err != nil {
93-
// FIXME: Invalid
94-
return apiVs.NewGetVsOK()
95-
}
96-
97-
shareSnapshot.NodeSpec.Laddrs = new(models.LocalAddressExpandList)
98-
laddrModels := shareSnapshot.NodeSpec.Laddrs
99-
laddrModels.Items = make([]*models.LocalAddressSpecExpand, len(laddrs))
100-
for k, lip := range laddrs {
101-
laddrModels.Items[k] = lip.GetModel()
102-
}
103-
}
104-
105-
shareSnapshot.ServiceUpsert(vsModel)
106-
}
107-
108-
if params.Snapshot != nil && *params.Snapshot {
109-
shareSnapshot.DumpTo(settings.LocalConfigFile(), h.logger)
11093
}
11194

112-
return apiVs.NewGetVsOK().WithPayload(shareSnapshot.GetModels(h.logger))
95+
return apiVs.NewGetVsOK().WithPayload(&vsModels)
11396
}

Diff for: tools/dpvs-agent/cmd/ipvs/get_vs_vip_port.go

+46-10
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@
1515
package ipvs
1616

1717
import (
18+
"strings"
19+
1820
"github.com/dpvs-agent/models"
1921
"github.com/dpvs-agent/pkg/ipc/pool"
2022
"github.com/dpvs-agent/pkg/ipc/types"
@@ -40,10 +42,33 @@ func NewGetVsVipPort(cp *pool.ConnPool, parentLogger hclog.Logger) *getVsVipPort
4042
}
4143

4244
func (h *getVsVipPort) Handle(params apiVs.GetVsVipPortParams) middleware.Responder {
45+
shareSnapshot := settings.ShareSnapshot()
46+
if params.Healthcheck != nil && !*params.Healthcheck {
47+
vsModel := shareSnapshot.ServiceGet(params.VipPort)
48+
if vsModel != nil {
49+
vsModels := new(models.VirtualServerList)
50+
vsModels.Items = make([]*models.VirtualServerSpecExpand, 1)
51+
vsModels.Items[0] = vsModel
52+
return apiVs.NewGetVsVipPortOK().WithPayload(vsModels)
53+
}
54+
}
55+
56+
vaild := true
4357
var vss []*types.VirtualServerSpec
4458
spec := types.NewVirtualServerSpec()
4559
err := spec.ParseVipPortProto(params.VipPort)
4660
if err != nil {
61+
vaild = false
62+
if params.Healthcheck != nil && !*params.Healthcheck {
63+
// invalid VipPort string
64+
// respond full cache info
65+
vsModels := shareSnapshot.GetModels(h.logger)
66+
if len(vsModels.Items) != 0 {
67+
return apiVs.NewGetVsVipPortOK().WithPayload(vsModels)
68+
}
69+
// read from dpvs memory
70+
}
71+
4772
h.logger.Warn("Convert to virtual server failed. Get All virtual server.", "VipPort", params.VipPort, "Error", err.Error())
4873
front := types.NewVirtualServerFront()
4974
vss, err = front.Get(h.connPool, h.logger)
@@ -56,10 +81,9 @@ func (h *getVsVipPort) Handle(params apiVs.GetVsVipPortParams) middleware.Respon
5681
return apiVs.NewGetVsVipPortNotFound()
5782
}
5883

59-
shareSnapshot := settings.ShareSnapshot()
60-
61-
vsModels := new(models.VirtualServerList)
62-
vsModels.Items = make([]*models.VirtualServerSpecExpand, len(vss))
84+
vsModels := &models.VirtualServerList{
85+
Items: make([]*models.VirtualServerSpecExpand, len(vss)),
86+
}
6387

6488
for i, vs := range vss {
6589
front := types.NewRealServerFront()
@@ -80,20 +104,32 @@ func (h *getVsVipPort) Handle(params apiVs.GetVsVipPortParams) middleware.Respon
80104
h.logger.Info("Get real server list of virtual server success.", "ID", vs.ID(), "rss", rss)
81105

82106
vsModel := vs.GetModel()
83-
shareSnapshot.ServiceUpsert(vsModel)
84-
// vsModel.Version = shareSnapshot.ServiceVersion(vs.ID())
85107
vsModels.Items[i] = vsModel
86-
vsStats := (*types.ServerStats)(vsModels.Items[i].Stats)
87-
vsModels.Items[i].RSs = new(models.RealServerExpandList)
88-
vsModels.Items[i].RSs.Items = make([]*models.RealServerSpecExpand, len(rss))
108+
vsStats := (*types.ServerStats)(vsModel.Stats)
109+
vsModel.RSs = new(models.RealServerExpandList)
110+
vsModel.RSs.Items = make([]*models.RealServerSpecExpand, len(rss))
89111

90112
for j, rs := range rss {
91113
rsModel := rs.GetModel()
92114
rsStats := (*types.ServerStats)(rsModel.Stats)
93-
vsModels.Items[i].RSs.Items[j] = rsModel
115+
vsModel.RSs.Items[j] = rsModel
94116
vsStats.Increase(rsStats)
95117
}
96118
}
97119

120+
if vaild {
121+
targetModels := &models.VirtualServerList{
122+
Items: make([]*models.VirtualServerSpecExpand, 1),
123+
}
124+
125+
for _, vsModel := range vsModels.Items {
126+
typesVsModel := (*types.VirtualServerSpecExpandModel)(vsModel)
127+
if strings.EqualFold(spec.ID(), typesVsModel.ID()) {
128+
targetModels.Items[0] = vsModel
129+
return apiVs.NewGetVsVipPortOK().WithPayload(targetModels)
130+
}
131+
}
132+
}
133+
98134
return apiVs.NewGetVsVipPortOK().WithPayload(vsModels)
99135
}

Diff for: tools/dpvs-agent/cmd/ipvs/post_vs_vip_port_rs.go

+52-3
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,9 @@
1515
package ipvs
1616

1717
import (
18-
// "github.com/dpvs-agent/models"
18+
"strings"
19+
20+
"github.com/dpvs-agent/models"
1921
"github.com/dpvs-agent/pkg/ipc/pool"
2022
"github.com/dpvs-agent/pkg/ipc/types"
2123
"github.com/dpvs-agent/pkg/settings"
@@ -46,6 +48,10 @@ func (h *postVsRs) Handle(params apiVs.PostVsVipPortRsParams) middleware.Respond
4648
return apiVs.NewPostVsVipPortRsInvalidFrontend()
4749
}
4850

51+
if params.Rss == nil || params.Rss.Items == nil {
52+
return apiVs.NewPostVsVipPortRsInvalidFrontend()
53+
}
54+
4955
rss := make([]*types.RealServerSpec, len(params.Rss.Items))
5056
for i, rs := range params.Rss.Items {
5157
var fwdmode types.DpvsFwdMode
@@ -56,15 +62,58 @@ func (h *postVsRs) Handle(params apiVs.PostVsVipPortRsParams) middleware.Respond
5662
rss[i].SetWeight(uint32(rs.Weight))
5763
rss[i].SetProto(front.GetProto())
5864
rss[i].SetAddr(rs.IP)
59-
rss[i].SetInhibited(rs.Inhibited)
6065
rss[i].SetOverloaded(rs.Overloaded)
6166
rss[i].SetFwdMode(fwdmode)
67+
// NOTE: inhibited set by healthcheck module with API /vs/${ID}/rs/health only
68+
// we clear it default
69+
inhibited := false
70+
if rs.Inhibited != nil {
71+
inhibited = *rs.Inhibited
72+
}
73+
rss[i].SetInhibited(&inhibited)
74+
}
75+
76+
shareSnapshot := settings.ShareSnapshot()
77+
if shareSnapshot.ServiceLock(params.VipPort) {
78+
defer shareSnapshot.ServiceUnlock(params.VipPort)
6279
}
6380

6481
result := front.Update(rss, h.connPool, h.logger)
6582
switch result {
6683
case types.EDPVS_EXIST, types.EDPVS_OK:
67-
settings.ShareSnapshot().ServiceVersionUpdate(params.VipPort, h.logger)
84+
// Update Snapshot
85+
vsModel := shareSnapshot.ServiceGet(params.VipPort)
86+
if vsModel == nil {
87+
spec := types.NewVirtualServerSpec()
88+
err := spec.ParseVipPortProto(params.VipPort)
89+
if err != nil {
90+
h.logger.Warn("Convert to virtual server failed.", "VipPort", params.VipPort, "Error", err.Error())
91+
// FIXME return
92+
}
93+
vss, err := spec.Get(h.connPool, h.logger)
94+
if err != nil {
95+
h.logger.Error("Get virtual server failed.", "svc VipPort", params.VipPort, "Error", err.Error())
96+
// FIXME return
97+
}
98+
99+
for _, vs := range vss {
100+
if strings.EqualFold(vs.ID(), spec.ID()) {
101+
shareSnapshot.ServiceAdd(vs)
102+
break
103+
}
104+
}
105+
} else {
106+
vsModel.RSs = &models.RealServerExpandList{
107+
Items: make([]*models.RealServerSpecExpand, len(rss)),
108+
}
109+
110+
for i, rs := range rss {
111+
vsModel.RSs.Items[i] = rs.GetModel()
112+
}
113+
}
114+
115+
shareSnapshot.ServiceVersionUpdate(params.VipPort, h.logger)
116+
68117
h.logger.Info("Set real server to virtual server success.", "VipPort", params.VipPort, "rss", rss, "result", result.String())
69118
return apiVs.NewPostVsVipPortRsOK()
70119
default:

0 commit comments

Comments
 (0)