Skip to content

Commit

Permalink
fix: server downtime and abnormal message handling may lead to messag…
Browse files Browse the repository at this point in the history
…e duplication, and the history retrieval interface might miss messages when the timestamps are the same. (#845)

* fix: refine exception message handling to prevent duplicate messages in clients with poor network conditions.

Signed-off-by: Gordon <[email protected]>

* fix: primary key conflicts caused by empty messages occupying seq due to sequence gaps.

Signed-off-by: Gordon <[email protected]>

* fix: server downtime and abnormal message handling may lead to message duplication, and the history retrieval interface might miss messages when the timestamps are the same.

Signed-off-by: Gordon <[email protected]>

* fix: server downtime and abnormal message handling may lead to message duplication, and the history retrieval interface might miss messages when the timestamps are the same.

Signed-off-by: Gordon <[email protected]>

---------

Signed-off-by: Gordon <[email protected]>
  • Loading branch information
FGadvancer authored Feb 5, 2025
1 parent d1323b9 commit 8e5f68c
Show file tree
Hide file tree
Showing 7 changed files with 151 additions and 35 deletions.
27 changes: 14 additions & 13 deletions internal/conversation_msg/conversation.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ func (c *Conversation) getAdvancedHistoryMessageList(ctx context.Context, req sd
t := time.Now()
var messageListCallback sdk.GetAdvancedHistoryMessageListCallback
var conversationID string
var startClientMsgID string
var startTime int64
var err error
var messageList sdk_struct.NewMsgList
Expand All @@ -75,8 +76,8 @@ func (c *Conversation) getAdvancedHistoryMessageList(ctx context.Context, req sd
}

log.ZDebug(ctx, "Assembly conversation parameters", "cost time", time.Since(t), "conversationID",
conversationID, "startTime:", startTime, "count:", req.Count, "startTime", startTime)
list, err := c.fetchMessagesWithGapCheck(ctx, conversationID, req.Count, startTime, isReverse, req.ViewType, &messageListCallback)
conversationID, "startTime:", startTime, "count:", req.Count)
list, err := c.fetchMessagesWithGapCheck(ctx, conversationID, req.Count, startTime, startClientMsgID, isReverse, req.ViewType, &messageListCallback)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -134,7 +135,7 @@ func (c *Conversation) handleEndSeq(ctx context.Context, req sdk.GetAdvancedHist
}

func (c *Conversation) fetchMessagesWithGapCheck(ctx context.Context, conversationID string,
count int, startTime int64, isReverse bool, viewType int, messageListCallback *sdk.GetAdvancedHistoryMessageListCallback) ([]*model_struct.LocalChatLog, error) {
count int, startTime int64, startClientMsgID string, isReverse bool, viewType int, messageListCallback *sdk.GetAdvancedHistoryMessageListCallback) ([]*model_struct.LocalChatLog, error) {

var list, validMessages []*model_struct.LocalChatLog

Expand Down Expand Up @@ -196,16 +197,16 @@ func (c *Conversation) fetchMessagesWithGapCheck(ctx context.Context, conversati

return count - validateMessageNum
}
getNewStartTime := func(messages []*model_struct.LocalChatLog) int64 {
getNewStartMessageInfo := func(messages []*model_struct.LocalChatLog) (int64, string) {
if len(messages) == 0 {
return 0
return 0, ""
}
// Returns the SendTime of the last element in the message list
return messages[len(messages)-1].SendTime
// Returns the SendTime and ClientMsgID of the last element in the message list
return messages[len(messages)-1].SendTime, messages[len(messages)-1].ClientMsgID
}

t := time.Now()
list, err := c.db.GetMessageList(ctx, conversationID, count, startTime, isReverse)
list, err := c.db.GetMessageList(ctx, conversationID, count, startTime, startClientMsgID, isReverse)
log.ZDebug(ctx, "db get messageList", "cost time", time.Since(t), "len", len(list), "err",
err, "conversationID", conversationID)

Expand All @@ -215,23 +216,23 @@ func (c *Conversation) fetchMessagesWithGapCheck(ctx context.Context, conversati
t = time.Now()
thisStartSeq := c.validateAndFillInternalGaps(ctx, conversationID, isReverse,
count, startTime, &list, messageListCallback)
log.ZDebug(ctx, "internal continuity check", "cost time", time.Since(t), "thisStartSeq", thisStartSeq)
log.ZDebug(ctx, "internal continuity check over", "cost time", time.Since(t), "thisStartSeq", thisStartSeq)
t = time.Now()
c.validateAndFillInterBlockGaps(ctx, thisStartSeq, conversationID,
isReverse, viewType, count, startTime, &list, messageListCallback)
log.ZDebug(ctx, "between continuity check", "cost time", time.Since(t), "thisStartSeq", thisStartSeq)
log.ZDebug(ctx, "between continuity check over", "cost time", time.Since(t), "thisStartSeq", thisStartSeq)
t = time.Now()
c.validateAndFillEndBlockContinuity(ctx, conversationID, isReverse, viewType,
count, startTime, &list, messageListCallback)
log.ZDebug(ctx, "end continuity check", "cost time", time.Since(t))
log.ZDebug(ctx, "end continuity check over", "cost time", time.Since(t))
// If the number of valid messages retrieved is less than the count,
// continue fetching recursively until the valid messages are sufficient or all messages have been fetched.
missingCount := shouldFetchMoreMessagesNum(list)
if missingCount > 0 && !messageListCallback.IsEnd {
newStartTime := getNewStartTime(list)
newStartTime, newStartClientMsgID := getNewStartMessageInfo(list)
log.ZDebug(ctx, "fetch more messages", "missingCount", missingCount, "conversationID",
conversationID, "newStartTime", newStartTime)
missingMessages, err := c.fetchMessagesWithGapCheck(ctx, conversationID, missingCount, newStartTime, isReverse, viewType, messageListCallback)
missingMessages, err := c.fetchMessagesWithGapCheck(ctx, conversationID, missingCount, newStartTime, newStartClientMsgID, isReverse, viewType, messageListCallback)
if err != nil {
return nil, err
}
Expand Down
33 changes: 29 additions & 4 deletions internal/conversation_msg/message_check.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"github.com/openimsdk/openim-sdk-core/v3/pkg/constant"
"github.com/openimsdk/openim-sdk-core/v3/pkg/db/model_struct"
sdk "github.com/openimsdk/openim-sdk-core/v3/pkg/sdk_params_callback"
"github.com/openimsdk/openim-sdk-core/v3/pkg/utils"
"github.com/openimsdk/protocol/msg"
"github.com/openimsdk/tools/utils/datautil"

Expand Down Expand Up @@ -234,7 +235,7 @@ func (c *Conversation) fetchAndMergeMissingMessages(ctx context.Context, convers
return
}
if v, ok := getSeqMessageResp.Msgs[conversationID]; ok {
c.pullMessageIntoTable(ctx, getSeqMessageResp.Msgs)
c.pullMessageIntoTable(ctx, getSeqMessageResp.Msgs, list)
log.ZDebug(ctx, "syncMsgFromServerSplit pull msg success",
"conversationID", conversationID, "count", count, "len", len(*list), "msgLen", len(v.Msgs))
if v.IsEnd {
Expand Down Expand Up @@ -325,7 +326,10 @@ func mergeSortedArrays(arr1, arr2 []*model_struct.LocalChatLog, n int, isDescend
i, j := 0, 0

for i < len1 && j < len2 && len(result) < n {
if (isDescending && arr1[i].SendTime >= arr2[j].SendTime) || (!isDescending && arr1[i].SendTime <= arr2[j].SendTime) {
//In descending order, when pulling forward, sort by sendTime. If sendTime is the same, sort by seq.
//In ascending order, when pulling backward, sort by sendTime. If sendTime is the same, sort by seq.
if (isDescending && (arr1[i].SendTime > arr2[j].SendTime || (arr1[i].SendTime == arr2[j].SendTime && arr1[i].Seq > arr2[j].Seq))) ||
(!isDescending && (arr1[i].SendTime < arr2[j].SendTime || (arr1[i].SendTime == arr2[j].SendTime && arr1[i].Seq < arr2[j].Seq))) {
result = append(result, arr1[i])
i++
} else {
Expand Down Expand Up @@ -369,7 +373,10 @@ func (c *Conversation) handleExceptionMessages(ctx context.Context, existingMess
if message.Status == constant.MsgStatusHasDeleted {
// If ClientMsgID is empty, it's a placeholder for seq gap
if message.ClientMsgID == "" {
prefix = "[SEQ_GAP]" // Placeholder for sequence gap
// Gap messages are typically caused by server downtime or prolonged periods of inactivity.
// These messages usually lack a message ID, so a message ID needs to be generated to prevent primary key conflicts.
message.ClientMsgID = utils.GetMsgID(c.loginUserID)
prefix = "[SEQ_GAP_+" + utils.Int64ToString(message.Seq) + "]" // Placeholder for sequence gap
} else {
prefix = "[DELETED]" // Mark as a deleted message
}
Expand All @@ -395,7 +402,7 @@ func (c *Conversation) handleExceptionMessages(ctx context.Context, existingMess
message.ClientMsgID = prefix + message.ClientMsgID
}

func (c *Conversation) pullMessageIntoTable(ctx context.Context, pullMsgData map[string]*sdkws.PullMsgs) {
func (c *Conversation) pullMessageIntoTable(ctx context.Context, pullMsgData map[string]*sdkws.PullMsgs, list *[]*model_struct.LocalChatLog) {
insertMsg := make(map[string][]*model_struct.LocalChatLog, 20)
updateMsg := make(map[string][]*model_struct.LocalChatLog, 30)
var insertMessage, selfInsertMessage, othersInsertMessage []*model_struct.LocalChatLog
Expand All @@ -419,6 +426,7 @@ func (c *Conversation) pullMessageIntoTable(ctx context.Context, pullMsgData map
msg := MsgDataToLocalChatLog(v)
if v.Status == constant.MsgStatusHasDeleted {
c.handleExceptionMessages(ctx, nil, msg)
v.ClientMsgID = msg.ClientMsgID
exceptionMsg = append(exceptionMsg, msg)
insertMessage = append(insertMessage, msg)
continue
Expand All @@ -429,6 +437,8 @@ func (c *Conversation) pullMessageIntoTable(ctx context.Context, pullMsgData map
if exists {
log.ZDebug(ctx, "have message", "msg", msg)
if existingMsg.Seq == 0 {
//If the message sent by the user hasn't synchronized the seq to the local storage in time,
//during the next sync, there will be local messages with seq as 0. These messages need to be updated with the correct seq and deduplicated.
updateMessage = append(updateMessage, msg)

} else {
Expand Down Expand Up @@ -466,6 +476,21 @@ func (c *Conversation) pullMessageIntoTable(ctx context.Context, pullMsgData map
if err6 := c.batchUpdateMessageList(ctx, updateMsg); err6 != nil {
log.ZError(ctx, "sync seq normal message err :", err6)
}
if len(updateMessage) > 0 {
updateMessageMap := datautil.SliceToMap(updateMessage, func(message *model_struct.LocalChatLog) string {
return message.ClientMsgID
})

filteredList := make([]*model_struct.LocalChatLog, 0, len(*list))
for _, v := range *list {
if _, ok := updateMessageMap[v.ClientMsgID]; !ok {
filteredList = append(filteredList, v)
}
}

*list = filteredList
}

timeNow = time.Now()
//Normal message storage
_ = c.batchInsertMessageList(ctx, insertMsg)
Expand Down
104 changes: 97 additions & 7 deletions internal/conversation_msg/message_check_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,26 +110,109 @@ func TestMergeSortedArrays(t *testing.T) {
{SendTime: 7, Content: "Message 7"},
},
},
{
// Test merging a descending array and an ascending array
arr1: []*model_struct.LocalChatLog{
{SendTime: 0, Content: "Message 54", Seq: 54},
{SendTime: 0, Content: "Message 53", Seq: 53},
{SendTime: 0, Content: "Message 52", Seq: 52},
{SendTime: 0, Content: "Message 4", Seq: 4},
},
arr2: []*model_struct.LocalChatLog{
{SendTime: 0, Content: "Message 51", Seq: 51},
{SendTime: 0, Content: "Message 50", Seq: 50},
{SendTime: 0, Content: "Message 49", Seq: 49},
{SendTime: 0, Content: "Message 48", Seq: 48},
{SendTime: 0, Content: "Message 47", Seq: 47},
{SendTime: 0, Content: "Message 46", Seq: 46},
{SendTime: 0, Content: "Message 45", Seq: 45},
{SendTime: 0, Content: "Message 44", Seq: 44},
{SendTime: 0, Content: "Message 43", Seq: 43},
{SendTime: 0, Content: "Message 42", Seq: 42},
{SendTime: 0, Content: "Message 41", Seq: 41},
{SendTime: 0, Content: "Message 40", Seq: 40},
{SendTime: 0, Content: "Message 39", Seq: 39},
{SendTime: 0, Content: "Message 38", Seq: 38},
{SendTime: 0, Content: "Message 37", Seq: 37},
{SendTime: 0, Content: "Message 36", Seq: 36},
{SendTime: 0, Content: "Message 35", Seq: 35},
{SendTime: 0, Content: "Message 34", Seq: 34},
{SendTime: 0, Content: "Message 33", Seq: 33},
{SendTime: 0, Content: "Message 32", Seq: 32},
{SendTime: 0, Content: "Message 31", Seq: 31},
{SendTime: 0, Content: "Message 30", Seq: 30},
{SendTime: 0, Content: "Message 29", Seq: 29},
{SendTime: 0, Content: "Message 28", Seq: 28},
{SendTime: 0, Content: "Message 27", Seq: 27},
{SendTime: 0, Content: "Message 26", Seq: 26},
{SendTime: 0, Content: "Message 25", Seq: 25},
{SendTime: 0, Content: "Message 24", Seq: 24},
{SendTime: 0, Content: "Message 23", Seq: 23},
{SendTime: 0, Content: "Message 22", Seq: 22},
{SendTime: 0, Content: "Message 21", Seq: 21},
{SendTime: 0, Content: "Message 20", Seq: 20},
{SendTime: 0, Content: "Message 19", Seq: 19},
{SendTime: 0, Content: "Message 18", Seq: 18},
{SendTime: 0, Content: "Message 17", Seq: 17},
{SendTime: 0, Content: "Message 16", Seq: 16},
{SendTime: 0, Content: "Message 15", Seq: 15},
{SendTime: 0, Content: "Message 14", Seq: 14},
{SendTime: 0, Content: "Message 13", Seq: 13},
{SendTime: 0, Content: "Message 12", Seq: 12},
{SendTime: 0, Content: "Message 11", Seq: 11},
{SendTime: 0, Content: "Message 10", Seq: 10},
{SendTime: 0, Content: "Message 9", Seq: 9},
{SendTime: 0, Content: "Message 8", Seq: 8},
{SendTime: 0, Content: "Message 7", Seq: 7},
{SendTime: 0, Content: "Message 6", Seq: 6},
{SendTime: 0, Content: "Message 5", Seq: 5},
},
n: 20, // Limit result to first 5 elements
isDescending: true,
// Expected result: merged in descending order
expected: []*model_struct.LocalChatLog{
{SendTime: 0, Content: "Message 54", Seq: 54},
{SendTime: 0, Content: "Message 53", Seq: 53},
{SendTime: 0, Content: "Message 52", Seq: 52},
{SendTime: 0, Content: "Message 51", Seq: 51},
{SendTime: 0, Content: "Message 50", Seq: 50},
{SendTime: 0, Content: "Message 49", Seq: 49},
{SendTime: 0, Content: "Message 48", Seq: 48},
{SendTime: 0, Content: "Message 47", Seq: 47},
{SendTime: 0, Content: "Message 46", Seq: 46},
{SendTime: 0, Content: "Message 45", Seq: 45},
{SendTime: 0, Content: "Message 44", Seq: 44},
{SendTime: 0, Content: "Message 43", Seq: 43},
{SendTime: 0, Content: "Message 42", Seq: 42},
{SendTime: 0, Content: "Message 41", Seq: 41},
{SendTime: 0, Content: "Message 40", Seq: 40},
{SendTime: 0, Content: "Message 39", Seq: 39},
{SendTime: 0, Content: "Message 38", Seq: 38},
{SendTime: 0, Content: "Message 37", Seq: 37},
{SendTime: 0, Content: "Message 36", Seq: 36},
{SendTime: 0, Content: "Message 35", Seq: 35},
},
},
}

for _, tt := range tests {
result := mergeSortedArrays(tt.arr1, tt.arr2, tt.n, tt.isDescending)
if !reflect.DeepEqual(result, tt.expected) {
t.Errorf(
"mergeSortedArrays(%v, %v, %d) = %v; want %v",
extractSendTimes(tt.arr1),
extractSendTimes(tt.arr2),
extractSendSeqs(tt.arr1),
extractSendSeqs(tt.arr2),
tt.n,
extractSendTimes(result),
extractSendTimes(tt.expected),
extractSendSeqs(result),
extractSendSeqs(tt.expected),
)
} else {
fmt.Printf(
"PASS: mergeSortedArrays(%v, %v, %d) = %v\n",
extractSendTimes(tt.arr1),
extractSendTimes(tt.arr2),
extractSendSeqs(tt.arr1),
extractSendSeqs(tt.arr2),
tt.n,
extractSendTimes(result),
extractSendSeqs(result),
)
}
}
Expand Down Expand Up @@ -166,3 +249,10 @@ func extractSendTimes(arr []*model_struct.LocalChatLog) []int64 {
}
return sendTimes
}
func extractSendSeqs(arr []*model_struct.LocalChatLog) []int64 {
sendTimes := make([]int64, len(arr))
for i, log := range arr {
sendTimes[i] = log.Seq
}
return sendTimes
}
2 changes: 1 addition & 1 deletion internal/conversation_msg/revoke.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ func (c *Conversation) revokeMessage(ctx context.Context, tips *sdkws.RevokeMsgT
log.ZDebug(ctx, "latestMsg", "latestMsg", &latestMsg, "seq", tips.Seq)
if latestMsg.Seq <= tips.Seq {
var newLatestMsg sdk_struct.MsgStruct
msgs, err := c.db.GetMessageList(ctx, tips.ConversationID, 1, 0, false)
msgs, err := c.db.GetMessageList(ctx, tips.ConversationID, 1, 0, "", false)
if err != nil || len(msgs) == 0 {
log.ZError(ctx, "GetMessageListNoTime failed", err, "tips", &tips)
return errs.Wrap(err)
Expand Down
14 changes: 7 additions & 7 deletions pkg/db/chat_log_model.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ func (d *DataBase) UpdateMessageTimeAndStatus(ctx context.Context, conversationI
Updates(model_struct.LocalChatLog{Status: status, SendTime: sendTime, ServerMsgID: serverMsgID}).Error, "UpdateMessageStatusBySourceID failed")
}

func (d *DataBase) GetMessageList(ctx context.Context, conversationID string, count int, startTime int64, isReverse bool) (result []*model_struct.LocalChatLog, err error) {
func (d *DataBase) GetMessageList(ctx context.Context, conversationID string, count int, startTime int64, startClientMsgID string, isReverse bool) (result []*model_struct.LocalChatLog, err error) {
if err = d.initChatLog(ctx, conversationID); err != nil {
log.ZWarn(ctx, "initChatLog err", err)
return nil, err
Expand All @@ -164,15 +164,15 @@ func (d *DataBase) GetMessageList(ctx context.Context, conversationID string, co
defer d.mRWMutex.RUnlock()
var condition, timeOrder, timeSymbol string
if isReverse {
timeOrder = "send_time ASC"
timeSymbol = ">"
timeOrder = "send_time ASC,seq ASC"
timeSymbol = ">="
} else {
timeOrder = "send_time DESC"
timeSymbol = "<"
timeOrder = "send_time DESC,seq DESC"
timeSymbol = "<="
}
if startTime > 0 {
condition = "send_time " + timeSymbol + " ?"
err = errs.WrapMsg(d.conn.WithContext(ctx).Table(utils.GetTableName(conversationID)).Where(condition, startTime).
condition = "send_time " + timeSymbol + " ? AND client_msg_id != ?"
err = errs.WrapMsg(d.conn.WithContext(ctx).Table(utils.GetTableName(conversationID)).Where(condition, startTime, startClientMsgID).
Order(timeOrder).Offset(0).Limit(count).Find(&result).Error, "GetMessageList failed")
if err != nil {
return nil, err
Expand Down
2 changes: 1 addition & 1 deletion pkg/db/db_interface/databse.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ type MessageModel interface {
UpdateMessage(ctx context.Context, conversationID string, c *model_struct.LocalChatLog) error
UpdateMessageBySeq(ctx context.Context, conversationID string, c *model_struct.LocalChatLog) error
UpdateMessageTimeAndStatus(ctx context.Context, conversationID, clientMsgID string, serverMsgID string, sendTime int64, status int32) error
GetMessageList(ctx context.Context, conversationID string, count int, startTime int64, isReverse bool) (result []*model_struct.LocalChatLog, err error)
GetMessageList(ctx context.Context, conversationID string, count int, startTime int64, startClientMsgID string, isReverse bool) (result []*model_struct.LocalChatLog, err error)
MarkConversationMessageAsReadDB(ctx context.Context, conversationID string, msgIDs []string) (rowsAffected int64, err error)
MarkConversationMessageAsReadBySeqs(ctx context.Context, conversationID string, seqs []int64) (rowsAffected int64, err error)
GetUnreadMessage(ctx context.Context, conversationID string) (result []*model_struct.LocalChatLog, err error)
Expand Down
4 changes: 2 additions & 2 deletions wasm/indexdb/chat_log_model.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,8 +126,8 @@ func (i *LocalChatLogs) UpdateMessageTimeAndStatus(ctx context.Context, conversa
}

// GetMessageList retrieves a list of messages from the local chat log.
func (i *LocalChatLogs) GetMessageList(ctx context.Context, conversationID string, count int, startTime int64, isReverse bool) (result []*model_struct.LocalChatLog, err error) {
msgList, err := exec.Exec(conversationID, count, startTime, isReverse, i.loginUserID)
func (i *LocalChatLogs) GetMessageList(ctx context.Context, conversationID string, count int, startTime int64, startClientMsgID string, isReverse bool) (result []*model_struct.LocalChatLog, err error) {
msgList, err := exec.Exec(conversationID, count, startTime, startClientMsgID, isReverse, i.loginUserID)
if err != nil {
return nil, err
} else {
Expand Down

0 comments on commit 8e5f68c

Please sign in to comment.