From 8a5b29782976ca7faa81bd47579f6c042fbf18d4 Mon Sep 17 00:00:00 2001 From: Jingliu Xiong <928124786@qq.com> Date: Sat, 18 Nov 2023 14:58:07 +0800 Subject: [PATCH] feat: add round robin in remote module --- pkg/remoting/loadbalance/loadbalance.go | 2 + .../loadbalance/round_robin_loadbalance.go | 69 ++++++++++++ .../round_robin_loadbalance_test.go | 100 ++++++++++++++++++ 3 files changed, 171 insertions(+) create mode 100644 pkg/remoting/loadbalance/round_robin_loadbalance.go create mode 100644 pkg/remoting/loadbalance/round_robin_loadbalance_test.go diff --git a/pkg/remoting/loadbalance/loadbalance.go b/pkg/remoting/loadbalance/loadbalance.go index c5ddb6791..8451e1706 100644 --- a/pkg/remoting/loadbalance/loadbalance.go +++ b/pkg/remoting/loadbalance/loadbalance.go @@ -37,6 +37,8 @@ func Select(loadBalanceType string, sessions *sync.Map, xid string) getty.Sessio return RandomLoadBalance(sessions, xid) case xidLoadBalance: return XidLoadBalance(sessions, xid) + case roundRobinLoadBalance: + return RoundRobinLoadBalance(sessions, xid) default: return RandomLoadBalance(sessions, xid) } diff --git a/pkg/remoting/loadbalance/round_robin_loadbalance.go b/pkg/remoting/loadbalance/round_robin_loadbalance.go new file mode 100644 index 000000000..9cebc9262 --- /dev/null +++ b/pkg/remoting/loadbalance/round_robin_loadbalance.go @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package loadbalance + +import ( + "math" + "sort" + "sync" + "sync/atomic" + + getty "github.com/apache/dubbo-getty" +) + +var sequence int32 + +func RoundRobinLoadBalance(sessions *sync.Map, s string) getty.Session { + // collect sync.Map adderToSession + // filter out closed session instance + adderToSession := make(map[string]getty.Session, 0) + // map has no sequence, we should sort it to make sure the sequence is always the same + adders := make([]string, 0) + sessions.Range(func(key, value interface{}) bool { + session := key.(getty.Session) + if session.IsClosed() { + sessions.Delete(key) + } else { + adderToSession[session.RemoteAddr()] = session + adders = append(adders, session.RemoteAddr()) + } + return true + }) + sort.Strings(adders) + // adderToSession eq 0 means there are no available session + if len(adderToSession) == 0 { + return nil + } + index := getPositiveSequence() % len(adderToSession) + return adderToSession[adders[index]] +} + +func getPositiveSequence() int { + for { + current := atomic.LoadInt32(&sequence) + var next int32 + if current == math.MaxInt32 { + next = 0 + } else { + next = current + 1 + } + if atomic.CompareAndSwapInt32(&sequence, current, next) { + return int(current) + } + } +} diff --git a/pkg/remoting/loadbalance/round_robin_loadbalance_test.go b/pkg/remoting/loadbalance/round_robin_loadbalance_test.go new file mode 100644 index 000000000..c58265706 --- /dev/null +++ b/pkg/remoting/loadbalance/round_robin_loadbalance_test.go @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package loadbalance + +import ( + "fmt" + "math" + "sync" + "testing" + + "github.com/golang/mock/gomock" + "github.com/stretchr/testify/assert" + + "github.com/seata/seata-go/pkg/remoting/mock" +) + +func TestRoundRobinLoadBalance_Normal(t *testing.T) { + ctrl := gomock.NewController(t) + sessions := &sync.Map{} + + for i := 0; i < 10; i++ { + session := mock.NewMockTestSession(ctrl) + session.EXPECT().IsClosed().Return(i == 2).AnyTimes() + session.EXPECT().RemoteAddr().Return(fmt.Sprintf("%d", i)).AnyTimes() + sessions.Store(session, fmt.Sprintf("session-%d", i+1)) + } + + for i := 0; i < 10; i++ { + if i == 2 { + continue + } + result := RoundRobinLoadBalance(sessions, "some_xid") + assert.Equal(t, fmt.Sprintf("%d", i), result.RemoteAddr()) + assert.NotNil(t, result) + assert.False(t, result.IsClosed()) + } +} + +func TestRoundRobinLoadBalance_OverSequence(t *testing.T) { + ctrl := gomock.NewController(t) + sessions := &sync.Map{} + sequence = math.MaxInt32 + + for i := 0; i < 10; i++ { + session := mock.NewMockTestSession(ctrl) + session.EXPECT().IsClosed().Return(false).AnyTimes() + session.EXPECT().RemoteAddr().Return(fmt.Sprintf("%d", i)).AnyTimes() + sessions.Store(session, fmt.Sprintf("session-%d", i+1)) + } + + for i := 0; i < 10; i++ { + // over sequence here + if i == 0 { + result := RoundRobinLoadBalance(sessions, "some_xid") + assert.Equal(t, "7", result.RemoteAddr()) + assert.NotNil(t, result) + assert.False(t, result.IsClosed()) + continue + } + result := RoundRobinLoadBalance(sessions, "some_xid") + assert.Equal(t, fmt.Sprintf("%d", i-1), result.RemoteAddr()) + assert.NotNil(t, result) + assert.False(t, result.IsClosed()) + } +} + +func TestRoundRobinLoadBalance_All_Closed(t *testing.T) { + ctrl := gomock.NewController(t) + sessions := &sync.Map{} + for i := 0; i < 10; i++ { + session := mock.NewMockTestSession(ctrl) + session.EXPECT().IsClosed().Return(true).AnyTimes() + sessions.Store(session, fmt.Sprintf("session-%d", i+1)) + } + if result := RoundRobinLoadBalance(sessions, "some_xid"); result != nil { + t.Errorf("Expected nil, actual got %+v", result) + } +} + +func TestRoundRobinLoadBalance_Empty(t *testing.T) { + sessions := &sync.Map{} + if result := RoundRobinLoadBalance(sessions, "some_xid"); result != nil { + t.Errorf("Expected nil, actual got %+v", result) + } +}