Skip to content

Commit a875204

Browse files
authored
implement least active load balance (#602)
feat: add least active load balance
1 parent 9709b35 commit a875204

File tree

6 files changed

+258
-1
lines changed

6 files changed

+258
-1
lines changed

pkg/remoting/getty/getty_remoting.go

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ import (
2525
getty "github.com/apache/dubbo-getty"
2626

2727
"github.com/seata/seata-go/pkg/protocol/message"
28+
"github.com/seata/seata-go/pkg/remoting/rpc"
2829
"github.com/seata/seata-go/pkg/util/log"
2930
)
3031

@@ -61,14 +62,26 @@ func (g *GettyRemoting) SendSync(msg message.RpcMessage, s getty.Session, callba
6162
if s == nil {
6263
s = sessionManager.selectSession(msg)
6364
}
64-
return g.sendAsync(s, msg, callback)
65+
rpc.BeginCount(s.RemoteAddr())
66+
result, err := g.sendAsync(s, msg, callback)
67+
rpc.EndCount(s.RemoteAddr())
68+
if err != nil {
69+
log.Errorf("send message: %#v, session: %s", msg, s.Stat())
70+
return nil, err
71+
}
72+
return result, err
6573
}
6674

6775
func (g *GettyRemoting) SendASync(msg message.RpcMessage, s getty.Session, callback callbackMethod) error {
6876
if s == nil {
6977
s = sessionManager.selectSession(msg)
7078
}
79+
rpc.BeginCount(s.RemoteAddr())
7180
_, err := g.sendAsync(s, msg, callback)
81+
rpc.EndCount(s.RemoteAddr())
82+
if err != nil {
83+
log.Errorf("send message: %#v, session: %s", msg, s.Stat())
84+
}
7285
return err
7386
}
7487

Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package loadbalance
19+
20+
import (
21+
"math/rand"
22+
"sync"
23+
"time"
24+
25+
"github.com/seata/seata-go/pkg/remoting/rpc"
26+
27+
getty "github.com/apache/dubbo-getty"
28+
)
29+
30+
func LeastActiveLoadBalance(sessions *sync.Map, xid string) getty.Session {
31+
var session getty.Session
32+
var leastActive int32 = -1
33+
leastCount := 0
34+
var leastIndexes []getty.Session
35+
sessions.Range(func(key, value interface{}) bool {
36+
session = key.(getty.Session)
37+
if session.IsClosed() {
38+
sessions.Delete(session)
39+
} else {
40+
active := rpc.GetStatus(session.RemoteAddr()).GetActive()
41+
if leastActive == -1 || active < leastActive {
42+
leastActive = active
43+
leastCount = 1
44+
if len(leastIndexes) > 0 {
45+
leastIndexes = leastIndexes[:0]
46+
}
47+
leastIndexes = append(leastIndexes, session)
48+
} else if active == leastActive {
49+
leastIndexes = append(leastIndexes, session)
50+
leastCount++
51+
}
52+
}
53+
return true
54+
})
55+
56+
if leastCount == 0 {
57+
return nil
58+
}
59+
60+
if leastCount == 1 {
61+
return leastIndexes[0]
62+
} else {
63+
return leastIndexes[rand.New(rand.NewSource(time.Now().UnixNano())).Intn(leastCount)]
64+
}
65+
}
Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package loadbalance
19+
20+
import (
21+
"fmt"
22+
"strconv"
23+
"sync"
24+
"testing"
25+
26+
"github.com/golang/mock/gomock"
27+
"github.com/seata/seata-go/pkg/remoting/mock"
28+
"github.com/seata/seata-go/pkg/remoting/rpc"
29+
"github.com/stretchr/testify/assert"
30+
)
31+
32+
func TestLeastActiveLoadBalance(t *testing.T) {
33+
ctrl := gomock.NewController(t)
34+
sessions := &sync.Map{}
35+
36+
for i := 1; i <= 3; i++ {
37+
session := mock.NewMockTestSession(ctrl)
38+
session.EXPECT().IsClosed().Return(false).AnyTimes()
39+
addr := "127.0.0." + strconv.Itoa(i) + ":8000"
40+
session.EXPECT().RemoteAddr().AnyTimes().DoAndReturn(func() string {
41+
return addr
42+
})
43+
sessions.Store(session, fmt.Sprintf("session-%d", i))
44+
rpc.BeginCount(addr)
45+
}
46+
47+
session := mock.NewMockTestSession(ctrl)
48+
session.EXPECT().IsClosed().Return(true).AnyTimes()
49+
addr := "127.0.0.5:8000"
50+
session.EXPECT().RemoteAddr().AnyTimes().DoAndReturn(func() string {
51+
return addr
52+
})
53+
sessions.Store(session, "session-5")
54+
rpc.BeginCount(addr)
55+
56+
countTwo := "127.0.0.1:8000"
57+
rpc.BeginCount(countTwo)
58+
59+
result := LeastActiveLoadBalance(sessions, "test_xid")
60+
assert.False(t, result.RemoteAddr() == countTwo)
61+
assert.False(t, result.RemoteAddr() == addr)
62+
assert.False(t, result.IsClosed())
63+
}

pkg/remoting/loadbalance/loadbalance.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,8 @@ func Select(loadBalanceType string, sessions *sync.Map, xid string) getty.Sessio
3737
return RandomLoadBalance(sessions, xid)
3838
case xidLoadBalance:
3939
return XidLoadBalance(sessions, xid)
40+
case leastActiveLoadBalance:
41+
return LeastActiveLoadBalance(sessions, xid)
4042
case roundRobinLoadBalance:
4143
return RoundRobinLoadBalance(sessions, xid)
4244
default:

pkg/remoting/rpc/rpc_status.go

Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package rpc
19+
20+
import (
21+
"sync"
22+
"sync/atomic"
23+
)
24+
25+
var serviceStatusMap sync.Map
26+
27+
type Status struct {
28+
Active int32
29+
Total int32
30+
}
31+
32+
// RemoveStatus remove the RpcStatus of this service
33+
func RemoveStatus(service string) {
34+
serviceStatusMap.Delete(service)
35+
}
36+
37+
// BeginCount begin count
38+
func BeginCount(service string) {
39+
status := GetStatus(service)
40+
atomic.AddInt32(&status.Active, 1)
41+
}
42+
43+
// EndCount end count
44+
func EndCount(service string) {
45+
status := GetStatus(service)
46+
atomic.AddInt32(&status.Active, -1)
47+
atomic.AddInt32(&status.Total, 1)
48+
}
49+
50+
// GetStatus get status
51+
func GetStatus(service string) *Status {
52+
a, _ := serviceStatusMap.LoadOrStore(service, new(Status))
53+
return a.(*Status)
54+
}
55+
56+
// GetActive get active.
57+
func (s *Status) GetActive() int32 {
58+
return s.Active
59+
}
60+
61+
// GetTotal get total.
62+
func (s *Status) GetTotal() int32 {
63+
return s.Total
64+
}
Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package rpc
19+
20+
import (
21+
"testing"
22+
23+
"github.com/stretchr/testify/assert"
24+
)
25+
26+
var service = "127.0.0.1:8000"
27+
28+
func TestStatus(t *testing.T) {
29+
rpcStatus1 := GetStatus(service)
30+
assert.NotNil(t, rpcStatus1)
31+
rpcStatus2 := GetStatus(service)
32+
assert.Equal(t, rpcStatus1, rpcStatus2)
33+
}
34+
35+
func TestRemoveStatus(t *testing.T) {
36+
old := GetStatus(service)
37+
RemoveStatus(service)
38+
assert.Equal(t, GetStatus(service), old)
39+
}
40+
41+
func TestBeginCount(t *testing.T) {
42+
BeginCount(service)
43+
assert.Equal(t, GetStatus(service).GetActive(), int32(1))
44+
}
45+
46+
func TestEndCount(t *testing.T) {
47+
EndCount(service)
48+
assert.Equal(t, GetStatus(service).GetActive(), int32(0))
49+
assert.Equal(t, GetStatus(service).GetTotal(), int32(1))
50+
}

0 commit comments

Comments
 (0)