Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add a function to quickly retrieve the context messages for a given message. #827

Merged
merged 4 commits into from
Jan 3, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ require golang.org/x/net v0.29.0 // indirect

require (
github.com/google/go-cmp v0.6.0
github.com/openimsdk/protocol v0.0.72-alpha.63
github.com/openimsdk/protocol v0.0.72-alpha.70
github.com/openimsdk/tools v0.0.50-alpha.21
github.com/patrickmn/go-cache v2.1.0+incompatible
golang.org/x/image v0.15.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,8 @@ github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd h1:TRLaZ9cD/w
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9Gz0M=
github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk=
github.com/openimsdk/protocol v0.0.72-alpha.63 h1:IyPBibEvwBtTmD8DSrlqcekfEXe74k4+KeeHsgdhGh0=
github.com/openimsdk/protocol v0.0.72-alpha.63/go.mod h1:Iet+piS/jaS+kWWyj6EEr36mk4ISzIRYjoMSVA4dq2M=
github.com/openimsdk/protocol v0.0.72-alpha.70 h1:j7vB81+rTthijRda2b8tlli9oWvPxr4yXHwZ8nPZIBQ=
github.com/openimsdk/protocol v0.0.72-alpha.70/go.mod h1:Iet+piS/jaS+kWWyj6EEr36mk4ISzIRYjoMSVA4dq2M=
github.com/openimsdk/tools v0.0.50-alpha.21 h1:ZKgSFkiBjz6KcNZlNwvrSoUYJ7K5Flan8wHuRBH3VqY=
github.com/openimsdk/tools v0.0.50-alpha.21/go.mod h1:h1cYmfyaVtgFbKmb1Cfsl8XwUOMTt8ubVUQrdGtsUh4=
github.com/patrickmn/go-cache v2.1.0+incompatible h1:HRMgzkcYKYpi3C8ajMPV8OFXaaRUnok+kx1WdO15EQc=
Expand Down
59 changes: 59 additions & 0 deletions internal/conversation_msg/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"sync"
"time"

"github.com/openimsdk/openim-sdk-core/v3/pkg/cache"
pconstant "github.com/openimsdk/protocol/constant"

"github.com/openimsdk/tools/errs"
Expand All @@ -22,6 +23,7 @@ import (
"github.com/openimsdk/openim-sdk-core/v3/pkg/content_type"
"github.com/openimsdk/openim-sdk-core/v3/pkg/db/model_struct"
"github.com/openimsdk/openim-sdk-core/v3/pkg/sdk_params_callback"
sdk "github.com/openimsdk/openim-sdk-core/v3/pkg/sdk_params_callback"
"github.com/openimsdk/openim-sdk-core/v3/pkg/sdkerrs"
"github.com/openimsdk/openim-sdk-core/v3/pkg/server_api_params"
"github.com/openimsdk/openim-sdk-core/v3/pkg/utils"
Expand Down Expand Up @@ -1014,3 +1016,60 @@ func (c *Conversation) SearchConversation(ctx context.Context, searchParam strin
// Return the list of conversations
return apiConversations, nil
}
func (c *Conversation) GetInputStates(ctx context.Context, conversationID string, userID string) ([]int32, error) {
return c.typing.GetInputStates(conversationID, userID), nil
}

func (c *Conversation) ChangeInputStates(ctx context.Context, conversationID string, focus bool) error {
return c.typing.ChangeInputStates(ctx, conversationID, focus)
}

func (c *Conversation) FetchSurroundingMessages(ctx context.Context, s *sdk_struct.MsgStruct, before int, after int) ([]*sdk_struct.MsgStruct, error) {
conversationID := utils.GetConversationIDByMsg(s)
var message *model_struct.LocalChatLog
message, err := c.db.GetMessage(ctx, conversationID, s.ClientMsgID)
if err == nil {
if message.Status >= constant.MsgStatusHasDeleted {
return nil, sdkerrs.ErrMsgHasDeleted
}
} else {
if s.Seq == 0 {
return nil, sdkerrs.ErrMsgHasDeleted
}
var messages []*model_struct.LocalChatLog
c.fetchAndMergeMissingMessages(ctx, conversationID, []int64{s.Seq}, false, 1, 0, &messages, &sdk.GetAdvancedHistoryMessageListCallback{})
if len(messages) < 1 {
return nil, sdkerrs.ErrMsgHasDeleted
}
message = messages[0]
}

result := make([]*sdk_struct.MsgStruct, 0, before+after+1)
if before > 0 {
req := sdk.GetAdvancedHistoryMessageListParams{
ConversationID: conversationID,
Count: before,
StartClientMsgID: s.ClientMsgID,
ViewType: cache.ViewSearch,
}
val, err := c.getAdvancedHistoryMessageList(ctx, req, false)
if err != nil {
return nil, err
}
result = append(result, val.MessageList...)
}
result = append(result, LocalChatLogToMsgStruct(message))
if after > 0 {
req := sdk.GetAdvancedHistoryMessageListParams{
ConversationID: conversationID,
Count: after,
StartClientMsgID: s.ClientMsgID,
}
val, err := c.getAdvancedHistoryMessageList(ctx, req, true)
if err != nil {
return nil, err
}
result = append(result, val.MessageList...)
}
return result, nil
}
22 changes: 11 additions & 11 deletions internal/conversation_msg/conversation.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,12 +65,12 @@ func (c *Conversation) getAdvancedHistoryMessageList(ctx context.Context, req sd
startTime = m.SendTime
} else {
// Clear both maps when the user enters the conversation
c.messagePullForwardEndSeqMap.Delete(conversationID)
c.messagePullReverseEndSeqMap.Delete(conversationID)
c.messagePullForwardEndSeqMap.Delete(conversationID, req.ViewType)
c.messagePullReverseEndSeqMap.Delete(conversationID, req.ViewType)
}
log.ZDebug(ctx, "Assembly conversation parameters", "cost time", time.Since(t), "conversationID",
conversationID, "startTime:", startTime, "count:", req.Count, "startTime", startTime)
list, err := c.fetchMessagesWithGapCheck(ctx, conversationID, req.Count, startTime, isReverse, &messageListCallback)
list, err := c.fetchMessagesWithGapCheck(ctx, conversationID, req.Count, startTime, isReverse, req.ViewType, &messageListCallback)
if err != nil {
return nil, err
}
Expand All @@ -91,7 +91,7 @@ func (c *Conversation) getAdvancedHistoryMessageList(ctx context.Context, req sd
}

func (c *Conversation) fetchMessagesWithGapCheck(ctx context.Context, conversationID string,
count int, startTime int64, isReverse bool, messageListCallback *sdk.GetAdvancedHistoryMessageListCallback) ([]*model_struct.LocalChatLog, error) {
count int, startTime int64, isReverse bool, viewType int, messageListCallback *sdk.GetAdvancedHistoryMessageListCallback) ([]*model_struct.LocalChatLog, error) {

var list, validMessages []*model_struct.LocalChatLog

Expand Down Expand Up @@ -125,8 +125,8 @@ func (c *Conversation) fetchMessagesWithGapCheck(ctx context.Context, conversati
}
if !isReverse {
if thisEndSeq != 0 {
c.messagePullForwardEndSeqMap.StoreWithFunc(conversationID, thisEndSeq, func(key string, value int64) bool {
lastEndSeq, _ := c.messagePullForwardEndSeqMap.Load(key)
c.messagePullForwardEndSeqMap.StoreWithFunc(conversationID, viewType, thisEndSeq, func(key string, value int64) bool {
lastEndSeq, _ := c.messagePullForwardEndSeqMap.Load(key, viewType)
if value < lastEndSeq || lastEndSeq == 0 {
log.ZDebug(ctx, "update the end sequence of the message", "lastEndSeq", lastEndSeq, "thisEndSeq", value)
return true
Expand All @@ -138,8 +138,8 @@ func (c *Conversation) fetchMessagesWithGapCheck(ctx context.Context, conversati
}
} else {
if thisEndSeq != 0 {
c.messagePullReverseEndSeqMap.StoreWithFunc(conversationID, thisEndSeq, func(key string, value int64) bool {
lastEndSeq, _ := c.messagePullReverseEndSeqMap.Load(key)
c.messagePullReverseEndSeqMap.StoreWithFunc(conversationID, viewType, thisEndSeq, func(key string, value int64) bool {
lastEndSeq, _ := c.messagePullReverseEndSeqMap.Load(key, viewType)
if value > lastEndSeq || lastEndSeq == 0 {
log.ZDebug(ctx, "update the end sequence of the message", "lastEndSeq", lastEndSeq, "thisEndSeq", value)
return true
Expand Down Expand Up @@ -175,10 +175,10 @@ func (c *Conversation) fetchMessagesWithGapCheck(ctx context.Context, conversati
log.ZDebug(ctx, "internal continuity check", "cost time", time.Since(t), "thisStartSeq", thisStartSeq)
t = time.Now()
c.validateAndFillInterBlockGaps(ctx, thisStartSeq, conversationID,
isReverse, count, startTime, &list, messageListCallback)
isReverse, viewType, count, startTime, &list, messageListCallback)
log.ZDebug(ctx, "between continuity check", "cost time", time.Since(t), "thisStartSeq", thisStartSeq)
t = time.Now()
c.validateAndFillEndBlockContinuity(ctx, conversationID, isReverse,
c.validateAndFillEndBlockContinuity(ctx, conversationID, isReverse, viewType,
count, startTime, &list, messageListCallback)
log.ZDebug(ctx, "end continuity check", "cost time", time.Since(t))
// If the number of valid messages retrieved is less than the count,
Expand All @@ -188,7 +188,7 @@ func (c *Conversation) fetchMessagesWithGapCheck(ctx context.Context, conversati
newStartTime := getNewStartTime(list)
log.ZDebug(ctx, "fetch more messages", "missingCount", missingCount, "conversationID",
conversationID, "newStartTime", newStartTime)
missingMessages, err := c.fetchMessagesWithGapCheck(ctx, conversationID, missingCount, newStartTime, isReverse, messageListCallback)
missingMessages, err := c.fetchMessagesWithGapCheck(ctx, conversationID, missingCount, newStartTime, isReverse, viewType, messageListCallback)
if err != nil {
return nil, err
}
Expand Down
62 changes: 4 additions & 58 deletions internal/conversation_msg/conversation_msg.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,6 @@ import (
"math"
"sync"

sdk "github.com/openimsdk/openim-sdk-core/v3/pkg/sdk_params_callback"

"github.com/openimsdk/openim-sdk-core/v3/pkg/api"
"github.com/openimsdk/openim-sdk-core/v3/pkg/cache"
"github.com/openimsdk/tools/utils/stringutil"
Expand Down Expand Up @@ -67,8 +65,8 @@ type Conversation struct {
file *file.File
cache *cache.Cache[string, *model_struct.LocalConversation]
maxSeqRecorder MaxSeqRecorder
messagePullForwardEndSeqMap *cache.Cache[string, int64]
messagePullReverseEndSeqMap *cache.Cache[string, int64]
messagePullForwardEndSeqMap *cache.ConversationSeqContextCache
messagePullReverseEndSeqMap *cache.ConversationSeqContextCache
IsExternalExtensions bool
msgOffset int
progress int
Expand Down Expand Up @@ -108,8 +106,8 @@ func NewConversation(ctx context.Context, longConnMgr *interaction.LongConnMgr,
file: file,
IsExternalExtensions: info.IsExternalExtensions(),
maxSeqRecorder: NewMaxSeqRecorder(),
messagePullForwardEndSeqMap: cache.NewCache[string, int64](),
messagePullReverseEndSeqMap: cache.NewCache[string, int64](),
messagePullForwardEndSeqMap: cache.NewConversationSeqContextCache(),
messagePullReverseEndSeqMap: cache.NewConversationSeqContextCache(),
msgOffset: 0,
progress: 0,
}
Expand Down Expand Up @@ -906,55 +904,3 @@ func (c *Conversation) getUserNameAndFaceURL(ctx context.Context, userID string)
}
return userInfo.FaceURL, userInfo.Nickname, nil
}

func (c *Conversation) GetInputStates(ctx context.Context, conversationID string, userID string) ([]int32, error) {
return c.typing.GetInputStates(conversationID, userID), nil
}

func (c *Conversation) ChangeInputStates(ctx context.Context, conversationID string, focus bool) error {
return c.typing.ChangeInputStates(ctx, conversationID, focus)
}

func (c *Conversation) FetchSurroundingMessages(ctx context.Context, conversationID string, seq int64, before int64, after int64) ([]*sdk_struct.MsgStruct, error) {
c.fetchAndMergeMissingMessages(ctx, conversationID, []int64{seq}, false, 0, 0, &[]*model_struct.LocalChatLog{}, &sdk.GetAdvancedHistoryMessageListCallback{})
res, err := c.db.GetMessagesBySeqs(ctx, conversationID, []int64{seq})
if err != nil {
return nil, err
}
if len(res) == 0 {
return []*sdk_struct.MsgStruct{}, nil
}
//_, msgList := c.LocalChatLog2MsgStruct []*model_struct.LocalChatLog{res[0]})
//if len(msgList) == 0 {
// return []*sdk_struct.MsgStruct{}, nil
//}
//msg := msgList[0]
result := make([]*sdk_struct.MsgStruct, 0, before+after+1)
//if before > 0 {
// req := sdk.GetAdvancedHistoryMessageListParams{
// ConversationID: conversationID,
// Count: int(before),
// StartClientMsgID: msg.ClientMsgID,
// }
// val, err := c.getAdvancedHistoryMessageList(ctx, req, false)
// if err != nil {
// return nil, err
// }
// result = append(result, val.MessageList...)
//}
//result = append(result, msg)
//if after > 0 {
// req := sdk.GetAdvancedHistoryMessageListParams{
// ConversationID: conversationID,
// Count: int(after),
// StartClientMsgID: msg.ClientMsgID,
// }
// val, err := c.getAdvancedHistoryMessageList(ctx, req, true)
// if err != nil {
// return nil, err
// }
// result = append(result, val.MessageList...)
//}
//sort.Sort(sdk_struct.NewMsgList(result))
return result, nil
}
18 changes: 9 additions & 9 deletions internal/conversation_msg/message_check.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,17 +43,17 @@ func (c *Conversation) validateAndFillInternalGaps(ctx context.Context, conversa
// validateAndFillInterBlockGaps checks for continuity between blocks of messages. If a gap is identified, it retrieves the missing messages
// to bridge the gap. The function returns a boolean indicating whether the blocks are continuous.
func (c *Conversation) validateAndFillInterBlockGaps(ctx context.Context, thisStartSeq int64, conversationID string,
isReverse bool, count int, startTime int64, list *[]*model_struct.LocalChatLog, messageListCallback *sdk.GetAdvancedHistoryMessageListCallback) {
isReverse bool, viewType, count int, startTime int64, list *[]*model_struct.LocalChatLog, messageListCallback *sdk.GetAdvancedHistoryMessageListCallback) {

var lastEndSeq, startSeq, endSeq int64
var isLostSeq bool
if isReverse {
lastEndSeq, _ = c.messagePullReverseEndSeqMap.Load(conversationID)
lastEndSeq, _ = c.messagePullReverseEndSeqMap.Load(conversationID, viewType)
isLostSeq = lastEndSeq+1 != thisStartSeq
startSeq = lastEndSeq + 1
endSeq = thisStartSeq - 1
} else {
lastEndSeq, _ = c.messagePullForwardEndSeqMap.Load(conversationID)
lastEndSeq, _ = c.messagePullForwardEndSeqMap.Load(conversationID, viewType)
isLostSeq = thisStartSeq+1 != lastEndSeq
startSeq = thisStartSeq + 1
endSeq = lastEndSeq - 1
Expand All @@ -73,15 +73,15 @@ func (c *Conversation) validateAndFillInterBlockGaps(ctx context.Context, thisSt
// internal and inter-block continuity checks but contains fewer messages than `count`, this function verifies if the end
// of the message history has been reached. If not, it attempts to retrieve any missing messages to ensure continuity.
func (c *Conversation) validateAndFillEndBlockContinuity(ctx context.Context, conversationID string,
isReverse bool, count int, startTime int64, list *[]*model_struct.LocalChatLog, messageListCallback *sdk.GetAdvancedHistoryMessageListCallback) {
isShouldFetchMessage, lostSeqList := c.checkEndBlock(ctx, conversationID, isReverse, count, list, messageListCallback)
isReverse bool, viewType, count int, startTime int64, list *[]*model_struct.LocalChatLog, messageListCallback *sdk.GetAdvancedHistoryMessageListCallback) {
isShouldFetchMessage, lostSeqList := c.checkEndBlock(ctx, conversationID, isReverse, viewType, count, list, messageListCallback)
if isShouldFetchMessage {
c.fetchAndMergeMissingMessages(ctx, conversationID, lostSeqList, isReverse, count, startTime, list, messageListCallback)
_, _ = c.checkEndBlock(ctx, conversationID, isReverse, count, list, messageListCallback)
_, _ = c.checkEndBlock(ctx, conversationID, isReverse, viewType, count, list, messageListCallback)
}

}
func (c *Conversation) checkEndBlock(ctx context.Context, conversationID string, isReverse bool, count int,
func (c *Conversation) checkEndBlock(ctx context.Context, conversationID string, isReverse bool, viewType, count int,
list *[]*model_struct.LocalChatLog, messageListCallback *sdk.GetAdvancedHistoryMessageListCallback) (isShouldFetchMessage bool, seqList []int64) {
// Perform an end-of-block check if the retrieved message count is less than requested
if len(*list) < count {
Expand All @@ -94,7 +94,7 @@ func (c *Conversation) checkEndBlock(ctx context.Context, conversationID string,
if maxSeq >= currentMaxSeq {
messageListCallback.IsEnd = true
} else {
lastEndSeq, _ := c.messagePullReverseEndSeqMap.Load(conversationID)
lastEndSeq, _ := c.messagePullReverseEndSeqMap.Load(conversationID, viewType)
log.ZDebug(ctx, "validateAndFillEndBlockContinuity", "lastEndSeq", lastEndSeq, "conversationID", conversationID)
// If `maxSeq` is zero and `lastEndSeq` is at the maximum server sequence, this batch is fully local
if maxSeq == 0 && lastEndSeq >= currentMaxSeq { // All messages in this batch are local messages,
Expand Down Expand Up @@ -124,7 +124,7 @@ func (c *Conversation) checkEndBlock(ctx context.Context, conversationID string,
if minSeq <= userCanPullMinSeq {
messageListCallback.IsEnd = true
} else {
lastMinSeq, _ := c.messagePullForwardEndSeqMap.Load(conversationID)
lastMinSeq, _ := c.messagePullForwardEndSeqMap.Load(conversationID, viewType)
log.ZDebug(ctx, "validateAndFillEndBlockContinuity", "lastMinSeq", lastMinSeq, "conversationID", conversationID)
// If `minSeq` is zero and `lastMinSeq` is at the minimum server sequence, this batch is fully local
if minSeq == 0 && lastMinSeq <= userCanPullMinSeq { // All messages in this batch are local messages,
Expand Down
4 changes: 2 additions & 2 deletions internal/conversation_msg/read_drawing.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (
"github.com/openimsdk/openim-sdk-core/v3/pkg/common"
"github.com/openimsdk/openim-sdk-core/v3/pkg/constant"
"github.com/openimsdk/openim-sdk-core/v3/pkg/db/model_struct"
"github.com/openimsdk/openim-sdk-core/v3/pkg/sdkerrs"
"github.com/openimsdk/openim-sdk-core/v3/pkg/utils"
"github.com/openimsdk/openim-sdk-core/v3/sdk_struct"
"github.com/openimsdk/tools/errs"
Expand Down Expand Up @@ -52,7 +51,8 @@ func (c *Conversation) markConversationMessageAsRead(ctx context.Context, conver
return err
}
if conversation.UnreadCount == 0 {
return sdkerrs.ErrUnreadCount
log.ZWarn(ctx, "unread count is 0", nil, "conversationID", conversationID)
return nil
}
// get the maximum sequence number of messages in the table that are not sent by oneself
peerUserMaxSeq, err := c.db.GetConversationPeerNormalMsgSeq(ctx, conversationID)
Expand Down
2 changes: 2 additions & 0 deletions internal/interaction/long_conn_mgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -506,6 +506,8 @@ func (c *LongConnMgr) handleMessage(message []byte) error {
fallthrough
case constant.GetConvMaxReadSeq:
fallthrough
case constant.PullConvLastMessage:
fallthrough
case constant.SendMsg:
fallthrough
case constant.SendSignalMsg:
Expand Down
Loading
Loading