diff --git a/protocol/common/message_sender.go b/protocol/common/message_sender.go index 939363e6f0..12f52825ea 100644 --- a/protocol/common/message_sender.go +++ b/protocol/common/message_sender.go @@ -685,7 +685,7 @@ func (s *MessageSender) dispatchCommunityChatMessage(ctx context.Context, rawMes hashes := make([][]byte, 0, len(newMessages)) for _, newMessage := range newMessages { - hash, err := s.transport.SendPublic(ctx, newMessage, rawMessage.LocalChatID) + hash, err := s.transport.SendPublic(ctx, newMessage, rawMessage.ContentTopic) if err != nil { return nil, nil, err } diff --git a/protocol/common/raw_message.go b/protocol/common/raw_message.go index 687d3b78a9..8a3bcce4b7 100644 --- a/protocol/common/raw_message.go +++ b/protocol/common/raw_message.go @@ -80,6 +80,7 @@ type RawMessage struct { Ephemeral bool BeforeDispatch func(*RawMessage) error HashRatchetGroupID []byte + ContentTopic string PubsubTopic string ResendType ResendType ResendMethod ResendMethod diff --git a/protocol/communities/community.go b/protocol/communities/community.go index 4e6a865f7e..c7c82916dd 100644 --- a/protocol/communities/community.go +++ b/protocol/communities/community.go @@ -1568,6 +1568,13 @@ func (o *Community) setPrivateKey(pk *ecdsa.PrivateKey) { } } +func (o *Community) UniversalChatID() string { + // Using Member updates channelID as chatID to act as a universal content-topic for all chats in the community as explained here https://forum.vac.dev/t/status-communities-review-and-proposed-usage-of-waku-content-topics/335 + // This is to match filter criteria of community with the content-topic usage. + // This specific topic is chosen as existing users before the change are already subscribed to this and will not get affected by it. + return o.MemberUpdateChannelID() +} + func (o *Community) SetResendAccountsClock(clock uint64) { o.config.CommunityDescription.ResendAccountsClock = clock } diff --git a/protocol/communities/manager.go b/protocol/communities/manager.go index ce07720591..878d25931c 100644 --- a/protocol/communities/manager.go +++ b/protocol/communities/manager.go @@ -4279,6 +4279,20 @@ func (m *Manager) GetOwnedCommunitiesChatIDs() (map[string]bool, error) { return chatIDs, nil } +func (m *Manager) GetOwnedCommunitiesUniversalChatIDs() (map[string]bool, error) { + ownedCommunities, err := m.Controlled() + if err != nil { + return nil, err + } + chatIDs := make(map[string]bool) + for _, c := range ownedCommunities { + if c.Joined() { + chatIDs[c.UniversalChatID()] = true + } + } + return chatIDs, nil +} + func (m *Manager) StoreWakuMessage(message *wakutypes.Message) error { return m.persistence.SaveWakuMessage(message) } diff --git a/protocol/communities/manager_archive.go b/protocol/communities/manager_archive.go index 17a251c18f..f53dabf6e2 100644 --- a/protocol/communities/manager_archive.go +++ b/protocol/communities/manager_archive.go @@ -356,6 +356,10 @@ func (m *ArchiveManager) StartHistoryArchiveTasksInterval(community *Community, m.logger.Error("failed to get community chat topics ", zap.Error(err)) continue } + // adding the content-topic used for member updates. + // since member updates would not be too frequent i.e only addition/deletion would add a new message, + // this shouldn't cause too much increase in size of archive generated. + topics = append(topics, m.transport.FilterByChatID(community.UniversalChatID()).ContentTopic) ts := time.Now().Unix() to := time.Unix(ts, 0) diff --git a/protocol/communities_messenger_token_permissions_test.go b/protocol/communities_messenger_token_permissions_test.go index 4043453f85..38fed24d86 100644 --- a/protocol/communities_messenger_token_permissions_test.go +++ b/protocol/communities_messenger_token_permissions_test.go @@ -2154,7 +2154,8 @@ func (s *MessengerCommunitiesTokenPermissionsSuite) TestImportDecryptedArchiveMe startDate := messageDate.Add(-time.Minute) endDate := messageDate.Add(time.Minute) topic := wakutypes.BytesToTopic(transport.ToTopic(chat.ID)) - topics := []wakutypes.TopicType{topic} + communityCommonTopic := wakutypes.BytesToTopic(transport.ToTopic(community.UniversalChatID())) + topics := []wakutypes.TopicType{topic, communityCommonTopic} torrentConfig := params.TorrentConfig{ Enabled: true, diff --git a/protocol/messenger.go b/protocol/messenger.go index 9373d47b9c..2402b0c6ea 100644 --- a/protocol/messenger.go +++ b/protocol/messenger.go @@ -1960,6 +1960,10 @@ func (m *Messenger) dispatchPairInstallationMessage(ctx context.Context, spec co } func (m *Messenger) dispatchMessage(ctx context.Context, rawMessage common.RawMessage) (common.RawMessage, error) { + if rawMessage.ContentTopic == "" { + rawMessage.ContentTopic = rawMessage.LocalChatID + } + var err error var id []byte logger := m.logger.With(zap.String("site", "dispatchMessage"), zap.String("chatID", rawMessage.LocalChatID)) @@ -1994,7 +1998,7 @@ func (m *Messenger) dispatchMessage(ctx context.Context, rawMessage common.RawMe case ChatTypePublic, ChatTypeProfile: logger.Debug("sending public message", zap.String("chatName", chat.Name)) - id, err = m.sender.SendPublic(ctx, chat.ID, rawMessage) + id, err = m.sender.SendPublic(ctx, rawMessage.ContentTopic, rawMessage) if err != nil { return rawMessage, err } @@ -2004,6 +2008,9 @@ func (m *Messenger) dispatchMessage(ctx context.Context, rawMessage common.RawMe if err != nil { return rawMessage, err } + // Use a single content-topic for all community chats. + // Reasoning: https://github.com/status-im/status-go/pull/5864 + rawMessage.ContentTopic = community.UniversalChatID() rawMessage.PubsubTopic = community.PubsubTopic() canPost, err := m.communitiesManager.CanPost(&m.identity.PublicKey, chat.CommunityID, chat.CommunityChatID(), rawMessage.MessageType) @@ -2031,7 +2038,7 @@ func (m *Messenger) dispatchMessage(ctx context.Context, rawMessage common.RawMe } isEncrypted := isCommunityEncrypted || isChannelEncrypted if !isEncrypted { - id, err = m.sender.SendPublic(ctx, chat.ID, rawMessage) + id, err = m.sender.SendPublic(ctx, rawMessage.ContentTopic, rawMessage) if err != nil { return rawMessage, err } @@ -3341,6 +3348,15 @@ func (m *Messenger) handleRetrievedMessages(chatWithMessages map[transport.Filte logger.Info("failed to retrieve admin communities", zap.Error(err)) } + // fetch universal chatIDs as well. + controlledCommunitiesUniversalChatIDs, err := m.communitiesManager.GetOwnedCommunitiesUniversalChatIDs() + if err != nil { + logger.Info("failed to retrieve controlled communities", zap.Error(err)) + } + for chatID, flag := range controlledCommunitiesUniversalChatIDs { + controlledCommunitiesChatIDs[chatID] = flag + } + iterator := m.retrievedMessagesIteratorFactory(chatWithMessages) for iterator.HasNext() { filter, messages := iterator.Next() diff --git a/protocol/messenger_communities.go b/protocol/messenger_communities.go index 2123dac629..5796caf0d3 100644 --- a/protocol/messenger_communities.go +++ b/protocol/messenger_communities.go @@ -2717,7 +2717,12 @@ func (m *Messenger) UpdateCommunityFilters(community *communities.Community) err publicFiltersToInit := make([]transport.FiltersToInitialize, 0, len(defaultFilters)+len(community.Chats())) publicFiltersToInit = append(publicFiltersToInit, defaultFilters...) - + for _, filter := range defaultFilters { + _, err := m.transport.RemoveFilterByChatID(filter.ChatID) + if err != nil { + return err + } + } for chatID := range community.Chats() { communityChatID := community.IDString() + chatID _, err := m.transport.RemoveFilterByChatID(communityChatID) @@ -3954,6 +3959,8 @@ func (m *Messenger) InitHistoryArchiveTasks(communities []*communities.Community topics = append(topics, filter.ContentTopic) } + filters = append(filters, m.transport.FilterByChatID(c.UniversalChatID())) + // First we need to know the timestamp of the latest waku message // we've received for this community, so we can request messages we've // possibly missed since then