Skip to content

Commit 810f398

Browse files
authored
fix: refine exception message handling to prevent duplicate messages … (#841)
* fix: refine exception message handling to prevent duplicate messages in clients with poor network conditions. Signed-off-by: Gordon <[email protected]> * fix: primary key conflicts caused by empty messages occupying seq due to sequence gaps. Signed-off-by: Gordon <[email protected]> * fix: server downtime and abnormal message handling may lead to message duplication, and the history retrieval interface might miss messages when the timestamps are the same. Signed-off-by: Gordon <[email protected]> * fix: server downtime and abnormal message handling may lead to message duplication, and the history retrieval interface might miss messages when the timestamps are the same. Signed-off-by: Gordon <[email protected]> * fix: add random prefix to remove duplicate messages. Signed-off-by: Gordon <[email protected]> --------- Signed-off-by: Gordon <[email protected]>
1 parent e77655b commit 810f398

18 files changed

+244
-968
lines changed

internal/conversation_msg/api.go

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -146,13 +146,6 @@ func (c *Conversation) SetConversationListener(listener func() open_im_sdk_callb
146146
c.ConversationListener = listener
147147
}
148148

149-
func (c *Conversation) msgDataToLocalErrChatLog(src *model_struct.LocalChatLog) *model_struct.LocalErrChatLog {
150-
var lc model_struct.LocalErrChatLog
151-
copier.Copy(&lc, src)
152-
return &lc
153-
154-
}
155-
156149
func (c *Conversation) updateMsgStatusAndTriggerConversation(ctx context.Context, clientMsgID, serverMsgID string, sendTime int64, status int32, s *sdk_struct.MsgStruct,
157150
lc *model_struct.LocalConversation, isOnlineOnly bool) {
158151
log.ZDebug(ctx, "this is test send message ", "sendTime", sendTime, "status", status, "clientMsgID", clientMsgID, "serverMsgID", serverMsgID)

internal/conversation_msg/conversation.go

Lines changed: 14 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@ func (c *Conversation) getAdvancedHistoryMessageList(ctx context.Context, req sd
5656
t := time.Now()
5757
var messageListCallback sdk.GetAdvancedHistoryMessageListCallback
5858
var conversationID string
59+
var startClientMsgID string
5960
var startTime int64
6061
var err error
6162
var messageList sdk_struct.NewMsgList
@@ -77,8 +78,8 @@ func (c *Conversation) getAdvancedHistoryMessageList(ctx context.Context, req sd
7778
}
7879

7980
log.ZDebug(ctx, "Assembly conversation parameters", "cost time", time.Since(t), "conversationID",
80-
conversationID, "startTime:", startTime, "count:", req.Count, "startTime", startTime)
81-
list, err := c.fetchMessagesWithGapCheck(ctx, conversationID, req.Count, startTime, isReverse, req.ViewType, &messageListCallback)
81+
conversationID, "startTime:", startTime, "count:", req.Count)
82+
list, err := c.fetchMessagesWithGapCheck(ctx, conversationID, req.Count, startTime, startClientMsgID, isReverse, req.ViewType, &messageListCallback)
8283
if err != nil {
8384
return nil, err
8485
}
@@ -136,7 +137,7 @@ func (c *Conversation) handleEndSeq(ctx context.Context, req sdk.GetAdvancedHist
136137
}
137138

138139
func (c *Conversation) fetchMessagesWithGapCheck(ctx context.Context, conversationID string,
139-
count int, startTime int64, isReverse bool, viewType int, messageListCallback *sdk.GetAdvancedHistoryMessageListCallback) ([]*model_struct.LocalChatLog, error) {
140+
count int, startTime int64, startClientMsgID string, isReverse bool, viewType int, messageListCallback *sdk.GetAdvancedHistoryMessageListCallback) ([]*model_struct.LocalChatLog, error) {
140141

141142
var list, validMessages []*model_struct.LocalChatLog
142143

@@ -198,16 +199,16 @@ func (c *Conversation) fetchMessagesWithGapCheck(ctx context.Context, conversati
198199

199200
return count - validateMessageNum
200201
}
201-
getNewStartTime := func(messages []*model_struct.LocalChatLog) int64 {
202+
getNewStartMessageInfo := func(messages []*model_struct.LocalChatLog) (int64, string) {
202203
if len(messages) == 0 {
203-
return 0
204+
return 0, ""
204205
}
205-
// Returns the SendTime of the last element in the message list
206-
return messages[len(messages)-1].SendTime
206+
// Returns the SendTime and ClientMsgID of the last element in the message list
207+
return messages[len(messages)-1].SendTime, messages[len(messages)-1].ClientMsgID
207208
}
208209

209210
t := time.Now()
210-
list, err := c.db.GetMessageList(ctx, conversationID, count, startTime, isReverse)
211+
list, err := c.db.GetMessageList(ctx, conversationID, count, startTime, startClientMsgID, isReverse)
211212
log.ZDebug(ctx, "db get messageList", "cost time", time.Since(t), "len", len(list), "err",
212213
err, "conversationID", conversationID)
213214

@@ -217,23 +218,23 @@ func (c *Conversation) fetchMessagesWithGapCheck(ctx context.Context, conversati
217218
t = time.Now()
218219
thisStartSeq := c.validateAndFillInternalGaps(ctx, conversationID, isReverse,
219220
count, startTime, &list, messageListCallback)
220-
log.ZDebug(ctx, "internal continuity check", "cost time", time.Since(t), "thisStartSeq", thisStartSeq)
221+
log.ZDebug(ctx, "internal continuity check over", "cost time", time.Since(t), "thisStartSeq", thisStartSeq)
221222
t = time.Now()
222223
c.validateAndFillInterBlockGaps(ctx, thisStartSeq, conversationID,
223224
isReverse, viewType, count, startTime, &list, messageListCallback)
224-
log.ZDebug(ctx, "between continuity check", "cost time", time.Since(t), "thisStartSeq", thisStartSeq)
225+
log.ZDebug(ctx, "between continuity check over", "cost time", time.Since(t), "thisStartSeq", thisStartSeq)
225226
t = time.Now()
226227
c.validateAndFillEndBlockContinuity(ctx, conversationID, isReverse, viewType,
227228
count, startTime, &list, messageListCallback)
228-
log.ZDebug(ctx, "end continuity check", "cost time", time.Since(t))
229+
log.ZDebug(ctx, "end continuity check over", "cost time", time.Since(t))
229230
// If the number of valid messages retrieved is less than the count,
230231
// continue fetching recursively until the valid messages are sufficient or all messages have been fetched.
231232
missingCount := shouldFetchMoreMessagesNum(list)
232233
if missingCount > 0 && !messageListCallback.IsEnd {
233-
newStartTime := getNewStartTime(list)
234+
newStartTime, newStartClientMsgID := getNewStartMessageInfo(list)
234235
log.ZDebug(ctx, "fetch more messages", "missingCount", missingCount, "conversationID",
235236
conversationID, "newStartTime", newStartTime)
236-
missingMessages, err := c.fetchMessagesWithGapCheck(ctx, conversationID, missingCount, newStartTime, isReverse, viewType, messageListCallback)
237+
missingMessages, err := c.fetchMessagesWithGapCheck(ctx, conversationID, missingCount, newStartTime, newStartClientMsgID, isReverse, viewType, messageListCallback)
237238
if err != nil {
238239
return nil, err
239240
}

internal/conversation_msg/conversation_msg.go

Lines changed: 30 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -207,10 +207,8 @@ func (c *Conversation) doMsgNew(c2v common.Cmd2Value) {
207207
var isTriggerUnReadCount bool
208208
insertMsg := make(map[string][]*model_struct.LocalChatLog, 10)
209209
updateMsg := make(map[string][]*model_struct.LocalChatLog, 10)
210-
var exceptionMsg []*model_struct.LocalErrChatLog
211-
//var unreadMessages []*model_struct.LocalConversationUnreadMessage
210+
var exceptionMsg []*model_struct.LocalChatLog
212211
var newMessages sdk_struct.NewMsgList
213-
// var reactionMsgModifierList, reactionMsgDeleterList sdk_struct.NewMsgList
214212

215213
var isUnreadCount, isConversationUpdate, isHistory, isNotPrivate, isSenderConversationUpdate bool
216214

@@ -253,7 +251,10 @@ func (c *Conversation) doMsgNew(c2v common.Cmd2Value) {
253251

254252
//When the message has been marked and deleted by the cloud, it is directly inserted locally without any conversation and message update.
255253
if msg.Status == constant.MsgStatusHasDeleted {
256-
insertMessage = append(insertMessage, MsgStructToLocalChatLog(msg))
254+
dbMessage := MsgStructToLocalChatLog(msg)
255+
c.handleExceptionMessages(ctx, nil, dbMessage)
256+
exceptionMsg = append(exceptionMsg, dbMessage)
257+
insertMessage = append(insertMessage, dbMessage)
257258
continue
258259
}
259260

@@ -269,10 +270,6 @@ func (c *Conversation) doMsgNew(c2v common.Cmd2Value) {
269270
if !isNotPrivate {
270271
msg.AttachedInfoElem.IsPrivateChat = true
271272
}
272-
if msg.ClientMsgID == "" {
273-
exceptionMsg = append(exceptionMsg, c.msgStructToLocalErrChatLog(msg))
274-
continue
275-
}
276273
if conversationID == "" {
277274
log.ZError(ctx, "conversationID is empty", errors.New("conversationID is empty"), "msg", msg)
278275
continue
@@ -285,16 +282,19 @@ func (c *Conversation) doMsgNew(c2v common.Cmd2Value) {
285282
log.ZDebug(ctx, "decode message", "msg", msg)
286283
if v.SendID == c.loginUserID { //seq
287284
// Messages sent by myself //if sent through this terminal
288-
m, err := c.db.GetMessage(ctx, conversationID, msg.ClientMsgID)
285+
existingMsg, err := c.db.GetMessage(ctx, conversationID, msg.ClientMsgID)
289286
if err == nil {
290287
log.ZInfo(ctx, "have message", "msg", msg)
291-
if m.Seq == 0 {
288+
if existingMsg.Seq == 0 {
292289
if !isConversationUpdate {
293290
msg.Status = constant.MsgStatusFiltered
294291
}
295292
updateMessage = append(updateMessage, MsgStructToLocalChatLog(msg))
296293
} else {
297-
exceptionMsg = append(exceptionMsg, c.msgStructToLocalErrChatLog(msg))
294+
dbMessage := MsgStructToLocalChatLog(msg)
295+
c.handleExceptionMessages(ctx, existingMsg, dbMessage)
296+
insertMessage = append(insertMessage, dbMessage)
297+
exceptionMsg = append(exceptionMsg, dbMessage)
298298
}
299299
} else {
300300
log.ZInfo(ctx, "sync message", "msg", msg)
@@ -322,7 +322,7 @@ func (c *Conversation) doMsgNew(c2v common.Cmd2Value) {
322322
}
323323
}
324324
} else { //Sent by others
325-
if _, err := c.db.GetMessage(ctx, conversationID, msg.ClientMsgID); err != nil { //Deduplication operation
325+
if existingMsg, err := c.db.GetMessage(ctx, conversationID, msg.ClientMsgID); err != nil {
326326
lc := model_struct.LocalConversation{
327327
ConversationType: v.SessionType,
328328
LatestMsg: utils.StructToJsonString(msg),
@@ -356,11 +356,10 @@ func (c *Conversation) doMsgNew(c2v common.Cmd2Value) {
356356
}
357357

358358
} else {
359-
exceptionMsg = append(exceptionMsg, c.msgStructToLocalErrChatLog(msg))
360-
log.ZWarn(ctx, "Deduplication operation ", nil, "msg", *c.msgStructToLocalErrChatLog(msg))
361-
msg.Status = constant.MsgStatusFiltered
362-
msg.ClientMsgID = msg.ClientMsgID + utils.Int64ToString(msg.Seq)
363-
othersInsertMessage = append(othersInsertMessage, MsgStructToLocalChatLog(msg))
359+
dbMessage := MsgStructToLocalChatLog(msg)
360+
c.handleExceptionMessages(ctx, existingMsg, dbMessage)
361+
insertMessage = append(insertMessage, dbMessage)
362+
exceptionMsg = append(exceptionMsg, dbMessage)
364363
}
365364
}
366365
}
@@ -458,6 +457,10 @@ func (c *Conversation) doMsgNew(c2v common.Cmd2Value) {
458457
}
459458
}
460459
}
460+
//Exception message storage
461+
for _, v := range exceptionMsg {
462+
log.ZWarn(ctx, "exceptionMsg show: ", nil, "msg", *v)
463+
}
461464

462465
log.ZDebug(ctx, "insert msg", "duration", fmt.Sprintf("%dms", time.Since(b)), "len", len(allMsg))
463466
}
@@ -471,6 +474,7 @@ func (c *Conversation) doMsgSyncByReinstalled(c2v common.Cmd2Value) {
471474

472475
insertMsg := make(map[string][]*model_struct.LocalChatLog, 10)
473476
conversationList := make([]*model_struct.LocalConversation, 0)
477+
var exceptionMsg []*model_struct.LocalChatLog
474478

475479
log.ZDebug(ctx, "message come here conversation ch in reinstalled", "conversation length", msgLen)
476480
b := time.Now()
@@ -497,7 +501,10 @@ func (c *Conversation) doMsgSyncByReinstalled(c2v common.Cmd2Value) {
497501

498502
//When the message has been marked and deleted by the cloud, it is directly inserted locally without any conversation and message update.
499503
if msg.Status == constant.MsgStatusHasDeleted {
500-
insertMessage = append(insertMessage, MsgStructToLocalChatLog(msg))
504+
dbMessage := MsgStructToLocalChatLog(msg)
505+
c.handleExceptionMessages(ctx, nil, dbMessage)
506+
exceptionMsg = append(exceptionMsg, dbMessage)
507+
insertMessage = append(insertMessage, dbMessage)
501508
continue
502509
}
503510
msg.Status = constant.MsgStatusSendSuccess
@@ -552,6 +559,10 @@ func (c *Conversation) doMsgSyncByReinstalled(c2v common.Cmd2Value) {
552559

553560
// log.ZDebug(ctx, "progress is", "msgLen", msgLen, "msgOffset", c.msgOffset, "total", total, "now progress is", (c.msgOffset*(100-InitSyncProgress))/total + InitSyncProgress)
554561
c.ConversationListener().OnSyncServerProgress((c.msgOffset*(100-InitSyncProgress))/total + InitSyncProgress)
562+
//Exception message storage
563+
for _, v := range exceptionMsg {
564+
log.ZWarn(ctx, "exceptionMsg show: ", nil, "msg", *v)
565+
}
555566
}
556567

557568
func (c *Conversation) addInitProgress(progress int) {
@@ -613,15 +624,6 @@ func (c *Conversation) genConversationGroupAtType(lc *model_struct.LocalConversa
613624
}
614625
}
615626

616-
func (c *Conversation) msgStructToLocalErrChatLog(m *sdk_struct.MsgStruct) *model_struct.LocalErrChatLog {
617-
var lc model_struct.LocalErrChatLog
618-
copier.Copy(&lc, m)
619-
if m.SessionType == constant.WriteGroupChatType || m.SessionType == constant.ReadGroupChatType {
620-
lc.RecvID = m.GroupID
621-
}
622-
return &lc
623-
}
624-
625627
func (c *Conversation) batchUpdateMessageList(ctx context.Context, updateMsg map[string][]*model_struct.LocalChatLog) error {
626628
if updateMsg == nil {
627629
return nil
@@ -678,7 +680,7 @@ func (c *Conversation) batchInsertMessageList(ctx context.Context, insertMsg map
678680
}
679681
err := c.db.BatchInsertMessageList(ctx, conversationID, messages)
680682
if err != nil {
681-
log.ZError(ctx, "insert GetMessage detail err:", err, "conversationID", conversationID, "messages", messages)
683+
log.ZError(ctx, "BatchInsertMessageList detail err:", err, "conversationID", conversationID, "messages", messages)
682684
for _, v := range messages {
683685
e := c.db.InsertMessage(ctx, conversationID, v)
684686
if e != nil {

0 commit comments

Comments
 (0)