@@ -26,6 +26,7 @@ type RequestPool struct {
26
26
semaphore * semaphore.Weighted
27
27
lock sync.RWMutex
28
28
QueueSize int64
29
+ existMap map [string ]bool
29
30
}
30
31
31
32
type Request struct {
@@ -37,6 +38,7 @@ type Request struct {
37
38
func (rp * RequestPool ) Start () {
38
39
rp .queue = make ([]Request , 0 )
39
40
rp .semaphore = semaphore .NewWeighted (rp .QueueSize )
41
+ rp .existMap = make (map [string ]bool )
40
42
}
41
43
42
44
// Submit a request into the pool, returns an error when request is already in the pool
@@ -52,15 +54,15 @@ func (rp *RequestPool) Submit(request []byte) error {
52
54
}
53
55
rp .lock .Lock ()
54
56
defer rp .lock .Unlock ()
55
- for _ , existingReq := range rp .queue {
56
- if existingReq .ClientID == reqInfo .ClientID && existingReq .ID == reqInfo .ID {
57
- rp .semaphore .Release (1 )
58
- err := fmt .Sprintf ("a request with ID %v and client ID %v already exists in the pool" , reqInfo .ID , reqInfo .ClientID )
59
- rp .Log .Errorf (err )
60
- return fmt .Errorf (err )
61
- }
57
+ existStr := fmt .Sprintf ("%v~%v" ,reqInfo .ClientID , reqInfo .ID )
58
+ if _ , exist := rp .existMap [existStr ] ; exist {
59
+ rp .semaphore .Release (1 )
60
+ err := fmt .Sprintf ("a request with ID %v and client ID %v already exists in the pool" , reqInfo .ID , reqInfo .ClientID )
61
+ rp .Log .Errorf (err )
62
+ return fmt .Errorf (err )
62
63
}
63
64
rp .queue = append (rp .queue , req )
65
+ rp .existMap [existStr ] = true
64
66
return nil
65
67
}
66
68
@@ -78,20 +80,20 @@ func (rp *RequestPool) NextRequests(n int) []Request {
78
80
func (rp * RequestPool ) RemoveRequest (request Request ) error {
79
81
rp .lock .Lock ()
80
82
defer rp .lock .Unlock ()
81
- removed := false
82
- for i , existingReq := range rp .queue {
83
- if existingReq .ClientID == request .ClientID && existingReq .ID == request .ID {
84
- rp .Log .Infof ("Removed request %v from request pool" , request )
85
- rp .queue = append (rp .queue [:i ], rp .queue [i + 1 :]... )
86
- removed = true
87
- rp .semaphore .Release (1 )
88
- break
83
+ existStr := fmt .Sprintf ("%v~%v" ,request .ClientID , request .ID )
84
+ if _ , exist := rp .existMap [existStr ] ; exist {
85
+ for i , existingReq := range rp .queue {
86
+ if existingReq .ClientID == request .ClientID && existingReq .ID == request .ID {
87
+ rp .Log .Infof ("Removed request %v from request pool" , request )
88
+ rp .queue = append (rp .queue [:i ], rp .queue [i + 1 :]... )
89
+ delete (rp .existMap , existStr )
90
+ rp .semaphore .Release (1 )
91
+ return nil
92
+ }
89
93
}
90
94
}
91
- if ! removed {
92
- err := fmt .Sprintf ("Request %v is not in the pool at remove time" , request )
93
- rp .Log .Warnf (err )
94
- return fmt .Errorf (err )
95
- }
96
- return nil
95
+ err := fmt .Sprintf ("Request %v is not in the pool at remove time" , request )
96
+ rp .Log .Warnf (err )
97
+ return fmt .Errorf (err )
98
+
97
99
}
0 commit comments