Skip to content

Commit c010d02

Browse files
authored
fix: add server isEnd determination criteria for message retrieval. (#813)
* fix: add server isEnd determination criteria for message retrieval. Signed-off-by: Gordon <[email protected]> * fix: add server isEnd determination criteria for message retrieval. Signed-off-by: Gordon <[email protected]> * fix: add server isEnd determination criteria for message retrieval. Signed-off-by: Gordon <[email protected]> --------- Signed-off-by: Gordon <[email protected]>
1 parent 1387678 commit c010d02

File tree

7 files changed

+168
-79
lines changed

7 files changed

+168
-79
lines changed

go.mod

+12-10
Original file line numberDiff line numberDiff line change
@@ -1,26 +1,29 @@
11
module github.com/openimsdk/openim-sdk-core/v3
22

3-
go 1.21
3+
go 1.22.7
4+
5+
toolchain go1.22.10
46

57
require (
68
github.com/golang/protobuf v1.5.4
79
github.com/gorilla/websocket v1.4.2
810
github.com/jinzhu/copier v0.4.0
911
github.com/pkg/errors v0.9.1
10-
google.golang.org/protobuf v1.33.0 // indirect
12+
google.golang.org/protobuf v1.35.1
1113
gorm.io/driver/sqlite v1.5.5
1214
nhooyr.io/websocket v1.8.10
1315
)
1416

15-
require golang.org/x/net v0.22.0 // indirect
17+
require golang.org/x/net v0.29.0 // indirect
1618

1719
require (
1820
github.com/google/go-cmp v0.6.0
19-
github.com/openimsdk/protocol v0.0.72-alpha.54
21+
github.com/openimsdk/protocol v0.0.72-alpha.63
2022
github.com/openimsdk/tools v0.0.50-alpha.21
2123
github.com/patrickmn/go-cache v2.1.0+incompatible
2224
golang.org/x/image v0.15.0
23-
golang.org/x/sync v0.6.0
25+
golang.org/x/sync v0.8.0
26+
google.golang.org/grpc v1.68.0
2427
gorm.io/gorm v1.25.10
2528
)
2629

@@ -52,11 +55,10 @@ require (
5255
go.uber.org/multierr v1.6.0 // indirect
5356
go.uber.org/zap v1.24.0 // indirect
5457
golang.org/x/arch v0.3.0 // indirect
55-
golang.org/x/crypto v0.21.0 // indirect
56-
golang.org/x/sys v0.18.0 // indirect
57-
golang.org/x/text v0.14.0 // indirect
58-
google.golang.org/genproto/googleapis/rpc v0.0.0-20240318140521-94a12d6c2237 // indirect
59-
google.golang.org/grpc v1.62.1 // indirect
58+
golang.org/x/crypto v0.27.0 // indirect
59+
golang.org/x/sys v0.25.0 // indirect
60+
golang.org/x/text v0.18.0 // indirect
61+
google.golang.org/genproto/googleapis/rpc v0.0.0-20240903143218-8af14fe29dc1 // indirect
6062
gopkg.in/yaml.v3 v3.0.1 // indirect
6163
)
6264

go.sum

+18-18
Original file line numberDiff line numberDiff line change
@@ -66,8 +66,8 @@ github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd h1:TRLaZ9cD/w
6666
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
6767
github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9Gz0M=
6868
github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk=
69-
github.com/openimsdk/protocol v0.0.72-alpha.54 h1:opato7N4QjjRq/SHD54bDSVBpOEEDp1VLWVk5Os2A9s=
70-
github.com/openimsdk/protocol v0.0.72-alpha.54/go.mod h1:OZQA9FR55lseYoN2Ql1XAHYKHJGu7OMNkUbuekrKCM8=
69+
github.com/openimsdk/protocol v0.0.72-alpha.63 h1:IyPBibEvwBtTmD8DSrlqcekfEXe74k4+KeeHsgdhGh0=
70+
github.com/openimsdk/protocol v0.0.72-alpha.63/go.mod h1:Iet+piS/jaS+kWWyj6EEr36mk4ISzIRYjoMSVA4dq2M=
7171
github.com/openimsdk/tools v0.0.50-alpha.21 h1:ZKgSFkiBjz6KcNZlNwvrSoUYJ7K5Flan8wHuRBH3VqY=
7272
github.com/openimsdk/tools v0.0.50-alpha.21/go.mod h1:h1cYmfyaVtgFbKmb1Cfsl8XwUOMTt8ubVUQrdGtsUh4=
7373
github.com/patrickmn/go-cache v2.1.0+incompatible h1:HRMgzkcYKYpi3C8ajMPV8OFXaaRUnok+kx1WdO15EQc=
@@ -105,26 +105,26 @@ go.uber.org/zap v1.24.0/go.mod h1:2kMP+WWQ8aoFoedH3T2sq6iJ2yDWpHbP0f6MQbS9Gkg=
105105
golang.org/x/arch v0.0.0-20210923205945-b76863e36670/go.mod h1:5om86z9Hs0C8fWVUuoMHwpExlXzs5Tkyp9hOrfG7pp8=
106106
golang.org/x/arch v0.3.0 h1:02VY4/ZcO/gBOH6PUaoiptASxtXU10jazRCP865E97k=
107107
golang.org/x/arch v0.3.0/go.mod h1:5om86z9Hs0C8fWVUuoMHwpExlXzs5Tkyp9hOrfG7pp8=
108-
golang.org/x/crypto v0.21.0 h1:X31++rzVUdKhX5sWmSOFZxx8UW/ldWx55cbf08iNAMA=
109-
golang.org/x/crypto v0.21.0/go.mod h1:0BP7YvVV9gBbVKyeTG0Gyn+gZm94bibOW5BjDEYAOMs=
108+
golang.org/x/crypto v0.27.0 h1:GXm2NjJrPaiv/h1tb2UH8QfgC/hOf/+z0p6PT8o1w7A=
109+
golang.org/x/crypto v0.27.0/go.mod h1:1Xngt8kV6Dvbssa53Ziq6Eqn0HqbZi5Z6R0ZpwQzt70=
110110
golang.org/x/image v0.15.0 h1:kOELfmgrmJlw4Cdb7g/QGuB3CvDrXbqEIww/pNtNBm8=
111111
golang.org/x/image v0.15.0/go.mod h1:HUYqC05R2ZcZ3ejNQsIHQDQiwWM4JBqmm6MKANTp4LE=
112-
golang.org/x/net v0.22.0 h1:9sGLhx7iRIHEiX0oAJ3MRZMUCElJgy7Br1nO+AMN3Tc=
113-
golang.org/x/net v0.22.0/go.mod h1:JKghWKKOSdJwpW2GEx0Ja7fmaKnMsbu+MWVZTokSYmg=
114-
golang.org/x/sync v0.6.0 h1:5BMeUDZ7vkXGfEr1x9B4bRcTH4lpkTkpdh0T/J+qjbQ=
115-
golang.org/x/sync v0.6.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
112+
golang.org/x/net v0.29.0 h1:5ORfpBpCs4HzDYoodCDBbwHzdR5UrLBZ3sOnUJmFoHo=
113+
golang.org/x/net v0.29.0/go.mod h1:gLkgy8jTGERgjzMic6DS9+SP0ajcu6Xu3Orq/SpETg0=
114+
golang.org/x/sync v0.8.0 h1:3NFvSEYkUoMifnESzZl15y791HH1qU2xm6eCJU5ZPXQ=
115+
golang.org/x/sync v0.8.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
116116
golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
117117
golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
118-
golang.org/x/sys v0.18.0 h1:DBdB3niSjOA/O0blCZBqDefyWNYveAYMNF1Wum0DYQ4=
119-
golang.org/x/sys v0.18.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
120-
golang.org/x/text v0.14.0 h1:ScX5w1eTa3QqT8oi6+ziP7dTV1S2+ALU0bI+0zXKWiQ=
121-
golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU=
122-
google.golang.org/genproto/googleapis/rpc v0.0.0-20240318140521-94a12d6c2237 h1:NnYq6UN9ReLM9/Y01KWNOWyI5xQ9kbIms5GGJVwS/Yc=
123-
google.golang.org/genproto/googleapis/rpc v0.0.0-20240318140521-94a12d6c2237/go.mod h1:WtryC6hu0hhx87FDGxWCDptyssuo68sk10vYjF+T9fY=
124-
google.golang.org/grpc v1.62.1 h1:B4n+nfKzOICUXMgyrNd19h/I9oH0L1pizfk1d4zSgTk=
125-
google.golang.org/grpc v1.62.1/go.mod h1:IWTG0VlJLCh1SkC58F7np9ka9mx/WNkjl4PGJaiq+QE=
126-
google.golang.org/protobuf v1.33.0 h1:uNO2rsAINq/JlFpSdYEKIZ0uKD/R9cpdv0T+yoGwGmI=
127-
google.golang.org/protobuf v1.33.0/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos=
118+
golang.org/x/sys v0.25.0 h1:r+8e+loiHxRqhXVl6ML1nO3l1+oFoWbnlu2Ehimmi34=
119+
golang.org/x/sys v0.25.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
120+
golang.org/x/text v0.18.0 h1:XvMDiNzPAl0jr17s6W9lcaIhGUfUORdGCNsuLmPG224=
121+
golang.org/x/text v0.18.0/go.mod h1:BuEKDfySbSR4drPmRPG/7iBdf8hvFMuRexcpahXilzY=
122+
google.golang.org/genproto/googleapis/rpc v0.0.0-20240903143218-8af14fe29dc1 h1:pPJltXNxVzT4pK9yD8vR9X75DaWYYmLGMsEvBfFQZzQ=
123+
google.golang.org/genproto/googleapis/rpc v0.0.0-20240903143218-8af14fe29dc1/go.mod h1:UqMtugtsSgubUsoxbuAoiCXvqvErP7Gf0so0mK9tHxU=
124+
google.golang.org/grpc v1.68.0 h1:aHQeeJbo8zAkAa3pRzrVjZlbz6uSfeOXlJNQM0RAbz0=
125+
google.golang.org/grpc v1.68.0/go.mod h1:fmSPC5AsjSBCK54MyHRx48kpOti1/jRfOlwEWywNjWA=
126+
google.golang.org/protobuf v1.35.1 h1:m3LfL6/Ca+fqnjnlqQXNpFPABW1UD7mjh8KO2mKFytA=
127+
google.golang.org/protobuf v1.35.1/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE=
128128
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
129129
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
130130
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=

internal/conversation_msg/conversation.go

+57-36
Original file line numberDiff line numberDiff line change
@@ -77,19 +77,12 @@ func (c *Conversation) getAdvancedHistoryMessageList(ctx context.Context, req sd
7777
log.ZDebug(ctx, "pull message", "pull cost time", time.Since(t))
7878
t = time.Now()
7979

80-
var thisEndSeq int64
81-
thisEndSeq, messageList = c.LocalChatLog2MsgStruct(list, isReverse)
80+
messageList = c.LocalChatLog2MsgStruct(list)
8281
log.ZDebug(ctx, "message convert and unmarshal", "unmarshal cost time", time.Since(t))
8382
t = time.Now()
8483
if !isReverse {
8584
sort.Sort(messageList)
86-
if thisEndSeq != 0 {
87-
c.messagePullForwardEndSeqMap.Store(conversationID, thisEndSeq)
88-
}
89-
} else {
90-
if thisEndSeq != 0 {
91-
c.messagePullReverseEndSeqMap.Store(conversationID, thisEndSeq)
92-
}
85+
9386
}
9487
log.ZDebug(ctx, "sort", "sort cost time", time.Since(t))
9588
messageListCallback.MessageList = messageList
@@ -104,20 +97,60 @@ func (c *Conversation) fetchMessagesWithGapCheck(ctx context.Context, conversati
10497

10598
// Get the number of invalid messages in this batch to recursive fetching from earlier points.
10699
shouldFetchMoreMessagesNum := func(messages []*model_struct.LocalChatLog) int {
107-
if len(messages) == 0 {
108-
return count
109-
}
110-
100+
var thisEndSeq int64
111101
// Represents the number of valid messages in the batch
112102
validateMessageNum := 0
113103
for _, msg := range messages {
114-
if msg.Status < constant.MsgStatusHasDeleted {
115-
validateMessageNum++
116-
validMessages = append(validMessages, msg)
104+
if msg.Seq != 0 && thisEndSeq == 0 {
105+
thisEndSeq = msg.Seq
106+
}
107+
if isReverse {
108+
if msg.Seq > thisEndSeq && thisEndSeq != 0 {
109+
thisEndSeq = msg.Seq
110+
}
111+
117112
} else {
113+
if msg.Seq < thisEndSeq && msg.Seq != 0 {
114+
thisEndSeq = msg.Seq
115+
}
116+
}
117+
if msg.Status >= constant.MsgStatusHasDeleted {
118118
log.ZDebug(ctx, "this message has been deleted or exception message", "msg", msg)
119+
continue
120+
}
121+
122+
validateMessageNum++
123+
validMessages = append(validMessages, msg)
124+
125+
}
126+
if !isReverse {
127+
if thisEndSeq != 0 {
128+
c.messagePullForwardEndSeqMap.StoreWithFunc(conversationID, thisEndSeq, func(key string, value int64) bool {
129+
lastEndSeq, _ := c.messagePullForwardEndSeqMap.Load(key)
130+
if value < lastEndSeq || lastEndSeq == 0 {
131+
log.ZDebug(ctx, "update the end sequence of the message", "lastEndSeq", lastEndSeq, "thisEndSeq", value)
132+
return true
133+
}
134+
log.ZWarn(ctx, "The end sequence number of the message is more than the last end sequence number",
135+
nil, "conversationID", key, "value", value, "lastEndSeq", lastEndSeq)
136+
return false
137+
})
138+
}
139+
} else {
140+
if thisEndSeq != 0 {
141+
c.messagePullReverseEndSeqMap.StoreWithFunc(conversationID, thisEndSeq, func(key string, value int64) bool {
142+
lastEndSeq, _ := c.messagePullReverseEndSeqMap.Load(key)
143+
if value > lastEndSeq || lastEndSeq == 0 {
144+
log.ZDebug(ctx, "update the end sequence of the message", "lastEndSeq", lastEndSeq, "thisEndSeq", value)
145+
return true
146+
}
147+
log.ZWarn(ctx, "The end sequence number of the message is less than the last end sequence number",
148+
nil, "conversationID", key, "value", value, "lastEndSeq", lastEndSeq)
149+
return false
150+
})
119151
}
120152
}
153+
121154
return count - validateMessageNum
122155
}
123156
getNewStartTime := func(messages []*model_struct.LocalChatLog) int64 {
@@ -139,11 +172,11 @@ func (c *Conversation) fetchMessagesWithGapCheck(ctx context.Context, conversati
139172
t = time.Now()
140173
thisStartSeq := c.validateAndFillInternalGaps(ctx, conversationID, isReverse,
141174
count, startTime, &list, messageListCallback)
142-
log.ZDebug(ctx, "internal continuity check", "cost time", time.Since(t))
175+
log.ZDebug(ctx, "internal continuity check", "cost time", time.Since(t), "thisStartSeq", thisStartSeq)
143176
t = time.Now()
144177
c.validateAndFillInterBlockGaps(ctx, thisStartSeq, conversationID,
145178
isReverse, count, startTime, &list, messageListCallback)
146-
log.ZDebug(ctx, "between continuity check", "cost time", time.Since(t))
179+
log.ZDebug(ctx, "between continuity check", "cost time", time.Since(t), "thisStartSeq", thisStartSeq)
147180
t = time.Now()
148181
c.validateAndFillEndBlockContinuity(ctx, conversationID, isReverse,
149182
count, startTime, &list, messageListCallback)
@@ -152,8 +185,10 @@ func (c *Conversation) fetchMessagesWithGapCheck(ctx context.Context, conversati
152185
// continue fetching recursively until the valid messages are sufficient or all messages have been fetched.
153186
missingCount := shouldFetchMoreMessagesNum(list)
154187
if missingCount > 0 && !messageListCallback.IsEnd {
155-
log.ZDebug(ctx, "fetch more messages", "missingCount", missingCount, "conversationID", conversationID)
156-
missingMessages, err := c.fetchMessagesWithGapCheck(ctx, conversationID, missingCount, getNewStartTime(list), isReverse, messageListCallback)
188+
newStartTime := getNewStartTime(list)
189+
log.ZDebug(ctx, "fetch more messages", "missingCount", missingCount, "conversationID",
190+
conversationID, "newStartTime", newStartTime)
191+
missingMessages, err := c.fetchMessagesWithGapCheck(ctx, conversationID, missingCount, newStartTime, isReverse, messageListCallback)
157192
if err != nil {
158193
return nil, err
159194
}
@@ -164,31 +199,17 @@ func (c *Conversation) fetchMessagesWithGapCheck(ctx context.Context, conversati
164199
return validMessages, nil
165200
}
166201

167-
func (c *Conversation) LocalChatLog2MsgStruct(list []*model_struct.LocalChatLog, isReverse bool) (int64, []*sdk_struct.MsgStruct) {
202+
func (c *Conversation) LocalChatLog2MsgStruct(list []*model_struct.LocalChatLog) []*sdk_struct.MsgStruct {
168203
messageList := make([]*sdk_struct.MsgStruct, 0, len(list))
169-
var thisEndSeq int64
170204
for _, v := range list {
171-
if v.Seq != 0 && thisEndSeq == 0 {
172-
thisEndSeq = v.Seq
173-
}
174-
if isReverse {
175-
if v.Seq > thisEndSeq && thisEndSeq != 0 {
176-
thisEndSeq = v.Seq
177-
}
178-
179-
} else {
180-
if v.Seq < thisEndSeq && v.Seq != 0 {
181-
thisEndSeq = v.Seq
182-
}
183-
}
184205
temp := LocalChatLogToMsgStruct(v)
185206

186207
if temp.AttachedInfoElem.IsPrivateChat && temp.SendTime+int64(temp.AttachedInfoElem.BurnDuration) < time.Now().Unix() {
187208
continue
188209
}
189210
messageList = append(messageList, temp)
190211
}
191-
return thisEndSeq, messageList
212+
return messageList
192213
}
193214

194215
func (c *Conversation) typingStatusUpdate(ctx context.Context, recvID, msgTip string) error {

internal/conversation_msg/message_check.go

+67-7
Original file line numberDiff line numberDiff line change
@@ -86,12 +86,12 @@ func (c *Conversation) checkEndBlock(ctx context.Context, conversationID string,
8686
// Perform an end-of-block check if the retrieved message count is less than requested
8787
if len(*list) < count {
8888
if isReverse {
89-
currentMaxSeq := c.maxSeqRecorder.Get(conversationID)
89+
currentMaxSeq := c.getConversationMaxSeq(ctx, conversationID)
9090
maxSeq, _, _ := c.getMaxAndMinHaveSeqList(*list)
9191
log.ZDebug(ctx, "validateAndFillEndBlockContinuity", "maxSeq", maxSeq, "conversationID", conversationID, "currentMaxSeq", currentMaxSeq)
9292
// Use >= to prevent the currentMaxSeq from being updated too slowly,
9393
// which could lead to misjudgments and cause repeated message fetching."
94-
if maxSeq >= currentMaxSeq { // todo Replace `1` with the minimum sequence value as defined by the user or system
94+
if maxSeq >= currentMaxSeq {
9595
messageListCallback.IsEnd = true
9696
} else {
9797
lastEndSeq, _ := c.messagePullReverseEndSeqMap.Load(conversationID)
@@ -104,7 +104,7 @@ func (c *Conversation) checkEndBlock(ctx context.Context, conversationID string,
104104
// The batch includes sequences but has not reached the maximum value,
105105
// This condition indicates local-only messages, with `maxSeq < maxSeqRecorderMaxSeq` as the only case,
106106
// since `lastEndSeq < maxSeqRecorderMaxSeq` is handled in inter-block continuity.
107-
lostSeqList := getLostSeqListWithLimitLength(maxSeq+1, c.maxSeqRecorder.Get(conversationID), []int64{})
107+
lostSeqList := getLostSeqListWithLimitLength(maxSeq+1, currentMaxSeq, []int64{})
108108
if len(lostSeqList) > 0 {
109109
isShouldFetchMessage = true
110110
seqList = lostSeqList
@@ -115,22 +115,24 @@ func (c *Conversation) checkEndBlock(ctx context.Context, conversationID string,
115115
}
116116
return isShouldFetchMessage, seqList
117117
} else {
118+
userCanPullMinSeq := c.getConversationMinSeq(ctx, conversationID)
118119
_, minSeq, _ := c.getMaxAndMinHaveSeqList(*list)
119-
log.ZDebug(ctx, "validateAndFillEndBlockContinuity", "minSeq", minSeq, "conversationID", conversationID)
120-
if minSeq == 1 { // todo Replace `1` with the minimum sequence value as defined by the user or system
120+
log.ZDebug(ctx, "validateAndFillEndBlockContinuity", "minSeq", minSeq,
121+
"conversationID", conversationID, "userCanPullMinSeq", userCanPullMinSeq)
122+
if minSeq == userCanPullMinSeq {
121123
messageListCallback.IsEnd = true
122124
} else {
123125
lastMinSeq, _ := c.messagePullForwardEndSeqMap.Load(conversationID)
124126
log.ZDebug(ctx, "validateAndFillEndBlockContinuity", "lastMinSeq", lastMinSeq, "conversationID", conversationID)
125127
// If `minSeq` is zero and `lastMinSeq` is at the minimum server sequence, this batch is fully local
126-
if minSeq == 0 && lastMinSeq == 1 { // All messages in this batch are local messages,
128+
if minSeq == 0 && lastMinSeq == userCanPullMinSeq { // All messages in this batch are local messages,
127129
// and the minimum seq of the last batch of valid messages has already reached the minimum pullable seq from the server.
128130
messageListCallback.IsEnd = true
129131
} else {
130132
// The batch includes sequences but has not reached the minimum value,
131133
// This condition indicates local-only messages, with `minSeq > 1` as the only case,
132134
// since `lastMinSeq > 1` is handled in inter-block continuity.
133-
lostSeqList := getLostSeqListWithLimitLength(1, minSeq-1, []int64{})
135+
lostSeqList := getLostSeqListWithLimitLength(userCanPullMinSeq, minSeq-1, []int64{})
134136
if len(lostSeqList) > 0 {
135137
isShouldFetchMessage = true
136138
seqList = lostSeqList
@@ -200,6 +202,11 @@ func (c *Conversation) fetchAndMergeMissingMessages(ctx context.Context, convers
200202
conversationSeqs.ConversationID = conversationID
201203
conversationSeqs.Seqs = seqList
202204
getSeqMessageReq.Conversations = append(getSeqMessageReq.Conversations, &conversationSeqs)
205+
if isReverse {
206+
getSeqMessageReq.Order = sdkws.PullOrder_PullOrderAsc
207+
} else {
208+
getSeqMessageReq.Order = sdkws.PullOrder_PullOrderDesc
209+
}
203210
log.ZDebug(ctx, "conversation pull message, ", "req", getSeqMessageReq)
204211
if startTime == 0 && !c.LongConnMgr.IsConnected() {
205212
return
@@ -219,6 +226,9 @@ func (c *Conversation) fetchAndMergeMissingMessages(ctx context.Context, convers
219226
c.pullMessageIntoTable(ctx, getSeqMessageResp.Msgs)
220227
log.ZDebug(ctx, "syncMsgFromServerSplit pull msg success",
221228
"conversationID", conversationID, "count", count, "len", len(*list), "msgLen", len(v.Msgs))
229+
if v.IsEnd {
230+
c.setConversationMinSeq(ctx, isReverse, conversationID, v.EndSeq)
231+
}
222232
localMessage := datautil.Batch(MsgDataToLocalChatLog, v.Msgs)
223233
if !isReverse {
224234
reverse(localMessage)
@@ -228,6 +238,56 @@ func (c *Conversation) fetchAndMergeMissingMessages(ctx context.Context, convers
228238

229239
}
230240
}
241+
242+
func (c *Conversation) getConversationMaxSeq(ctx context.Context, conversationID string) int64 {
243+
conversation, err := c.db.GetConversation(ctx, conversationID)
244+
if err != nil {
245+
log.ZWarn(ctx, "Failed to get conversation", err)
246+
return c.maxSeqRecorder.Get(conversationID)
247+
}
248+
if conversation.MaxSeq == 0 {
249+
return c.maxSeqRecorder.Get(conversationID)
250+
251+
}
252+
return conversation.MaxSeq
253+
}
254+
func (c *Conversation) getConversationMinSeq(ctx context.Context, conversationID string) int64 {
255+
conversation, err := c.db.GetConversation(ctx, conversationID)
256+
if err != nil {
257+
log.ZWarn(ctx, "Failed to get conversation", err)
258+
return 1
259+
}
260+
if conversation.MinSeq == 0 {
261+
return 1
262+
263+
}
264+
return conversation.MinSeq
265+
}
266+
func (c *Conversation) setConversationMinSeq(ctx context.Context, isReverse bool, conversationID string, endSeq int64) {
267+
conversation, err := c.db.GetConversation(ctx, conversationID)
268+
if err != nil {
269+
log.ZWarn(ctx, "Failed to get conversation", err)
270+
return
271+
}
272+
if !isReverse {
273+
if conversation.MinSeq == 0 || endSeq > conversation.MinSeq {
274+
conversation.MinSeq = endSeq
275+
}
276+
} else {
277+
if conversation.MaxSeq == 0 || endSeq < conversation.MaxSeq {
278+
conversation.MaxSeq = endSeq
279+
err = c.db.UpdateConversation(ctx, conversation)
280+
if err != nil {
281+
log.ZWarn(ctx, "Failed to update conversation", err)
282+
}
283+
}
284+
285+
}
286+
err = c.db.UpdateConversation(ctx, conversation)
287+
if err != nil {
288+
log.ZWarn(ctx, "Failed to update conversation", err)
289+
}
290+
}
231291
func errHandle(seqList []int64, list *[]*model_struct.LocalChatLog, err error, messageListCallback *sdk.GetAdvancedHistoryMessageListCallback) {
232292
messageListCallback.ErrCode = 100
233293
messageListCallback.ErrMsg = err.Error()

0 commit comments

Comments
 (0)