Skip to content

Commit 7ab06df

Browse files
authored
Merge branch 'master' into feature_refresh_cache
2 parents dbce868 + 8ce1006 commit 7ab06df

11 files changed

+233
-214
lines changed

pkg/client/client.go

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -62,15 +62,17 @@ func initTmClient(cfg *Config) {
6262
})
6363
}
6464

65-
// initRemoting init rpc client
65+
// initRemoting init remoting
6666
func initRemoting(cfg *Config) {
67-
getty.InitRpcClient(&cfg.GettyConfig, &remoteConfig.SeataConfig{
67+
seataConfig := remoteConfig.SeataConfig{
6868
ApplicationID: cfg.ApplicationID,
6969
TxServiceGroup: cfg.TxServiceGroup,
7070
ServiceVgroupMapping: cfg.ServiceConfig.VgroupMapping,
7171
ServiceGrouplist: cfg.ServiceConfig.Grouplist,
7272
LoadBalanceType: cfg.GettyConfig.LoadBalanceType,
73-
})
73+
}
74+
75+
getty.InitGetty(&cfg.GettyConfig, &seataConfig)
7476
}
7577

7678
// InitRmClient init client rm client

pkg/remoting/config/config.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,7 @@ type SeataConfig struct {
8484
LoadBalanceType string
8585
}
8686

87-
func IniConfig(seataConf *SeataConfig) {
87+
func InitConfig(seataConf *SeataConfig) {
8888
seataConfig = seataConf
8989
}
9090

pkg/remoting/getty/getty_client.go

Lines changed: 28 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -35,14 +35,16 @@ var (
3535
)
3636

3737
type GettyRemotingClient struct {
38-
idGenerator *atomic.Uint32
38+
idGenerator *atomic.Uint32
39+
gettyRemoting *GettyRemoting
3940
}
4041

4142
func GetGettyRemotingClient() *GettyRemotingClient {
4243
if gettyRemotingClient == nil {
4344
onceGettyRemotingClient.Do(func() {
4445
gettyRemotingClient = &GettyRemotingClient{
45-
idGenerator: &atomic.Uint32{},
46+
idGenerator: &atomic.Uint32{},
47+
gettyRemoting: newGettyRemoting(),
4648
}
4749
})
4850
}
@@ -63,7 +65,7 @@ func (client *GettyRemotingClient) SendAsyncRequest(msg interface{}) error {
6365
Compressor: 0,
6466
Body: msg,
6567
}
66-
return GetGettyRemotingInstance().SendASync(rpcMessage, nil, client.asyncCallback)
68+
return client.gettyRemoting.SendAsync(rpcMessage, nil, client.asyncCallback)
6769
}
6870

6971
func (client *GettyRemotingClient) SendAsyncResponse(msgID int32, msg interface{}) error {
@@ -74,7 +76,7 @@ func (client *GettyRemotingClient) SendAsyncResponse(msgID int32, msg interface{
7476
Compressor: 0,
7577
Body: msg,
7678
}
77-
return GetGettyRemotingInstance().SendASync(rpcMessage, nil, nil)
79+
return client.gettyRemoting.SendAsync(rpcMessage, nil, nil)
7880
}
7981

8082
func (client *GettyRemotingClient) SendSyncRequest(msg interface{}) (interface{}, error) {
@@ -85,7 +87,7 @@ func (client *GettyRemotingClient) SendSyncRequest(msg interface{}) (interface{}
8587
Compressor: 0,
8688
Body: msg,
8789
}
88-
return GetGettyRemotingInstance().SendSync(rpcMessage, nil, client.syncCallback)
90+
return client.gettyRemoting.SendSync(rpcMessage, nil, client.syncCallback)
8991
}
9092

9193
func (g *GettyRemotingClient) asyncCallback(reqMsg message.RpcMessage, respMsg *message.MessageFuture) (interface{}, error) {
@@ -96,10 +98,30 @@ func (g *GettyRemotingClient) asyncCallback(reqMsg message.RpcMessage, respMsg *
9698
func (g *GettyRemotingClient) syncCallback(reqMsg message.RpcMessage, respMsg *message.MessageFuture) (interface{}, error) {
9799
select {
98100
case <-gxtime.GetDefaultTimerWheel().After(RpcRequestTimeout):
99-
GetGettyRemotingInstance().RemoveMergedMessageFuture(reqMsg.ID)
101+
g.gettyRemoting.RemoveMergedMessageFuture(reqMsg.ID)
100102
log.Errorf("wait resp timeout: %#v", reqMsg)
101103
return nil, fmt.Errorf("wait response timeout, request: %#v", reqMsg)
102104
case <-respMsg.Done:
103105
return respMsg.Response, respMsg.Err
104106
}
105107
}
108+
109+
func (client *GettyRemotingClient) GetMergedMessage(msgID int32) *message.MergedWarpMessage {
110+
return client.gettyRemoting.GetMergedMessage(msgID)
111+
}
112+
113+
func (client *GettyRemotingClient) GetMessageFuture(msgID int32) *message.MessageFuture {
114+
return client.gettyRemoting.GetMessageFuture(msgID)
115+
}
116+
117+
func (client *GettyRemotingClient) RemoveMessageFuture(msgID int32) {
118+
client.gettyRemoting.RemoveMessageFuture(msgID)
119+
}
120+
121+
func (client *GettyRemotingClient) RemoveMergedMessageFuture(msgID int32) {
122+
client.gettyRemoting.RemoveMergedMessageFuture(msgID)
123+
}
124+
125+
func (client *GettyRemotingClient) NotifyRpcMessageResponse(msg message.RpcMessage) {
126+
client.gettyRemoting.NotifyRpcMessageResponse(msg)
127+
}

pkg/remoting/getty/getty_client_test.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ func TestGettyRemotingClient_SendSyncRequest(t *testing.T) {
4040
},
4141
},
4242
}
43-
gomonkey.ApplyMethod(reflect.TypeOf(GetGettyRemotingInstance()), "SendSync",
43+
gomonkey.ApplyMethod(reflect.TypeOf(GetGettyRemotingClient().gettyRemoting), "SendSync",
4444
func(_ *GettyRemoting, msg message.RpcMessage, s getty.Session, callback callbackMethod) (interface{},
4545
error) {
4646
return respMsg, nil
@@ -52,7 +52,7 @@ func TestGettyRemotingClient_SendSyncRequest(t *testing.T) {
5252

5353
// TestGettyRemotingClient_SendAsyncResponse unit test for SendAsyncResponse function
5454
func TestGettyRemotingClient_SendAsyncResponse(t *testing.T) {
55-
gomonkey.ApplyMethod(reflect.TypeOf(GetGettyRemotingInstance()), "SendASync",
55+
gomonkey.ApplyMethod(reflect.TypeOf(GetGettyRemotingClient().gettyRemoting), "SendAsync",
5656
func(_ *GettyRemoting, msg message.RpcMessage, s getty.Session, callback callbackMethod) error {
5757
return nil
5858
})
@@ -77,7 +77,7 @@ func TestGettyRemotingClient_SendAsyncRequest(t *testing.T) {
7777
}
7878
for _, test := range tests {
7979
t.Run(test.name, func(t *testing.T) {
80-
gomonkey.ApplyMethod(reflect.TypeOf(GetGettyRemotingInstance()), "SendASync",
80+
gomonkey.ApplyMethod(reflect.TypeOf(GetGettyRemotingClient().gettyRemoting), "SendAsync",
8181
func(_ *GettyRemoting, msg message.RpcMessage, s getty.Session, callback callbackMethod) error {
8282
return nil
8383
})

pkg/remoting/getty/getty_init.go

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package getty
19+
20+
import (
21+
"seata.apache.org/seata-go/pkg/protocol/codec"
22+
"seata.apache.org/seata-go/pkg/remoting/config"
23+
)
24+
25+
func InitGetty(gettyConfig *config.Config, seataConfig *config.SeataConfig) {
26+
config.InitConfig(seataConfig)
27+
codec.Init()
28+
initSessionManager(gettyConfig)
29+
}

pkg/remoting/getty/getty_remoting.go

Lines changed: 5 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -33,11 +33,6 @@ const (
3333
RpcRequestTimeout = 20 * time.Second
3434
)
3535

36-
var (
37-
gettyRemoting *GettyRemoting
38-
onceGettyRemoting = &sync.Once{}
39-
)
40-
4136
type (
4237
callbackMethod func(reqMsg message.RpcMessage, respMsg *message.MessageFuture) (interface{}, error)
4338
GettyRemoting struct {
@@ -46,16 +41,11 @@ type (
4641
}
4742
)
4843

49-
func GetGettyRemotingInstance() *GettyRemoting {
50-
if gettyRemoting == nil {
51-
onceGettyRemoting.Do(func() {
52-
gettyRemoting = &GettyRemoting{
53-
futures: &sync.Map{},
54-
mergeMsgMap: &sync.Map{},
55-
}
56-
})
44+
func newGettyRemoting() *GettyRemoting {
45+
return &GettyRemoting{
46+
futures: &sync.Map{},
47+
mergeMsgMap: &sync.Map{},
5748
}
58-
return gettyRemoting
5949
}
6050

6151
func (g *GettyRemoting) SendSync(msg message.RpcMessage, s getty.Session, callback callbackMethod) (interface{}, error) {
@@ -72,7 +62,7 @@ func (g *GettyRemoting) SendSync(msg message.RpcMessage, s getty.Session, callba
7262
return result, err
7363
}
7464

75-
func (g *GettyRemoting) SendASync(msg message.RpcMessage, s getty.Session, callback callbackMethod) error {
65+
func (g *GettyRemoting) SendAsync(msg message.RpcMessage, s getty.Session, callback callbackMethod) error {
7666
if s == nil {
7767
s = sessionManager.selectSession(msg)
7868
}

pkg/remoting/getty/getty_remoting_test.go

Lines changed: 20 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -47,14 +47,15 @@ func TestGettyRemoting_GetMessageFuture(t *testing.T) {
4747
},
4848
},
4949
}
50+
gettyRemotingClient := GetGettyRemotingClient()
5051
for _, test := range tests {
5152
t.Run(test.name, func(t *testing.T) {
5253
if test.messageFuture != nil {
53-
GetGettyRemotingInstance().futures.Store(test.msgID, test.messageFuture)
54-
messageFuture := GetGettyRemotingInstance().GetMessageFuture(test.msgID)
54+
gettyRemotingClient.gettyRemoting.futures.Store(test.msgID, test.messageFuture)
55+
messageFuture := gettyRemotingClient.gettyRemoting.GetMessageFuture(test.msgID)
5556
assert.Equal(t, *test.messageFuture, *messageFuture)
5657
} else {
57-
messageFuture := GetGettyRemotingInstance().GetMessageFuture(test.msgID)
58+
messageFuture := gettyRemotingClient.gettyRemoting.GetMessageFuture(test.msgID)
5859
assert.Empty(t, messageFuture)
5960
}
6061
})
@@ -78,13 +79,14 @@ func TestGettyRemoting_RemoveMessageFuture(t *testing.T) {
7879
},
7980
},
8081
}
82+
gettyRemotingClient := GetGettyRemotingClient()
8183
for _, test := range tests {
8284
t.Run(test.name, func(t *testing.T) {
83-
GetGettyRemotingInstance().futures.Store(test.msgID, test.messageFuture)
84-
messageFuture := GetGettyRemotingInstance().GetMessageFuture(test.msgID)
85+
gettyRemotingClient.gettyRemoting.futures.Store(test.msgID, test.messageFuture)
86+
messageFuture := gettyRemotingClient.gettyRemoting.GetMessageFuture(test.msgID)
8587
assert.Equal(t, messageFuture, test.messageFuture)
86-
GetGettyRemotingInstance().RemoveMessageFuture(test.msgID)
87-
messageFuture = GetGettyRemotingInstance().GetMessageFuture(test.msgID)
88+
gettyRemotingClient.gettyRemoting.RemoveMessageFuture(test.msgID)
89+
messageFuture = gettyRemotingClient.gettyRemoting.GetMessageFuture(test.msgID)
8890
assert.Empty(t, messageFuture)
8991
})
9092
}
@@ -110,14 +112,15 @@ func TestGettyRemoting_GetMergedMessage(t *testing.T) {
110112
},
111113
},
112114
}
115+
gettyRemotingClient := GetGettyRemotingClient()
113116
for _, test := range tests {
114117
t.Run(test.name, func(t *testing.T) {
115118
if test.mergedWarpMessage != nil {
116-
GetGettyRemotingInstance().mergeMsgMap.Store(test.msgID, test.mergedWarpMessage)
117-
mergedWarpMessage := GetGettyRemotingInstance().GetMergedMessage(test.msgID)
119+
gettyRemotingClient.gettyRemoting.mergeMsgMap.Store(test.msgID, test.mergedWarpMessage)
120+
mergedWarpMessage := gettyRemotingClient.gettyRemoting.GetMergedMessage(test.msgID)
118121
assert.Equal(t, *test.mergedWarpMessage, *mergedWarpMessage)
119122
} else {
120-
mergedWarpMessage := GetGettyRemotingInstance().GetMessageFuture(test.msgID)
123+
mergedWarpMessage := gettyRemotingClient.gettyRemoting.GetMessageFuture(test.msgID)
121124
assert.Empty(t, mergedWarpMessage)
122125
}
123126
})
@@ -144,18 +147,19 @@ func TestGettyRemoting_RemoveMergedMessageFuture(t *testing.T) {
144147
},
145148
},
146149
}
150+
gettyRemotingClient := GetGettyRemotingClient()
147151
for _, test := range tests {
148152
t.Run(test.name, func(t *testing.T) {
149153
if test.mergedWarpMessage != nil {
150-
GetGettyRemotingInstance().mergeMsgMap.Store(test.msgID, test.mergedWarpMessage)
151-
mergedWarpMessage := GetGettyRemotingInstance().GetMergedMessage(test.msgID)
154+
gettyRemotingClient.gettyRemoting.mergeMsgMap.Store(test.msgID, test.mergedWarpMessage)
155+
mergedWarpMessage := gettyRemotingClient.gettyRemoting.GetMergedMessage(test.msgID)
152156
assert.NotEmpty(t, mergedWarpMessage)
153-
GetGettyRemotingInstance().RemoveMergedMessageFuture(test.msgID)
154-
mergedWarpMessage = GetGettyRemotingInstance().GetMergedMessage(test.msgID)
157+
gettyRemotingClient.gettyRemoting.RemoveMergedMessageFuture(test.msgID)
158+
mergedWarpMessage = gettyRemotingClient.gettyRemoting.GetMergedMessage(test.msgID)
155159
assert.Empty(t, mergedWarpMessage)
156160
} else {
157-
GetGettyRemotingInstance().RemoveMergedMessageFuture(test.msgID)
158-
mergedWarpMessage := GetGettyRemotingInstance().GetMergedMessage(test.msgID)
161+
gettyRemotingClient.gettyRemoting.RemoveMergedMessageFuture(test.msgID)
162+
mergedWarpMessage := gettyRemotingClient.gettyRemoting.GetMergedMessage(test.msgID)
159163
assert.Empty(t, mergedWarpMessage)
160164
}
161165
})

pkg/remoting/getty/listener.go

Lines changed: 22 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -38,22 +38,16 @@ var (
3838
)
3939

4040
type gettyClientHandler struct {
41-
idGenerator *atomic.Uint32
42-
msgFutures *sync.Map
43-
mergeMsgMap *sync.Map
44-
sessionManager *SessionManager
45-
processorMap map[message.MessageType]processor.RemotingProcessor
41+
idGenerator *atomic.Uint32
42+
processorMap map[message.MessageType]processor.RemotingProcessor
4643
}
4744

4845
func GetGettyClientHandlerInstance() *gettyClientHandler {
4946
if clientHandler == nil {
5047
onceClientHandler.Do(func() {
5148
clientHandler = &gettyClientHandler{
52-
idGenerator: &atomic.Uint32{},
53-
msgFutures: &sync.Map{},
54-
mergeMsgMap: &sync.Map{},
55-
sessionManager: sessionManager,
56-
processorMap: make(map[message.MessageType]processor.RemotingProcessor, 0),
49+
idGenerator: &atomic.Uint32{},
50+
processorMap: make(map[message.MessageType]processor.RemotingProcessor, 0),
5751
}
5852
})
5953
}
@@ -62,7 +56,7 @@ func GetGettyClientHandlerInstance() *gettyClientHandler {
6256

6357
func (g *gettyClientHandler) OnOpen(session getty.Session) error {
6458
log.Infof("Open new getty session ")
65-
g.sessionManager.registerSession(session)
59+
sessionManager.registerSession(session)
6660
conf := config.GetSeataConfig()
6761
go func() {
6862
request := message.RegisterTMRequest{AbstractIdentifyRequest: message.AbstractIdentifyRequest{
@@ -73,7 +67,7 @@ func (g *gettyClientHandler) OnOpen(session getty.Session) error {
7367
err := GetGettyRemotingClient().SendAsyncRequest(request)
7468
if err != nil {
7569
log.Errorf("OnOpen error: {%#v}", err.Error())
76-
g.sessionManager.releaseSession(session)
70+
sessionManager.releaseSession(session)
7771
return
7872
}
7973
}()
@@ -83,12 +77,12 @@ func (g *gettyClientHandler) OnOpen(session getty.Session) error {
8377

8478
func (g *gettyClientHandler) OnError(session getty.Session, err error) {
8579
log.Infof("session{%s} got error{%v}, will be closed.", session.Stat(), err)
86-
g.sessionManager.releaseSession(session)
80+
sessionManager.releaseSession(session)
8781
}
8882

8983
func (g *gettyClientHandler) OnClose(session getty.Session) {
9084
log.Infof("session{%s} is closing......", session.Stat())
91-
g.sessionManager.releaseSession(session)
85+
sessionManager.releaseSession(session)
9286
}
9387

9488
func (g *gettyClientHandler) OnMessage(session getty.Session, pkg interface{}) {
@@ -117,8 +111,19 @@ func (g *gettyClientHandler) OnCron(session getty.Session) {
117111
log.Debug("session{%s} Oncron executing", session.Stat())
118112
err := g.transferHeartBeat(session, message.HeartBeatMessagePing)
119113
if err != nil {
120-
log.Errorf("failed to send heart beat: {%#v}", err.Error())
121-
g.sessionManager.releaseSession(session)
114+
log.Warnf("failed to send heart beat: {%#v}", err.Error())
115+
if session.GetAttribute(heartBeatRetryTimesKey) != nil {
116+
retryTimes := session.GetAttribute(heartBeatRetryTimesKey).(int)
117+
if retryTimes >= maxHeartBeatRetryTimes {
118+
log.Warnf("heartbeat retry times exceed default max retry times{%d}, close the session{%s}",
119+
maxHeartBeatRetryTimes, session.Stat())
120+
sessionManager.releaseSession(session)
121+
return
122+
}
123+
session.SetAttribute(heartBeatRetryTimesKey, retryTimes+1)
124+
} else {
125+
session.SetAttribute(heartBeatRetryTimesKey, 1)
126+
}
122127
}
123128
}
124129

@@ -130,7 +135,7 @@ func (g *gettyClientHandler) transferHeartBeat(session getty.Session, msg messag
130135
Compressor: 0,
131136
Body: msg,
132137
}
133-
return GetGettyRemotingInstance().SendASync(rpcMessage, session, nil)
138+
return GetGettyRemotingClient().gettyRemoting.SendAsync(rpcMessage, session, nil)
134139
}
135140

136141
func (g *gettyClientHandler) RegisterProcessor(msgType message.MessageType, processor processor.RemotingProcessor) {

0 commit comments

Comments
 (0)