Skip to content

Commit 0289061

Browse files
committed
recent_senders: Add the new MessageIdTracker and RecentSenders data structures
MessageIdTracker data structure is used to keep track of message ids in an ascending sorted list. It is used in RecentSenders data structure. RecentSenders data structure is used to keep track of user messages in topics and streams. Much of this code is transcribed from Zulip web; in particular, from: web\src\recent_senders.ts
1 parent 034678a commit 0289061

6 files changed

+447
-0
lines changed

lib/model/message_list.dart

+2
Original file line numberDiff line numberDiff line change
@@ -403,6 +403,7 @@ class MessageListView with ChangeNotifier, _MessageSequence {
403403
numAfter: 0,
404404
);
405405
store.reconcileMessages(result.messages);
406+
store.recentSenders.handleMessages(result.messages);
406407
for (final message in result.messages) {
407408
if (_messageVisible(message)) {
408409
_addMessage(message);
@@ -439,6 +440,7 @@ class MessageListView with ChangeNotifier, _MessageSequence {
439440
}
440441

441442
store.reconcileMessages(result.messages);
443+
store.recentSenders.handleMessages(result.messages, inReverse: true);
442444

443445
final fetchedMessages = _allMessagesVisible
444446
? result.messages // Avoid unnecessarily copying the list.

lib/model/recent_senders.dart

+167
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,167 @@
1+
import 'package:collection/collection.dart';
2+
import 'package:flutter/foundation.dart';
3+
4+
import '../api/model/events.dart';
5+
import '../api/model/model.dart';
6+
7+
/// A data structure to keep track of stream and topic messages of users (senders).
8+
///
9+
/// Use [latestMessageIdOfSenderInStream] and [latestMessageIdOfSenderInTopic]
10+
/// to get the relevant data.
11+
class RecentSenders {
12+
// streamSenders[streamId][senderId] = MessageIdTracker
13+
@visibleForTesting
14+
final Map<int, Map<int, MessageIdTracker>> streamSenders = {};
15+
16+
// topicSenders[streamId][topic][senderId] = MessageIdTracker
17+
@visibleForTesting
18+
final Map<int, Map<String, Map<int, MessageIdTracker>>> topicSenders = {};
19+
20+
int? latestMessageIdOfSenderInStream({
21+
required int streamId,
22+
required int senderId,
23+
}) => streamSenders[streamId]?[senderId]?.maxId;
24+
25+
int? latestMessageIdOfSenderInTopic({
26+
required int streamId,
27+
required String topic,
28+
required int senderId,
29+
}) => topicSenders[streamId]?[topic]?[senderId]?.maxId;
30+
31+
/// Calls [handleMessage] for each of the message.
32+
///
33+
/// [inReverse] is used for the optimization of the underlying
34+
/// [MessageIdTracker.add] method. It should be `false` (default) when
35+
/// [messages] are the initial messages in a narrow (fetched through
36+
/// [MessageListView.fetchInitial]), and `true` when they are the older
37+
/// messages in a narrow (fetched through [MessageListView.fetchOlder]).
38+
void handleMessages(List<Message> messages, {bool inReverse = false}) {
39+
for (int i = inReverse ? messages.length - 1 : 0;
40+
inReverse ? i >= 0 : i < messages.length;
41+
inReverse ? i-- : i++) {
42+
handleMessage(messages[i]);
43+
}
44+
}
45+
46+
/// Records the necessary data from [message] if it is a [StreamMessage].
47+
///
48+
/// If [message] is not a [StreamMessage], this is a no-op.
49+
void handleMessage(Message message) {
50+
if (message is! StreamMessage) return;
51+
52+
final StreamMessage(:streamId, :topic, :senderId, id: int messageId) = message;
53+
54+
// Track message in stream.
55+
var sendersMap = streamSenders[streamId] ??= {};
56+
var idTracker = sendersMap[senderId] ??= MessageIdTracker();
57+
idTracker.add(messageId);
58+
59+
// Track message in topic.
60+
final topicsMap = topicSenders[streamId] ??= {};
61+
sendersMap = topicsMap[topic] ??= {};
62+
idTracker = sendersMap[senderId] ??= MessageIdTracker();
63+
idTracker.add(messageId);
64+
}
65+
66+
void handleDeleteMessageEvent(DeleteMessageEvent event, Map<int, Message> cachedMessages) {
67+
if (event.messageType != MessageType.stream) return;
68+
69+
for (final id in event.messageIds) {
70+
final message = cachedMessages[id] as StreamMessage?;
71+
if (message == null) break;
72+
final StreamMessage(:streamId, :topic, :senderId, id: int messageId) = message;
73+
74+
_removeMessageInStream(
75+
streamId: streamId, senderId: senderId, messageId: messageId);
76+
77+
_removeMessageInTopic(streamId: streamId, topic: topic, senderId: senderId,
78+
messageId: messageId);
79+
}
80+
}
81+
82+
void _removeMessageInStream({
83+
required int streamId,
84+
required int senderId,
85+
required int messageId,
86+
}) {
87+
if (streamSenders.isEmpty) return;
88+
89+
final sendersMap = streamSenders[streamId];
90+
if (sendersMap == null) return;
91+
92+
final idTracker = sendersMap[senderId];
93+
if (idTracker == null) return;
94+
95+
idTracker.remove(messageId);
96+
if (idTracker.maxId == null) sendersMap.remove(senderId);
97+
if (sendersMap.isEmpty) streamSenders.remove(streamId);
98+
}
99+
100+
void _removeMessageInTopic({
101+
required int streamId,
102+
required String topic,
103+
required int senderId,
104+
required int messageId,
105+
}) {
106+
if (topicSenders.isEmpty) return;
107+
108+
final topicsMap = topicSenders[streamId];
109+
if (topicsMap == null) return;
110+
111+
final sendersMap = topicsMap[topic];
112+
if (sendersMap == null) return;
113+
114+
final idTracker = sendersMap[senderId];
115+
if (idTracker == null) return;
116+
117+
idTracker.remove(messageId);
118+
if (idTracker.maxId == null) sendersMap.remove(senderId);
119+
if (sendersMap.isEmpty) topicsMap.remove(topic);
120+
if (topicsMap.isEmpty) topicSenders.remove(streamId);
121+
}
122+
}
123+
124+
class MessageIdTracker {
125+
/// A list of distinct message IDs, sorted ascendingly.
126+
final QueueList<int> _ids = QueueList.from([]);
127+
128+
/// The maximum id in the tracker list, or `null` if the list is empty.
129+
int? get maxId => _ids.lastOrNull;
130+
131+
/// Add the message ID to the tracker list at the proper place, if not already present.
132+
///
133+
/// Optimized, taking O(1) time, for the cases where that place is the start
134+
/// (message fetched through [MessageListView.fetchOlder]) or the end (message
135+
/// fetched through [MessageListView.fetchInitial] or
136+
/// [PerAccountStore.handleEvent]), because those are the common cases for a
137+
/// message the app receives. May take O(n) time in some rare cases.
138+
void add(int id) {
139+
final i = lowerBound(_ids, id);
140+
if (i < _ids.length && _ids[i] == id) {
141+
// The ID is already present. Nothing to do.
142+
return;
143+
}
144+
if (i == 0) {
145+
_ids.addFirst(id);
146+
} else if (i == _ids.length) {
147+
_ids.addLast(id);
148+
} else {
149+
_ids.insert(i, id);
150+
}
151+
}
152+
153+
void remove(int id) => _ids.remove(id);
154+
155+
@override
156+
bool operator ==(covariant MessageIdTracker other) {
157+
if (identical(this, other)) return true;
158+
159+
return _ids.equals(other._ids);
160+
}
161+
162+
@override
163+
int get hashCode => Object.hashAll(_ids);
164+
165+
@override
166+
String toString() => _ids.toString();
167+
}

lib/model/store.dart

+8
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ import 'database.dart';
2323
import 'message.dart';
2424
import 'message_list.dart';
2525
import 'recent_dm_conversations.dart';
26+
import 'recent_senders.dart';
2627
import 'stream.dart';
2728
import 'unreads.dart';
2829

@@ -251,6 +252,7 @@ class PerAccountStore extends ChangeNotifier with StreamStore, MessageStore {
251252
),
252253
recentDmConversationsView: RecentDmConversationsView(
253254
initial: initialSnapshot.recentPrivateConversations, selfUserId: account.userId),
255+
recentSenders: RecentSenders(),
254256
);
255257
}
256258

@@ -270,6 +272,7 @@ class PerAccountStore extends ChangeNotifier with StreamStore, MessageStore {
270272
required MessageStoreImpl messages,
271273
required this.unreads,
272274
required this.recentDmConversationsView,
275+
required this.recentSenders,
273276
}) : assert(selfUserId == globalStore.getAccount(accountId)!.userId),
274277
assert(realmUrl == globalStore.getAccount(accountId)!.realmUrl),
275278
assert(realmUrl == connection.realmUrl),
@@ -361,6 +364,8 @@ class PerAccountStore extends ChangeNotifier with StreamStore, MessageStore {
361364

362365
final RecentDmConversationsView recentDmConversationsView;
363366

367+
final RecentSenders recentSenders;
368+
364369
////////////////////////////////
365370
// Other digests of data.
366371

@@ -471,6 +476,7 @@ class PerAccountStore extends ChangeNotifier with StreamStore, MessageStore {
471476
_messages.handleMessageEvent(event);
472477
unreads.handleMessageEvent(event);
473478
recentDmConversationsView.handleMessageEvent(event);
479+
recentSenders.handleMessage(event.message);
474480
// When adding anything here (to handle [MessageEvent]),
475481
// it probably belongs in [reconcileMessages] too.
476482
} else if (event is UpdateMessageEvent) {
@@ -479,6 +485,8 @@ class PerAccountStore extends ChangeNotifier with StreamStore, MessageStore {
479485
unreads.handleUpdateMessageEvent(event);
480486
} else if (event is DeleteMessageEvent) {
481487
assert(debugLog("server event: delete_message ${event.messageIds}"));
488+
// This should be called before [_messages.handleDeleteMessageEvent(event)].
489+
recentSenders.handleDeleteMessageEvent(event, messages);
482490
_messages.handleDeleteMessageEvent(event);
483491
unreads.handleDeleteMessageEvent(event);
484492
} else if (event is UpdateMessageFlagsEvent) {

test/model/message_list_test.dart

+40
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ import '../api/model/model_checks.dart';
1818
import '../example_data.dart' as eg;
1919
import '../stdlib_checks.dart';
2020
import 'content_checks.dart';
21+
import 'recent_senders_test.dart';
2122
import 'test_store.dart';
2223

2324
void main() {
@@ -141,6 +142,24 @@ void main() {
141142
..haveOldest.isTrue();
142143
});
143144

145+
test('fetchInitial, recent senders track all the messages', () async {
146+
const narrow = CombinedFeedNarrow();
147+
await prepare(narrow: narrow);
148+
final messages = [
149+
eg.streamMessage(),
150+
// Not subscribed to the stream with id 10.
151+
eg.streamMessage(stream: eg.stream(streamId: 10)),
152+
];
153+
connection.prepare(json: newestResult(
154+
foundOldest: false,
155+
messages: messages,
156+
).toJson());
157+
await model.fetchInitial();
158+
159+
check(model).messages.length.equals(1);
160+
checkMatchesMessages(messages, store.recentSenders);
161+
});
162+
144163
test('fetchOlder', () async {
145164
const narrow = CombinedFeedNarrow();
146165
await prepare(narrow: narrow);
@@ -233,6 +252,27 @@ void main() {
233252
..messages.length.equals(200);
234253
});
235254

255+
256+
test('fetchOlder, recent senders track all the messages', () async {
257+
const narrow = CombinedFeedNarrow();
258+
await prepare(narrow: narrow);
259+
final initialMessages = List.generate(10, (i) => eg.streamMessage(id: 100 + i));
260+
await prepareMessages(foundOldest: false, messages: initialMessages);
261+
262+
final oldMessages = List.generate(10,
263+
(i) => eg.streamMessage(id: 90 + i))
264+
// Not subscribed to the stream with id 10.
265+
..add(eg.streamMessage(stream: eg.stream(streamId: 10)));
266+
connection.prepare(json: olderResult(
267+
anchor: 100, foundOldest: false,
268+
messages: oldMessages,
269+
).toJson());
270+
await model.fetchOlder();
271+
272+
check(model).messages.length.equals(20);
273+
checkMatchesMessages([...initialMessages, ...oldMessages], store.recentSenders);
274+
});
275+
236276
test('MessageEvent', () async {
237277
final stream = eg.stream();
238278
await prepare(narrow: StreamNarrow(stream.streamId));

test/model/recent_senders_checks.dart

+29
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
import 'package:checks/checks.dart';
2+
import 'package:zulip/model/recent_senders.dart';
3+
4+
extension RecentSendersChecks on Subject<RecentSenders> {
5+
Subject<Map<int, Map<int, MessageIdTracker>>> get streamSenders =>
6+
has((rs) => rs.streamSenders, 'streamSenders');
7+
8+
Subject<Map<int, Map<String, Map<int, MessageIdTracker>>>> get topicSenders =>
9+
has((rs) => rs.topicSenders, 'topicSenders');
10+
11+
Subject<int?> latestMessageIdOfSenderInStream({
12+
required int streamId,
13+
required int senderId,
14+
}) => has(
15+
(rs) => rs.latestMessageIdOfSenderInStream(streamId: streamId,
16+
senderId: senderId),
17+
'latestMessageIdOfSenderInStream',
18+
);
19+
20+
Subject<int?> latestMessageIdOfSenderInTopic({
21+
required int streamId,
22+
required String topic,
23+
required int senderId,
24+
}) => has(
25+
(rs) => rs.latestMessageIdOfSenderInTopic(streamId: streamId,
26+
topic: topic, senderId: senderId),
27+
'latestMessageIdOfSenderInTopic',
28+
);
29+
}

0 commit comments

Comments
 (0)