Skip to content

Commit 1481a2d

Browse files
FGadvancerwithchao
andauthored
fix: modify the historical message retrieval interface to address the message gap problem caused by server crashes or redis seq cache expired. (#857)
* fix: refine exception message handling to prevent duplicate messages in clients with poor network conditions. Signed-off-by: Gordon <[email protected]> * fix: primary key conflicts caused by empty messages occupying seq due to sequence gaps. Signed-off-by: Gordon <[email protected]> * fix: server downtime and abnormal message handling may lead to message duplication, and the history retrieval interface might miss messages when the timestamps are the same. Signed-off-by: Gordon <[email protected]> * fix: server downtime and abnormal message handling may lead to message duplication, and the history retrieval interface might miss messages when the timestamps are the same. Signed-off-by: Gordon <[email protected]> * fix: add random prefix to remove duplicate messages. Signed-off-by: Gordon <[email protected]> * style: update start message id. Signed-off-by: Gordon <[email protected]> * optimize the freeze caused by too many friends and group applications (#852) * feat: code adjustment * feat: Cmd2Value carry caller * feat: Cmd2Value carry caller * feat: Cmd2Value carry caller * feat: Cmd2Value carry caller * fix: SearchLocalMessages no such table * fix: optimize the freeze caused by too many friends and group applications * fix: GetConversationIDBySessionType returns a string with escape characters. Signed-off-by: Gordon <[email protected]> * fix: modify the historical message retrieval interface to address the message gap problem caused by server crashes or redis seq cache expired. Signed-off-by: Gordon <[email protected]> --------- Signed-off-by: Gordon <[email protected]> Co-authored-by: chao <[email protected]>
1 parent 792938a commit 1481a2d

File tree

6 files changed

+28
-20
lines changed

6 files changed

+28
-20
lines changed

internal/conversation_msg/conversation.go

Lines changed: 11 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ func (c *Conversation) getAdvancedHistoryMessageList(ctx context.Context, req sd
5757
var messageListCallback sdk.GetAdvancedHistoryMessageListCallback
5858
var conversationID string
5959
var startClientMsgID string
60-
var startTime int64
60+
var startTime, startSeq int64
6161
var err error
6262
var messageList sdk_struct.NewMsgList
6363
conversationID = req.ConversationID
@@ -68,6 +68,7 @@ func (c *Conversation) getAdvancedHistoryMessageList(ctx context.Context, req sd
6868
}
6969
startTime = m.SendTime
7070
startClientMsgID = req.StartClientMsgID
71+
startSeq = m.Seq
7172
err = c.handleEndSeq(ctx, req, isReverse, m)
7273
if err != nil {
7374
return nil, err
@@ -80,7 +81,7 @@ func (c *Conversation) getAdvancedHistoryMessageList(ctx context.Context, req sd
8081

8182
log.ZDebug(ctx, "Assembly conversation parameters", "cost time", time.Since(t), "conversationID",
8283
conversationID, "startTime:", startTime, "count:", req.Count)
83-
list, err := c.fetchMessagesWithGapCheck(ctx, conversationID, req.Count, startTime, startClientMsgID, isReverse, req.ViewType, &messageListCallback)
84+
list, err := c.fetchMessagesWithGapCheck(ctx, conversationID, req.Count, startTime, startSeq, startClientMsgID, isReverse, req.ViewType, &messageListCallback)
8485
if err != nil {
8586
return nil, err
8687
}
@@ -138,7 +139,7 @@ func (c *Conversation) handleEndSeq(ctx context.Context, req sdk.GetAdvancedHist
138139
}
139140

140141
func (c *Conversation) fetchMessagesWithGapCheck(ctx context.Context, conversationID string,
141-
count int, startTime int64, startClientMsgID string, isReverse bool, viewType int, messageListCallback *sdk.GetAdvancedHistoryMessageListCallback) ([]*model_struct.LocalChatLog, error) {
142+
count int, startTime, startSeq int64, startClientMsgID string, isReverse bool, viewType int, messageListCallback *sdk.GetAdvancedHistoryMessageListCallback) ([]*model_struct.LocalChatLog, error) {
142143

143144
var list, validMessages []*model_struct.LocalChatLog
144145

@@ -200,16 +201,16 @@ func (c *Conversation) fetchMessagesWithGapCheck(ctx context.Context, conversati
200201

201202
return count - validateMessageNum
202203
}
203-
getNewStartMessageInfo := func(messages []*model_struct.LocalChatLog) (int64, string) {
204+
getNewStartMessageInfo := func(messages []*model_struct.LocalChatLog) (int64, int64, string) {
204205
if len(messages) == 0 {
205-
return 0, ""
206+
return 0, 0, ""
206207
}
207208
// Returns the SendTime and ClientMsgID of the last element in the message list
208-
return messages[len(messages)-1].SendTime, messages[len(messages)-1].ClientMsgID
209+
return messages[len(messages)-1].SendTime, messages[len(messages)-1].Seq, messages[len(messages)-1].ClientMsgID
209210
}
210211

211212
t := time.Now()
212-
list, err := c.db.GetMessageList(ctx, conversationID, count, startTime, startClientMsgID, isReverse)
213+
list, err := c.db.GetMessageList(ctx, conversationID, count, startTime, startSeq, startClientMsgID, isReverse)
213214
log.ZDebug(ctx, "db get messageList", "cost time", time.Since(t), "len", len(list), "err",
214215
err, "conversationID", conversationID)
215216

@@ -232,10 +233,10 @@ func (c *Conversation) fetchMessagesWithGapCheck(ctx context.Context, conversati
232233
// continue fetching recursively until the valid messages are sufficient or all messages have been fetched.
233234
missingCount := shouldFetchMoreMessagesNum(list)
234235
if missingCount > 0 && !messageListCallback.IsEnd {
235-
newStartTime, newStartClientMsgID := getNewStartMessageInfo(list)
236+
newStartTime, newStartSeq, newStartClientMsgID := getNewStartMessageInfo(list)
236237
log.ZDebug(ctx, "fetch more messages", "missingCount", missingCount, "conversationID",
237-
conversationID, "newStartTime", newStartTime)
238-
missingMessages, err := c.fetchMessagesWithGapCheck(ctx, conversationID, missingCount, newStartTime, newStartClientMsgID, isReverse, viewType, messageListCallback)
238+
conversationID, "newStartTime", newStartTime, "newStartSeq", newStartSeq, "newStartClientMsgID", newStartClientMsgID)
239+
missingMessages, err := c.fetchMessagesWithGapCheck(ctx, conversationID, missingCount, newStartTime, newStartSeq, newStartClientMsgID, isReverse, viewType, messageListCallback)
239240
if err != nil {
240241
return nil, err
241242
}

internal/conversation_msg/revoke.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -108,7 +108,7 @@ func (c *Conversation) revokeMessage(ctx context.Context, tips *sdkws.RevokeMsgT
108108
log.ZDebug(ctx, "latestMsg", "latestMsg", &latestMsg, "seq", tips.Seq)
109109
if latestMsg.Seq <= tips.Seq {
110110
var newLatestMsg sdk_struct.MsgStruct
111-
msgs, err := c.db.GetMessageList(ctx, tips.ConversationID, 1, 0, "", false)
111+
msgs, err := c.db.GetMessageList(ctx, tips.ConversationID, 1, 0, 0, "", false)
112112
if err != nil || len(msgs) == 0 {
113113
log.ZError(ctx, "GetMessageListNoTime failed", err, "tips", &tips)
114114
return errs.Wrap(err)

pkg/db/chat_log_model.go

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -154,7 +154,7 @@ func (d *DataBase) UpdateMessageTimeAndStatus(ctx context.Context, conversationI
154154
Updates(model_struct.LocalChatLog{Status: status, SendTime: sendTime, ServerMsgID: serverMsgID}).Error, "UpdateMessageStatusBySourceID failed")
155155
}
156156

157-
func (d *DataBase) GetMessageList(ctx context.Context, conversationID string, count int, startTime int64, startClientMsgID string, isReverse bool) (result []*model_struct.LocalChatLog, err error) {
157+
func (d *DataBase) GetMessageList(ctx context.Context, conversationID string, count int, startTime, startSeq int64, startClientMsgID string, isReverse bool) (result []*model_struct.LocalChatLog, err error) {
158158
if err = d.initChatLog(ctx, conversationID); err != nil {
159159
log.ZWarn(ctx, "initChatLog err", err)
160160
return nil, err
@@ -164,14 +164,16 @@ func (d *DataBase) GetMessageList(ctx context.Context, conversationID string, co
164164
var condition, timeOrder, timeSymbol string
165165
if isReverse {
166166
timeOrder = "send_time ASC,seq ASC"
167-
timeSymbol = ">="
167+
timeSymbol = ">"
168168
} else {
169169
timeOrder = "send_time DESC,seq DESC"
170-
timeSymbol = "<="
170+
timeSymbol = "<"
171171
}
172172
if startTime > 0 {
173-
condition = "send_time " + timeSymbol + " ? AND client_msg_id != ?"
174-
err = errs.WrapMsg(d.conn.WithContext(ctx).Table(utils.GetTableName(conversationID)).Where(condition, startTime, startClientMsgID).
173+
condition = "send_time " + timeSymbol + " ? " +
174+
"OR (send_time = ? AND (seq " + timeSymbol + " ? OR (seq = 0 AND client_msg_id != ?)))"
175+
err = errs.WrapMsg(d.conn.WithContext(ctx).Table(utils.GetTableName(conversationID)).
176+
Where(condition, startTime, startTime, startSeq, startClientMsgID).
175177
Order(timeOrder).Offset(0).Limit(count).Find(&result).Error, "GetMessageList failed")
176178
if err != nil {
177179
return nil, err

pkg/db/db_interface/databse.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,7 @@ type MessageModel interface {
6767
UpdateMessage(ctx context.Context, conversationID string, c *model_struct.LocalChatLog) error
6868
UpdateMessageBySeq(ctx context.Context, conversationID string, c *model_struct.LocalChatLog) error
6969
UpdateMessageTimeAndStatus(ctx context.Context, conversationID, clientMsgID string, serverMsgID string, sendTime int64, status int32) error
70-
GetMessageList(ctx context.Context, conversationID string, count int, startTime int64, startClientMsgID string, isReverse bool) (result []*model_struct.LocalChatLog, err error)
70+
GetMessageList(ctx context.Context, conversationID string, count int, startTime, startSeq int64, startClientMsgID string, isReverse bool) (result []*model_struct.LocalChatLog, err error)
7171
MarkConversationMessageAsReadDB(ctx context.Context, conversationID string, msgIDs []string) (rowsAffected int64, err error)
7272
MarkConversationMessageAsReadBySeqs(ctx context.Context, conversationID string, seqs []int64) (rowsAffected int64, err error)
7373
GetUnreadMessage(ctx context.Context, conversationID string) (result []*model_struct.LocalChatLog, err error)

test/conversation_test.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -136,7 +136,12 @@ func Test_FindMessageList(t *testing.T) {
136136
}
137137

138138
func Test_GetAdvancedHistoryMessageList(t *testing.T) {
139-
msgs, err := open_im_sdk.UserForSDK.Conversation().GetAdvancedHistoryMessageList(ctx, sdk_params_callback.GetAdvancedHistoryMessageListParams{})
139+
msgs, err := open_im_sdk.UserForSDK.Conversation().GetAdvancedHistoryMessageList(ctx, sdk_params_callback.GetAdvancedHistoryMessageListParams{
140+
ConversationID: "si_5318543822_9511766539",
141+
StartClientMsgID: "",
142+
Count: 40,
143+
ViewType: 0,
144+
})
140145
if err != nil {
141146
t.Fatal(err)
142147
}

wasm/indexdb/chat_log_model.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -126,8 +126,8 @@ func (i *LocalChatLogs) UpdateMessageTimeAndStatus(ctx context.Context, conversa
126126
}
127127

128128
// GetMessageList retrieves a list of messages from the local chat log.
129-
func (i *LocalChatLogs) GetMessageList(ctx context.Context, conversationID string, count int, startTime int64, startClientMsgID string, isReverse bool) (result []*model_struct.LocalChatLog, err error) {
130-
msgList, err := exec.Exec(conversationID, count, startTime, startClientMsgID, isReverse, i.loginUserID)
129+
func (i *LocalChatLogs) GetMessageList(ctx context.Context, conversationID string, count int, startTime, startSeq int64, startClientMsgID string, isReverse bool) (result []*model_struct.LocalChatLog, err error) {
130+
msgList, err := exec.Exec(conversationID, count, startTime, startSeq, startClientMsgID, isReverse, i.loginUserID)
131131
if err != nil {
132132
return nil, err
133133
} else {

0 commit comments

Comments
 (0)