diff --git a/pkg/remoting/getty/getty_remoting.go b/pkg/remoting/getty/getty_remoting.go index d04743882..772f67201 100644 --- a/pkg/remoting/getty/getty_remoting.go +++ b/pkg/remoting/getty/getty_remoting.go @@ -25,6 +25,7 @@ import ( getty "github.com/apache/dubbo-getty" "github.com/seata/seata-go/pkg/protocol/message" + "github.com/seata/seata-go/pkg/remoting/rpc" "github.com/seata/seata-go/pkg/util/log" ) @@ -61,14 +62,26 @@ func (g *GettyRemoting) SendSync(msg message.RpcMessage, s getty.Session, callba if s == nil { s = sessionManager.selectSession(msg) } - return g.sendAsync(s, msg, callback) + rpc.BeginCount(s.RemoteAddr()) + result, err := g.sendAsync(s, msg, callback) + rpc.EndCount(s.RemoteAddr()) + if err != nil { + log.Errorf("send message: %#v, session: %s", msg, s.Stat()) + return nil, err + } + return result, err } func (g *GettyRemoting) SendASync(msg message.RpcMessage, s getty.Session, callback callbackMethod) error { if s == nil { s = sessionManager.selectSession(msg) } + rpc.BeginCount(s.RemoteAddr()) _, err := g.sendAsync(s, msg, callback) + rpc.EndCount(s.RemoteAddr()) + if err != nil { + log.Errorf("send message: %#v, session: %s", msg, s.Stat()) + } return err } diff --git a/pkg/remoting/loadbalance/least_active_loadbalance.go b/pkg/remoting/loadbalance/least_active_loadbalance.go new file mode 100644 index 000000000..50b98f016 --- /dev/null +++ b/pkg/remoting/loadbalance/least_active_loadbalance.go @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package loadbalance + +import ( + "math/rand" + "sync" + "time" + + "github.com/seata/seata-go/pkg/remoting/rpc" + + getty "github.com/apache/dubbo-getty" +) + +func LeastActiveLoadBalance(sessions *sync.Map, xid string) getty.Session { + var session getty.Session + var leastActive int32 = -1 + leastCount := 0 + var leastIndexes []getty.Session + sessions.Range(func(key, value interface{}) bool { + session = key.(getty.Session) + if session.IsClosed() { + sessions.Delete(session) + } else { + active := rpc.GetStatus(session.RemoteAddr()).GetActive() + if leastActive == -1 || active < leastActive { + leastActive = active + leastCount = 1 + if len(leastIndexes) > 0 { + leastIndexes = leastIndexes[:0] + } + leastIndexes = append(leastIndexes, session) + } else if active == leastActive { + leastIndexes = append(leastIndexes, session) + leastCount++ + } + } + return true + }) + + if leastCount == 0 { + return nil + } + + if leastCount == 1 { + return leastIndexes[0] + } else { + return leastIndexes[rand.New(rand.NewSource(time.Now().UnixNano())).Intn(leastCount)] + } +} diff --git a/pkg/remoting/loadbalance/least_active_loadbalance_test.go b/pkg/remoting/loadbalance/least_active_loadbalance_test.go new file mode 100644 index 000000000..a63deb213 --- /dev/null +++ b/pkg/remoting/loadbalance/least_active_loadbalance_test.go @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package loadbalance + +import ( + "fmt" + "strconv" + "sync" + "testing" + + "github.com/golang/mock/gomock" + "github.com/seata/seata-go/pkg/remoting/mock" + "github.com/seata/seata-go/pkg/remoting/rpc" + "github.com/stretchr/testify/assert" +) + +func TestLeastActiveLoadBalance(t *testing.T) { + ctrl := gomock.NewController(t) + sessions := &sync.Map{} + + for i := 1; i <= 3; i++ { + session := mock.NewMockTestSession(ctrl) + session.EXPECT().IsClosed().Return(false).AnyTimes() + addr := "127.0.0." + strconv.Itoa(i) + ":8000" + session.EXPECT().RemoteAddr().AnyTimes().DoAndReturn(func() string { + return addr + }) + sessions.Store(session, fmt.Sprintf("session-%d", i)) + rpc.BeginCount(addr) + } + + session := mock.NewMockTestSession(ctrl) + session.EXPECT().IsClosed().Return(true).AnyTimes() + addr := "127.0.0.5:8000" + session.EXPECT().RemoteAddr().AnyTimes().DoAndReturn(func() string { + return addr + }) + sessions.Store(session, "session-5") + rpc.BeginCount(addr) + + countTwo := "127.0.0.1:8000" + rpc.BeginCount(countTwo) + + result := LeastActiveLoadBalance(sessions, "test_xid") + assert.False(t, result.RemoteAddr() == countTwo) + assert.False(t, result.RemoteAddr() == addr) + assert.False(t, result.IsClosed()) +} diff --git a/pkg/remoting/loadbalance/loadbalance.go b/pkg/remoting/loadbalance/loadbalance.go index 8451e1706..f867793bc 100644 --- a/pkg/remoting/loadbalance/loadbalance.go +++ b/pkg/remoting/loadbalance/loadbalance.go @@ -37,6 +37,8 @@ func Select(loadBalanceType string, sessions *sync.Map, xid string) getty.Sessio return RandomLoadBalance(sessions, xid) case xidLoadBalance: return XidLoadBalance(sessions, xid) + case leastActiveLoadBalance: + return LeastActiveLoadBalance(sessions, xid) case roundRobinLoadBalance: return RoundRobinLoadBalance(sessions, xid) default: diff --git a/pkg/remoting/rpc/rpc_status.go b/pkg/remoting/rpc/rpc_status.go new file mode 100644 index 000000000..e595237ac --- /dev/null +++ b/pkg/remoting/rpc/rpc_status.go @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package rpc + +import ( + "sync" + "sync/atomic" +) + +var serviceStatusMap sync.Map + +type Status struct { + Active int32 + Total int32 +} + +// RemoveStatus remove the RpcStatus of this service +func RemoveStatus(service string) { + serviceStatusMap.Delete(service) +} + +// BeginCount begin count +func BeginCount(service string) { + status := GetStatus(service) + atomic.AddInt32(&status.Active, 1) +} + +// EndCount end count +func EndCount(service string) { + status := GetStatus(service) + atomic.AddInt32(&status.Active, -1) + atomic.AddInt32(&status.Total, 1) +} + +// GetStatus get status +func GetStatus(service string) *Status { + a, _ := serviceStatusMap.LoadOrStore(service, new(Status)) + return a.(*Status) +} + +// GetActive get active. +func (s *Status) GetActive() int32 { + return s.Active +} + +// GetTotal get total. +func (s *Status) GetTotal() int32 { + return s.Total +} diff --git a/pkg/remoting/rpc/rpc_status_test.go b/pkg/remoting/rpc/rpc_status_test.go new file mode 100644 index 000000000..2e92d267f --- /dev/null +++ b/pkg/remoting/rpc/rpc_status_test.go @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package rpc + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +var service = "127.0.0.1:8000" + +func TestStatus(t *testing.T) { + rpcStatus1 := GetStatus(service) + assert.NotNil(t, rpcStatus1) + rpcStatus2 := GetStatus(service) + assert.Equal(t, rpcStatus1, rpcStatus2) +} + +func TestRemoveStatus(t *testing.T) { + old := GetStatus(service) + RemoveStatus(service) + assert.Equal(t, GetStatus(service), old) +} + +func TestBeginCount(t *testing.T) { + BeginCount(service) + assert.Equal(t, GetStatus(service).GetActive(), int32(1)) +} + +func TestEndCount(t *testing.T) { + EndCount(service) + assert.Equal(t, GetStatus(service).GetActive(), int32(0)) + assert.Equal(t, GetStatus(service).GetTotal(), int32(1)) +}