Skip to content

Commit c9ca3cd

Browse files
authored
fix: get reverse history message change. (#804)
* fix: change errs to custom errs avoid sdk panic. Signed-off-by: Gordon <[email protected]> * fix: get reverse history message change. Signed-off-by: Gordon <[email protected]> --------- Signed-off-by: Gordon <[email protected]>
1 parent 1e65770 commit c9ca3cd

File tree

5 files changed

+250
-199
lines changed

5 files changed

+250
-199
lines changed

internal/conversation_msg/conversation.go

+52-33
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,9 @@ func (c *Conversation) getAdvancedHistoryMessageList(ctx context.Context, req sd
6464
}
6565
startTime = m.SendTime
6666
} else {
67-
c.messagePullMinSeqMap.Delete(conversationID)
67+
// Clear both maps when the user enters the conversation
68+
c.messagePullForwardEndSeqMap.Delete(conversationID)
69+
c.messagePullReverseEndSeqMap.Delete(conversationID)
6870
}
6971
log.ZDebug(ctx, "Assembly conversation parameters", "cost time", time.Since(t), "conversationID",
7072
conversationID, "startTime:", startTime, "count:", req.Count, "startTime", startTime)
@@ -75,41 +77,48 @@ func (c *Conversation) getAdvancedHistoryMessageList(ctx context.Context, req sd
7577
log.ZDebug(ctx, "pull message", "pull cost time", time.Since(t))
7678
t = time.Now()
7779

78-
var thisMinSeq int64
79-
thisMinSeq, messageList = c.LocalChatLog2MsgStruct(ctx, list)
80+
var thisEndSeq int64
81+
thisEndSeq, messageList = c.LocalChatLog2MsgStruct(list, isReverse)
8082
log.ZDebug(ctx, "message convert and unmarshal", "unmarshal cost time", time.Since(t))
8183
t = time.Now()
8284
if !isReverse {
8385
sort.Sort(messageList)
86+
if thisEndSeq != 0 {
87+
c.messagePullForwardEndSeqMap.Store(conversationID, thisEndSeq)
88+
}
89+
} else {
90+
if thisEndSeq != 0 {
91+
c.messagePullReverseEndSeqMap.Store(conversationID, thisEndSeq)
92+
}
8493
}
8594
log.ZDebug(ctx, "sort", "sort cost time", time.Since(t))
8695
messageListCallback.MessageList = messageList
87-
if thisMinSeq != 0 {
88-
c.messagePullMinSeqMap.Store(conversationID, thisMinSeq)
89-
}
90-
return &messageListCallback, nil
9196

97+
return &messageListCallback, nil
9298
}
9399

94100
func (c *Conversation) fetchMessagesWithGapCheck(ctx context.Context, conversationID string,
95101
count int, startTime int64, isReverse bool, messageListCallback *sdk.GetAdvancedHistoryMessageListCallback) ([]*model_struct.LocalChatLog, error) {
96102

97-
var list []*model_struct.LocalChatLog
103+
var list, validMessages []*model_struct.LocalChatLog
98104

99-
// If all retrieved messages are either deleted or filtered out, continue fetching messages from an earlier point.
100-
shouldFetchMoreMessages := func(messages []*model_struct.LocalChatLog) bool {
105+
// Get the number of invalid messages in this batch to recursive fetching from earlier points.
106+
shouldFetchMoreMessagesNum := func(messages []*model_struct.LocalChatLog) int {
101107
if len(messages) == 0 {
102-
return false
108+
return count
103109
}
104110

105-
allDeleted := true
111+
// Represents the number of valid messages in the batch
112+
validateMessageNum := 0
106113
for _, msg := range messages {
107114
if msg.Status < constant.MsgStatusHasDeleted {
108-
allDeleted = false
109-
break
115+
validateMessageNum++
116+
validMessages = append(validMessages, msg)
117+
} else {
118+
log.ZDebug(ctx, "this message has been deleted or exception message", "msg", msg)
110119
}
111120
}
112-
return allDeleted
121+
return count - validateMessageNum
113122
}
114123
getNewStartTime := func(messages []*model_struct.LocalChatLog) int64 {
115124
if len(messages) == 0 {
@@ -128,39 +137,49 @@ func (c *Conversation) fetchMessagesWithGapCheck(ctx context.Context, conversati
128137
return nil, err
129138
}
130139
t = time.Now()
131-
maxSeq := c.validateAndFillInternalGaps(ctx, conversationID, isReverse,
140+
thisStartSeq := c.validateAndFillInternalGaps(ctx, conversationID, isReverse,
132141
count, startTime, &list, messageListCallback)
133142
log.ZDebug(ctx, "internal continuity check", "cost time", time.Since(t))
134143
t = time.Now()
135-
c.validateAndFillInterBlockGaps(ctx, maxSeq, conversationID,
144+
c.validateAndFillInterBlockGaps(ctx, thisStartSeq, conversationID,
136145
isReverse, count, startTime, &list, messageListCallback)
137146
log.ZDebug(ctx, "between continuity check", "cost time", time.Since(t))
138147
t = time.Now()
139148
c.validateAndFillEndBlockContinuity(ctx, conversationID, isReverse,
140149
count, startTime, &list, messageListCallback)
141150
log.ZDebug(ctx, "end continuity check", "cost time", time.Since(t))
142-
// If all retrieved messages are either deleted or filtered out,
143-
//continue fetching recursively until either valid messages are found or all messages have been fetched.
144-
if shouldFetchMoreMessages(list) && !messageListCallback.IsEnd {
145-
return c.fetchMessagesWithGapCheck(ctx, conversationID, count, getNewStartTime(list), isReverse, messageListCallback)
151+
// If the number of valid messages retrieved is less than the count,
152+
// continue fetching recursively until the valid messages are sufficient or all messages have been fetched.
153+
missingCount := shouldFetchMoreMessagesNum(list)
154+
if missingCount > 0 && !messageListCallback.IsEnd {
155+
log.ZDebug(ctx, "fetch more messages", "missingCount", missingCount, "conversationID", conversationID)
156+
missingMessages, err := c.fetchMessagesWithGapCheck(ctx, conversationID, missingCount, getNewStartTime(list), isReverse, messageListCallback)
157+
if err != nil {
158+
return nil, err
159+
}
160+
log.ZDebug(ctx, "fetch more messages", "missingMessages", missingMessages)
161+
return append(validMessages, missingMessages...), nil
146162
}
147163

148-
return list, nil
164+
return validMessages, nil
149165
}
150166

151-
func (c *Conversation) LocalChatLog2MsgStruct(ctx context.Context, list []*model_struct.LocalChatLog) (int64, []*sdk_struct.MsgStruct) {
167+
func (c *Conversation) LocalChatLog2MsgStruct(list []*model_struct.LocalChatLog, isReverse bool) (int64, []*sdk_struct.MsgStruct) {
152168
messageList := make([]*sdk_struct.MsgStruct, 0, len(list))
153-
var thisMinSeq int64
169+
var thisEndSeq int64
154170
for _, v := range list {
155-
if v.Seq != 0 && thisMinSeq == 0 {
156-
thisMinSeq = v.Seq
171+
if v.Seq != 0 && thisEndSeq == 0 {
172+
thisEndSeq = v.Seq
157173
}
158-
if v.Seq < thisMinSeq && v.Seq != 0 {
159-
thisMinSeq = v.Seq
160-
}
161-
if v.Status >= constant.MsgStatusHasDeleted {
162-
log.ZDebug(ctx, "this message has been deleted or exception message", "msg", v)
163-
continue
174+
if isReverse {
175+
if v.Seq > thisEndSeq && thisEndSeq != 0 {
176+
thisEndSeq = v.Seq
177+
}
178+
179+
} else {
180+
if v.Seq < thisEndSeq && v.Seq != 0 {
181+
thisEndSeq = v.Seq
182+
}
164183
}
165184
temp := LocalChatLogToMsgStruct(v)
166185

@@ -169,7 +188,7 @@ func (c *Conversation) LocalChatLog2MsgStruct(ctx context.Context, list []*model
169188
}
170189
messageList = append(messageList, temp)
171190
}
172-
return thisMinSeq, messageList
191+
return thisEndSeq, messageList
173192
}
174193

175194
func (c *Conversation) typingStatusUpdate(ctx context.Context, recvID, msgTip string) error {

internal/conversation_msg/conversation_msg.go

+69-67
Original file line numberDiff line numberDiff line change
@@ -51,28 +51,29 @@ var SearchContentType = []int{constant.Text, constant.AtText, constant.File}
5151

5252
type Conversation struct {
5353
*interaction.LongConnMgr
54-
conversationSyncer *syncer.Syncer[*model_struct.LocalConversation, pbConversation.GetOwnerConversationResp, string]
55-
db db_interface.DataBase
56-
ConversationListener func() open_im_sdk_callback.OnConversationListener
57-
msgListener func() open_im_sdk_callback.OnAdvancedMsgListener
58-
msgKvListener func() open_im_sdk_callback.OnMessageKvInfoListener
59-
businessListener func() open_im_sdk_callback.OnCustomBusinessListener
60-
recvCH chan common.Cmd2Value
61-
loginUserID string
62-
platformID int32
63-
DataDir string
64-
relation *relation.Relation
65-
group *group.Group
66-
user *user.User
67-
file *file.File
68-
cache *cache.Cache[string, *model_struct.LocalConversation]
69-
maxSeqRecorder MaxSeqRecorder
70-
messagePullMinSeqMap *cache.Cache[string, int64]
71-
IsExternalExtensions bool
72-
msgOffset int
73-
progress int
74-
conversationSyncMutex sync.Mutex
75-
streamMsgMutex sync.Mutex
54+
conversationSyncer *syncer.Syncer[*model_struct.LocalConversation, pbConversation.GetOwnerConversationResp, string]
55+
db db_interface.DataBase
56+
ConversationListener func() open_im_sdk_callback.OnConversationListener
57+
msgListener func() open_im_sdk_callback.OnAdvancedMsgListener
58+
msgKvListener func() open_im_sdk_callback.OnMessageKvInfoListener
59+
businessListener func() open_im_sdk_callback.OnCustomBusinessListener
60+
recvCH chan common.Cmd2Value
61+
loginUserID string
62+
platformID int32
63+
DataDir string
64+
relation *relation.Relation
65+
group *group.Group
66+
user *user.User
67+
file *file.File
68+
cache *cache.Cache[string, *model_struct.LocalConversation]
69+
maxSeqRecorder MaxSeqRecorder
70+
messagePullForwardEndSeqMap *cache.Cache[string, int64]
71+
messagePullReverseEndSeqMap *cache.Cache[string, int64]
72+
IsExternalExtensions bool
73+
msgOffset int
74+
progress int
75+
conversationSyncMutex sync.Mutex
76+
streamMsgMutex sync.Mutex
7677

7778
startTime time.Time
7879

@@ -96,20 +97,21 @@ func NewConversation(ctx context.Context, longConnMgr *interaction.LongConnMgr,
9697
file *file.File) *Conversation {
9798
info := ccontext.Info(ctx)
9899
n := &Conversation{db: db,
99-
LongConnMgr: longConnMgr,
100-
recvCH: ch,
101-
loginUserID: info.UserID(),
102-
platformID: info.PlatformID(),
103-
DataDir: info.DataDir(),
104-
relation: relation,
105-
group: group,
106-
user: user,
107-
file: file,
108-
IsExternalExtensions: info.IsExternalExtensions(),
109-
maxSeqRecorder: NewMaxSeqRecorder(),
110-
messagePullMinSeqMap: cache.NewCache[string, int64](),
111-
msgOffset: 0,
112-
progress: 0,
100+
LongConnMgr: longConnMgr,
101+
recvCH: ch,
102+
loginUserID: info.UserID(),
103+
platformID: info.PlatformID(),
104+
DataDir: info.DataDir(),
105+
relation: relation,
106+
group: group,
107+
user: user,
108+
file: file,
109+
IsExternalExtensions: info.IsExternalExtensions(),
110+
maxSeqRecorder: NewMaxSeqRecorder(),
111+
messagePullForwardEndSeqMap: cache.NewCache[string, int64](),
112+
messagePullReverseEndSeqMap: cache.NewCache[string, int64](),
113+
msgOffset: 0,
114+
progress: 0,
113115
}
114116
n.typing = newTyping(n)
115117
n.initSyncer()
@@ -922,37 +924,37 @@ func (c *Conversation) FetchSurroundingMessages(ctx context.Context, conversatio
922924
if len(res) == 0 {
923925
return []*sdk_struct.MsgStruct{}, nil
924926
}
925-
_, msgList := c.LocalChatLog2MsgStruct(ctx, []*model_struct.LocalChatLog{res[0]})
926-
if len(msgList) == 0 {
927-
return []*sdk_struct.MsgStruct{}, nil
928-
}
929-
msg := msgList[0]
927+
//_, msgList := c.LocalChatLog2MsgStruct []*model_struct.LocalChatLog{res[0]})
928+
//if len(msgList) == 0 {
929+
// return []*sdk_struct.MsgStruct{}, nil
930+
//}
931+
//msg := msgList[0]
930932
result := make([]*sdk_struct.MsgStruct, 0, before+after+1)
931-
if before > 0 {
932-
req := sdk.GetAdvancedHistoryMessageListParams{
933-
ConversationID: conversationID,
934-
Count: int(before),
935-
StartClientMsgID: msg.ClientMsgID,
936-
}
937-
val, err := c.getAdvancedHistoryMessageList(ctx, req, false)
938-
if err != nil {
939-
return nil, err
940-
}
941-
result = append(result, val.MessageList...)
942-
}
943-
result = append(result, msg)
944-
if after > 0 {
945-
req := sdk.GetAdvancedHistoryMessageListParams{
946-
ConversationID: conversationID,
947-
Count: int(after),
948-
StartClientMsgID: msg.ClientMsgID,
949-
}
950-
val, err := c.getAdvancedHistoryMessageList(ctx, req, true)
951-
if err != nil {
952-
return nil, err
953-
}
954-
result = append(result, val.MessageList...)
955-
}
956-
sort.Sort(sdk_struct.NewMsgList(result))
933+
//if before > 0 {
934+
// req := sdk.GetAdvancedHistoryMessageListParams{
935+
// ConversationID: conversationID,
936+
// Count: int(before),
937+
// StartClientMsgID: msg.ClientMsgID,
938+
// }
939+
// val, err := c.getAdvancedHistoryMessageList(ctx, req, false)
940+
// if err != nil {
941+
// return nil, err
942+
// }
943+
// result = append(result, val.MessageList...)
944+
//}
945+
//result = append(result, msg)
946+
//if after > 0 {
947+
// req := sdk.GetAdvancedHistoryMessageListParams{
948+
// ConversationID: conversationID,
949+
// Count: int(after),
950+
// StartClientMsgID: msg.ClientMsgID,
951+
// }
952+
// val, err := c.getAdvancedHistoryMessageList(ctx, req, true)
953+
// if err != nil {
954+
// return nil, err
955+
// }
956+
// result = append(result, val.MessageList...)
957+
//}
958+
//sort.Sort(sdk_struct.NewMsgList(result))
957959
return result, nil
958960
}

0 commit comments

Comments
 (0)