Skip to content

Commit f23f7d6

Browse files
authored
feat: add a function to quickly retrieve the context messages for a given message. (#827)
* fix: quote message change to revoke message when app from background to foreground and message status update. Signed-off-by: Gordon <[email protected]> * feat: add a function to quickly retrieve the context messages for a given message. Signed-off-by: Gordon <[email protected]> * refactor: the SDK interface using the pb protocol to replace json. Signed-off-by: Gordon <[email protected]> --------- Signed-off-by: Gordon <[email protected]>
1 parent 9270ddc commit f23f7d6

19 files changed

+191
-99
lines changed

go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ require golang.org/x/net v0.29.0 // indirect
1818

1919
require (
2020
github.com/google/go-cmp v0.6.0
21-
github.com/openimsdk/protocol v0.0.72-alpha.63
21+
github.com/openimsdk/protocol v0.0.72-alpha.70
2222
github.com/openimsdk/tools v0.0.50-alpha.21
2323
github.com/patrickmn/go-cache v2.1.0+incompatible
2424
golang.org/x/image v0.15.0

go.sum

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -66,8 +66,8 @@ github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd h1:TRLaZ9cD/w
6666
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
6767
github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9Gz0M=
6868
github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk=
69-
github.com/openimsdk/protocol v0.0.72-alpha.63 h1:IyPBibEvwBtTmD8DSrlqcekfEXe74k4+KeeHsgdhGh0=
70-
github.com/openimsdk/protocol v0.0.72-alpha.63/go.mod h1:Iet+piS/jaS+kWWyj6EEr36mk4ISzIRYjoMSVA4dq2M=
69+
github.com/openimsdk/protocol v0.0.72-alpha.70 h1:j7vB81+rTthijRda2b8tlli9oWvPxr4yXHwZ8nPZIBQ=
70+
github.com/openimsdk/protocol v0.0.72-alpha.70/go.mod h1:Iet+piS/jaS+kWWyj6EEr36mk4ISzIRYjoMSVA4dq2M=
7171
github.com/openimsdk/tools v0.0.50-alpha.21 h1:ZKgSFkiBjz6KcNZlNwvrSoUYJ7K5Flan8wHuRBH3VqY=
7272
github.com/openimsdk/tools v0.0.50-alpha.21/go.mod h1:h1cYmfyaVtgFbKmb1Cfsl8XwUOMTt8ubVUQrdGtsUh4=
7373
github.com/patrickmn/go-cache v2.1.0+incompatible h1:HRMgzkcYKYpi3C8ajMPV8OFXaaRUnok+kx1WdO15EQc=

internal/conversation_msg/api.go

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import (
1111
"sync"
1212
"time"
1313

14+
"github.com/openimsdk/openim-sdk-core/v3/pkg/cache"
1415
pconstant "github.com/openimsdk/protocol/constant"
1516

1617
"github.com/openimsdk/tools/errs"
@@ -22,6 +23,7 @@ import (
2223
"github.com/openimsdk/openim-sdk-core/v3/pkg/content_type"
2324
"github.com/openimsdk/openim-sdk-core/v3/pkg/db/model_struct"
2425
"github.com/openimsdk/openim-sdk-core/v3/pkg/sdk_params_callback"
26+
sdk "github.com/openimsdk/openim-sdk-core/v3/pkg/sdk_params_callback"
2527
"github.com/openimsdk/openim-sdk-core/v3/pkg/sdkerrs"
2628
"github.com/openimsdk/openim-sdk-core/v3/pkg/server_api_params"
2729
"github.com/openimsdk/openim-sdk-core/v3/pkg/utils"
@@ -1014,3 +1016,60 @@ func (c *Conversation) SearchConversation(ctx context.Context, searchParam strin
10141016
// Return the list of conversations
10151017
return apiConversations, nil
10161018
}
1019+
func (c *Conversation) GetInputStates(ctx context.Context, conversationID string, userID string) ([]int32, error) {
1020+
return c.typing.GetInputStates(conversationID, userID), nil
1021+
}
1022+
1023+
func (c *Conversation) ChangeInputStates(ctx context.Context, conversationID string, focus bool) error {
1024+
return c.typing.ChangeInputStates(ctx, conversationID, focus)
1025+
}
1026+
1027+
func (c *Conversation) FetchSurroundingMessages(ctx context.Context, s *sdk_struct.MsgStruct, before int, after int) ([]*sdk_struct.MsgStruct, error) {
1028+
conversationID := utils.GetConversationIDByMsg(s)
1029+
var message *model_struct.LocalChatLog
1030+
message, err := c.db.GetMessage(ctx, conversationID, s.ClientMsgID)
1031+
if err == nil {
1032+
if message.Status >= constant.MsgStatusHasDeleted {
1033+
return nil, sdkerrs.ErrMsgHasDeleted
1034+
}
1035+
} else {
1036+
if s.Seq == 0 {
1037+
return nil, sdkerrs.ErrMsgHasDeleted
1038+
}
1039+
var messages []*model_struct.LocalChatLog
1040+
c.fetchAndMergeMissingMessages(ctx, conversationID, []int64{s.Seq}, false, 1, 0, &messages, &sdk.GetAdvancedHistoryMessageListCallback{})
1041+
if len(messages) < 1 {
1042+
return nil, sdkerrs.ErrMsgHasDeleted
1043+
}
1044+
message = messages[0]
1045+
}
1046+
1047+
result := make([]*sdk_struct.MsgStruct, 0, before+after+1)
1048+
if before > 0 {
1049+
req := sdk.GetAdvancedHistoryMessageListParams{
1050+
ConversationID: conversationID,
1051+
Count: before,
1052+
StartClientMsgID: s.ClientMsgID,
1053+
ViewType: cache.ViewSearch,
1054+
}
1055+
val, err := c.getAdvancedHistoryMessageList(ctx, req, false)
1056+
if err != nil {
1057+
return nil, err
1058+
}
1059+
result = append(result, val.MessageList...)
1060+
}
1061+
result = append(result, LocalChatLogToMsgStruct(message))
1062+
if after > 0 {
1063+
req := sdk.GetAdvancedHistoryMessageListParams{
1064+
ConversationID: conversationID,
1065+
Count: after,
1066+
StartClientMsgID: s.ClientMsgID,
1067+
}
1068+
val, err := c.getAdvancedHistoryMessageList(ctx, req, true)
1069+
if err != nil {
1070+
return nil, err
1071+
}
1072+
result = append(result, val.MessageList...)
1073+
}
1074+
return result, nil
1075+
}

internal/conversation_msg/conversation.go

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -65,12 +65,12 @@ func (c *Conversation) getAdvancedHistoryMessageList(ctx context.Context, req sd
6565
startTime = m.SendTime
6666
} else {
6767
// Clear both maps when the user enters the conversation
68-
c.messagePullForwardEndSeqMap.Delete(conversationID)
69-
c.messagePullReverseEndSeqMap.Delete(conversationID)
68+
c.messagePullForwardEndSeqMap.Delete(conversationID, req.ViewType)
69+
c.messagePullReverseEndSeqMap.Delete(conversationID, req.ViewType)
7070
}
7171
log.ZDebug(ctx, "Assembly conversation parameters", "cost time", time.Since(t), "conversationID",
7272
conversationID, "startTime:", startTime, "count:", req.Count, "startTime", startTime)
73-
list, err := c.fetchMessagesWithGapCheck(ctx, conversationID, req.Count, startTime, isReverse, &messageListCallback)
73+
list, err := c.fetchMessagesWithGapCheck(ctx, conversationID, req.Count, startTime, isReverse, req.ViewType, &messageListCallback)
7474
if err != nil {
7575
return nil, err
7676
}
@@ -91,7 +91,7 @@ func (c *Conversation) getAdvancedHistoryMessageList(ctx context.Context, req sd
9191
}
9292

9393
func (c *Conversation) fetchMessagesWithGapCheck(ctx context.Context, conversationID string,
94-
count int, startTime int64, isReverse bool, messageListCallback *sdk.GetAdvancedHistoryMessageListCallback) ([]*model_struct.LocalChatLog, error) {
94+
count int, startTime int64, isReverse bool, viewType int, messageListCallback *sdk.GetAdvancedHistoryMessageListCallback) ([]*model_struct.LocalChatLog, error) {
9595

9696
var list, validMessages []*model_struct.LocalChatLog
9797

@@ -125,8 +125,8 @@ func (c *Conversation) fetchMessagesWithGapCheck(ctx context.Context, conversati
125125
}
126126
if !isReverse {
127127
if thisEndSeq != 0 {
128-
c.messagePullForwardEndSeqMap.StoreWithFunc(conversationID, thisEndSeq, func(key string, value int64) bool {
129-
lastEndSeq, _ := c.messagePullForwardEndSeqMap.Load(key)
128+
c.messagePullForwardEndSeqMap.StoreWithFunc(conversationID, viewType, thisEndSeq, func(key string, value int64) bool {
129+
lastEndSeq, _ := c.messagePullForwardEndSeqMap.Load(key, viewType)
130130
if value < lastEndSeq || lastEndSeq == 0 {
131131
log.ZDebug(ctx, "update the end sequence of the message", "lastEndSeq", lastEndSeq, "thisEndSeq", value)
132132
return true
@@ -138,8 +138,8 @@ func (c *Conversation) fetchMessagesWithGapCheck(ctx context.Context, conversati
138138
}
139139
} else {
140140
if thisEndSeq != 0 {
141-
c.messagePullReverseEndSeqMap.StoreWithFunc(conversationID, thisEndSeq, func(key string, value int64) bool {
142-
lastEndSeq, _ := c.messagePullReverseEndSeqMap.Load(key)
141+
c.messagePullReverseEndSeqMap.StoreWithFunc(conversationID, viewType, thisEndSeq, func(key string, value int64) bool {
142+
lastEndSeq, _ := c.messagePullReverseEndSeqMap.Load(key, viewType)
143143
if value > lastEndSeq || lastEndSeq == 0 {
144144
log.ZDebug(ctx, "update the end sequence of the message", "lastEndSeq", lastEndSeq, "thisEndSeq", value)
145145
return true
@@ -175,10 +175,10 @@ func (c *Conversation) fetchMessagesWithGapCheck(ctx context.Context, conversati
175175
log.ZDebug(ctx, "internal continuity check", "cost time", time.Since(t), "thisStartSeq", thisStartSeq)
176176
t = time.Now()
177177
c.validateAndFillInterBlockGaps(ctx, thisStartSeq, conversationID,
178-
isReverse, count, startTime, &list, messageListCallback)
178+
isReverse, viewType, count, startTime, &list, messageListCallback)
179179
log.ZDebug(ctx, "between continuity check", "cost time", time.Since(t), "thisStartSeq", thisStartSeq)
180180
t = time.Now()
181-
c.validateAndFillEndBlockContinuity(ctx, conversationID, isReverse,
181+
c.validateAndFillEndBlockContinuity(ctx, conversationID, isReverse, viewType,
182182
count, startTime, &list, messageListCallback)
183183
log.ZDebug(ctx, "end continuity check", "cost time", time.Since(t))
184184
// If the number of valid messages retrieved is less than the count,
@@ -188,7 +188,7 @@ func (c *Conversation) fetchMessagesWithGapCheck(ctx context.Context, conversati
188188
newStartTime := getNewStartTime(list)
189189
log.ZDebug(ctx, "fetch more messages", "missingCount", missingCount, "conversationID",
190190
conversationID, "newStartTime", newStartTime)
191-
missingMessages, err := c.fetchMessagesWithGapCheck(ctx, conversationID, missingCount, newStartTime, isReverse, messageListCallback)
191+
missingMessages, err := c.fetchMessagesWithGapCheck(ctx, conversationID, missingCount, newStartTime, isReverse, viewType, messageListCallback)
192192
if err != nil {
193193
return nil, err
194194
}

internal/conversation_msg/conversation_msg.go

Lines changed: 4 additions & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,6 @@ import (
88
"math"
99
"sync"
1010

11-
sdk "github.com/openimsdk/openim-sdk-core/v3/pkg/sdk_params_callback"
12-
1311
"github.com/openimsdk/openim-sdk-core/v3/pkg/api"
1412
"github.com/openimsdk/openim-sdk-core/v3/pkg/cache"
1513
"github.com/openimsdk/tools/utils/stringutil"
@@ -67,8 +65,8 @@ type Conversation struct {
6765
file *file.File
6866
cache *cache.Cache[string, *model_struct.LocalConversation]
6967
maxSeqRecorder MaxSeqRecorder
70-
messagePullForwardEndSeqMap *cache.Cache[string, int64]
71-
messagePullReverseEndSeqMap *cache.Cache[string, int64]
68+
messagePullForwardEndSeqMap *cache.ConversationSeqContextCache
69+
messagePullReverseEndSeqMap *cache.ConversationSeqContextCache
7270
IsExternalExtensions bool
7371
msgOffset int
7472
progress int
@@ -108,8 +106,8 @@ func NewConversation(ctx context.Context, longConnMgr *interaction.LongConnMgr,
108106
file: file,
109107
IsExternalExtensions: info.IsExternalExtensions(),
110108
maxSeqRecorder: NewMaxSeqRecorder(),
111-
messagePullForwardEndSeqMap: cache.NewCache[string, int64](),
112-
messagePullReverseEndSeqMap: cache.NewCache[string, int64](),
109+
messagePullForwardEndSeqMap: cache.NewConversationSeqContextCache(),
110+
messagePullReverseEndSeqMap: cache.NewConversationSeqContextCache(),
113111
msgOffset: 0,
114112
progress: 0,
115113
}
@@ -906,55 +904,3 @@ func (c *Conversation) getUserNameAndFaceURL(ctx context.Context, userID string)
906904
}
907905
return userInfo.FaceURL, userInfo.Nickname, nil
908906
}
909-
910-
func (c *Conversation) GetInputStates(ctx context.Context, conversationID string, userID string) ([]int32, error) {
911-
return c.typing.GetInputStates(conversationID, userID), nil
912-
}
913-
914-
func (c *Conversation) ChangeInputStates(ctx context.Context, conversationID string, focus bool) error {
915-
return c.typing.ChangeInputStates(ctx, conversationID, focus)
916-
}
917-
918-
func (c *Conversation) FetchSurroundingMessages(ctx context.Context, conversationID string, seq int64, before int64, after int64) ([]*sdk_struct.MsgStruct, error) {
919-
c.fetchAndMergeMissingMessages(ctx, conversationID, []int64{seq}, false, 0, 0, &[]*model_struct.LocalChatLog{}, &sdk.GetAdvancedHistoryMessageListCallback{})
920-
res, err := c.db.GetMessagesBySeqs(ctx, conversationID, []int64{seq})
921-
if err != nil {
922-
return nil, err
923-
}
924-
if len(res) == 0 {
925-
return []*sdk_struct.MsgStruct{}, nil
926-
}
927-
//_, msgList := c.LocalChatLog2MsgStruct []*model_struct.LocalChatLog{res[0]})
928-
//if len(msgList) == 0 {
929-
// return []*sdk_struct.MsgStruct{}, nil
930-
//}
931-
//msg := msgList[0]
932-
result := make([]*sdk_struct.MsgStruct, 0, before+after+1)
933-
//if before > 0 {
934-
// req := sdk.GetAdvancedHistoryMessageListParams{
935-
// ConversationID: conversationID,
936-
// Count: int(before),
937-
// StartClientMsgID: msg.ClientMsgID,
938-
// }
939-
// val, err := c.getAdvancedHistoryMessageList(ctx, req, false)
940-
// if err != nil {
941-
// return nil, err
942-
// }
943-
// result = append(result, val.MessageList...)
944-
//}
945-
//result = append(result, msg)
946-
//if after > 0 {
947-
// req := sdk.GetAdvancedHistoryMessageListParams{
948-
// ConversationID: conversationID,
949-
// Count: int(after),
950-
// StartClientMsgID: msg.ClientMsgID,
951-
// }
952-
// val, err := c.getAdvancedHistoryMessageList(ctx, req, true)
953-
// if err != nil {
954-
// return nil, err
955-
// }
956-
// result = append(result, val.MessageList...)
957-
//}
958-
//sort.Sort(sdk_struct.NewMsgList(result))
959-
return result, nil
960-
}

internal/conversation_msg/message_check.go

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -43,17 +43,17 @@ func (c *Conversation) validateAndFillInternalGaps(ctx context.Context, conversa
4343
// validateAndFillInterBlockGaps checks for continuity between blocks of messages. If a gap is identified, it retrieves the missing messages
4444
// to bridge the gap. The function returns a boolean indicating whether the blocks are continuous.
4545
func (c *Conversation) validateAndFillInterBlockGaps(ctx context.Context, thisStartSeq int64, conversationID string,
46-
isReverse bool, count int, startTime int64, list *[]*model_struct.LocalChatLog, messageListCallback *sdk.GetAdvancedHistoryMessageListCallback) {
46+
isReverse bool, viewType, count int, startTime int64, list *[]*model_struct.LocalChatLog, messageListCallback *sdk.GetAdvancedHistoryMessageListCallback) {
4747

4848
var lastEndSeq, startSeq, endSeq int64
4949
var isLostSeq bool
5050
if isReverse {
51-
lastEndSeq, _ = c.messagePullReverseEndSeqMap.Load(conversationID)
51+
lastEndSeq, _ = c.messagePullReverseEndSeqMap.Load(conversationID, viewType)
5252
isLostSeq = lastEndSeq+1 != thisStartSeq
5353
startSeq = lastEndSeq + 1
5454
endSeq = thisStartSeq - 1
5555
} else {
56-
lastEndSeq, _ = c.messagePullForwardEndSeqMap.Load(conversationID)
56+
lastEndSeq, _ = c.messagePullForwardEndSeqMap.Load(conversationID, viewType)
5757
isLostSeq = thisStartSeq+1 != lastEndSeq
5858
startSeq = thisStartSeq + 1
5959
endSeq = lastEndSeq - 1
@@ -73,15 +73,15 @@ func (c *Conversation) validateAndFillInterBlockGaps(ctx context.Context, thisSt
7373
// internal and inter-block continuity checks but contains fewer messages than `count`, this function verifies if the end
7474
// of the message history has been reached. If not, it attempts to retrieve any missing messages to ensure continuity.
7575
func (c *Conversation) validateAndFillEndBlockContinuity(ctx context.Context, conversationID string,
76-
isReverse bool, count int, startTime int64, list *[]*model_struct.LocalChatLog, messageListCallback *sdk.GetAdvancedHistoryMessageListCallback) {
77-
isShouldFetchMessage, lostSeqList := c.checkEndBlock(ctx, conversationID, isReverse, count, list, messageListCallback)
76+
isReverse bool, viewType, count int, startTime int64, list *[]*model_struct.LocalChatLog, messageListCallback *sdk.GetAdvancedHistoryMessageListCallback) {
77+
isShouldFetchMessage, lostSeqList := c.checkEndBlock(ctx, conversationID, isReverse, viewType, count, list, messageListCallback)
7878
if isShouldFetchMessage {
7979
c.fetchAndMergeMissingMessages(ctx, conversationID, lostSeqList, isReverse, count, startTime, list, messageListCallback)
80-
_, _ = c.checkEndBlock(ctx, conversationID, isReverse, count, list, messageListCallback)
80+
_, _ = c.checkEndBlock(ctx, conversationID, isReverse, viewType, count, list, messageListCallback)
8181
}
8282

8383
}
84-
func (c *Conversation) checkEndBlock(ctx context.Context, conversationID string, isReverse bool, count int,
84+
func (c *Conversation) checkEndBlock(ctx context.Context, conversationID string, isReverse bool, viewType, count int,
8585
list *[]*model_struct.LocalChatLog, messageListCallback *sdk.GetAdvancedHistoryMessageListCallback) (isShouldFetchMessage bool, seqList []int64) {
8686
// Perform an end-of-block check if the retrieved message count is less than requested
8787
if len(*list) < count {
@@ -94,7 +94,7 @@ func (c *Conversation) checkEndBlock(ctx context.Context, conversationID string,
9494
if maxSeq >= currentMaxSeq {
9595
messageListCallback.IsEnd = true
9696
} else {
97-
lastEndSeq, _ := c.messagePullReverseEndSeqMap.Load(conversationID)
97+
lastEndSeq, _ := c.messagePullReverseEndSeqMap.Load(conversationID, viewType)
9898
log.ZDebug(ctx, "validateAndFillEndBlockContinuity", "lastEndSeq", lastEndSeq, "conversationID", conversationID)
9999
// If `maxSeq` is zero and `lastEndSeq` is at the maximum server sequence, this batch is fully local
100100
if maxSeq == 0 && lastEndSeq >= currentMaxSeq { // All messages in this batch are local messages,
@@ -124,7 +124,7 @@ func (c *Conversation) checkEndBlock(ctx context.Context, conversationID string,
124124
if minSeq <= userCanPullMinSeq {
125125
messageListCallback.IsEnd = true
126126
} else {
127-
lastMinSeq, _ := c.messagePullForwardEndSeqMap.Load(conversationID)
127+
lastMinSeq, _ := c.messagePullForwardEndSeqMap.Load(conversationID, viewType)
128128
log.ZDebug(ctx, "validateAndFillEndBlockContinuity", "lastMinSeq", lastMinSeq, "conversationID", conversationID)
129129
// If `minSeq` is zero and `lastMinSeq` is at the minimum server sequence, this batch is fully local
130130
if minSeq == 0 && lastMinSeq <= userCanPullMinSeq { // All messages in this batch are local messages,

internal/conversation_msg/read_drawing.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@ import (
2222
"github.com/openimsdk/openim-sdk-core/v3/pkg/common"
2323
"github.com/openimsdk/openim-sdk-core/v3/pkg/constant"
2424
"github.com/openimsdk/openim-sdk-core/v3/pkg/db/model_struct"
25-
"github.com/openimsdk/openim-sdk-core/v3/pkg/sdkerrs"
2625
"github.com/openimsdk/openim-sdk-core/v3/pkg/utils"
2726
"github.com/openimsdk/openim-sdk-core/v3/sdk_struct"
2827
"github.com/openimsdk/tools/errs"
@@ -52,7 +51,8 @@ func (c *Conversation) markConversationMessageAsRead(ctx context.Context, conver
5251
return err
5352
}
5453
if conversation.UnreadCount == 0 {
55-
return sdkerrs.ErrUnreadCount
54+
log.ZWarn(ctx, "unread count is 0", nil, "conversationID", conversationID)
55+
return nil
5656
}
5757
// get the maximum sequence number of messages in the table that are not sent by oneself
5858
peerUserMaxSeq, err := c.db.GetConversationPeerNormalMsgSeq(ctx, conversationID)

internal/interaction/long_conn_mgr.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -506,6 +506,8 @@ func (c *LongConnMgr) handleMessage(message []byte) error {
506506
fallthrough
507507
case constant.GetConvMaxReadSeq:
508508
fallthrough
509+
case constant.PullConvLastMessage:
510+
fallthrough
509511
case constant.SendMsg:
510512
fallthrough
511513
case constant.SendSignalMsg:

0 commit comments

Comments
 (0)