-
Notifications
You must be signed in to change notification settings - Fork 241
/
Copy pathrt_diversity_filter.go
163 lines (133 loc) · 4.29 KB
/
rt_diversity_filter.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
package dht
import (
"sync"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p-kbucket/peerdiversity"
logging "github.com/ipfs/go-log/v2"
ma "github.com/multiformats/go-multiaddr"
manet "github.com/multiformats/go-multiaddr/net"
)
var dfLog = logging.Logger("dht/RtDiversityFilter")
var _ peerdiversity.PeerIPGroupFilter = (*rtPeerIPGroupFilter)(nil)
type rtPeerIPGroupFilter struct {
mu sync.RWMutex
h host.Host
maxPerCpl int
maxForTable int
cplIpGroupCount map[int]map[peerdiversity.PeerIPGroupKey]int
tableIpGroupCount map[peerdiversity.PeerIPGroupKey]int
}
// NewRTPeerDiversityFilter constructs the `PeerIPGroupFilter` that will be used to configure
// the diversity filter for the Routing Table.
// Please see the docs for `peerdiversity.PeerIPGroupFilter` AND `peerdiversity.Filter` for more details.
func NewRTPeerDiversityFilter(h host.Host, maxPerCpl, maxForTable int) *rtPeerIPGroupFilter {
return &rtPeerIPGroupFilter{
h: h,
maxPerCpl: maxPerCpl,
maxForTable: maxForTable,
cplIpGroupCount: make(map[int]map[peerdiversity.PeerIPGroupKey]int),
tableIpGroupCount: make(map[peerdiversity.PeerIPGroupKey]int),
}
}
func (r *rtPeerIPGroupFilter) Allow(g peerdiversity.PeerGroupInfo) bool {
r.mu.RLock()
defer r.mu.RUnlock()
key := g.IPGroupKey
cpl := g.Cpl
if r.tableIpGroupCount[key] >= r.maxForTable {
dfLog.Debugw("rejecting (max for table) diversity", "peer", g.Id, "cpl", g.Cpl, "ip group", g.IPGroupKey)
return false
}
c, ok := r.cplIpGroupCount[cpl]
allow := !ok || c[key] < r.maxPerCpl
if !allow {
dfLog.Debugw("rejecting (max for cpl) diversity", "peer", g.Id, "cpl", g.Cpl, "ip group", g.IPGroupKey)
}
return allow
}
func (r *rtPeerIPGroupFilter) Increment(g peerdiversity.PeerGroupInfo) {
r.mu.Lock()
defer r.mu.Unlock()
key := g.IPGroupKey
cpl := g.Cpl
r.tableIpGroupCount[key] = r.tableIpGroupCount[key] + 1
if _, ok := r.cplIpGroupCount[cpl]; !ok {
r.cplIpGroupCount[cpl] = make(map[peerdiversity.PeerIPGroupKey]int)
}
r.cplIpGroupCount[cpl][key] = r.cplIpGroupCount[cpl][key] + 1
}
func (r *rtPeerIPGroupFilter) Decrement(g peerdiversity.PeerGroupInfo) {
r.mu.Lock()
defer r.mu.Unlock()
key := g.IPGroupKey
cpl := g.Cpl
r.tableIpGroupCount[key] = r.tableIpGroupCount[key] - 1
if r.tableIpGroupCount[key] == 0 {
delete(r.tableIpGroupCount, key)
}
r.cplIpGroupCount[cpl][key] = r.cplIpGroupCount[cpl][key] - 1
if r.cplIpGroupCount[cpl][key] == 0 {
delete(r.cplIpGroupCount[cpl], key)
}
if len(r.cplIpGroupCount[cpl]) == 0 {
delete(r.cplIpGroupCount, cpl)
}
}
func (r *rtPeerIPGroupFilter) PeerAddresses(p peer.ID) []ma.Multiaddr {
cs := r.h.Network().ConnsToPeer(p)
addr := make([]ma.Multiaddr, 0, len(cs))
for _, c := range cs {
addr = append(addr, c.RemoteMultiaddr())
}
return addr
}
// filterPeersByIPDiversity filters out peers from the response that are overrepresented by IP group.
// If an IP group has more than `limit` peers, all peers with at least 1 address in that IP group
// are filtered out.
func filterPeersByIPDiversity(newPeers []*peer.AddrInfo, limit int) []*peer.AddrInfo {
// If no diversity limit is set, return all peers
if limit == 0 {
return newPeers
}
// Count peers per IP group
ipGroupPeers := make(map[peerdiversity.PeerIPGroupKey]map[peer.ID]struct{})
for _, p := range newPeers {
// Find all IP groups this peer belongs to
for _, addr := range p.Addrs {
ip, err := manet.ToIP(addr)
if err != nil {
continue
}
group := peerdiversity.IPGroupKey(ip)
if len(group) == 0 {
continue
}
if _, ok := ipGroupPeers[group]; !ok {
ipGroupPeers[group] = make(map[peer.ID]struct{})
}
ipGroupPeers[group][p.ID] = struct{}{}
}
}
// Identify overrepresented groups and tag peers for removal
peersToRemove := make(map[peer.ID]struct{})
for _, peers := range ipGroupPeers {
if len(peers) > limit {
for p := range peers {
peersToRemove[p] = struct{}{}
}
}
}
if len(peersToRemove) == 0 {
// No groups are overrepresented, return all peers
return newPeers
}
// Filter out peers from overrepresented groups
filteredPeers := make([]*peer.AddrInfo, 0, len(newPeers))
for _, p := range newPeers {
if _, ok := peersToRemove[p.ID]; !ok {
filteredPeers = append(filteredPeers, p)
}
}
return filteredPeers
}