Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat_: use content-topic override for all community filters #5993

Open
wants to merge 1 commit into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion protocol/communities_messenger_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3631,7 +3631,7 @@ func (s *MessengerCommunitiesSuite) TestHandleImport() {
message.Sig = crypto.FromECDSAPub(&s.owner.identity.PublicKey)
message.Payload = wrappedPayload

filter := s.alice.transport.FilterByChatID(chat.ID)
filter := s.alice.transport.FilterByChatID(community.UniversalChatID())
importedMessages := make(map[transport.Filter][]*wakutypes.Message, 0)

importedMessages[*filter] = append(importedMessages[*filter], message)
Expand Down
1 change: 0 additions & 1 deletion protocol/communities_messenger_token_permissions_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2156,7 +2156,6 @@ func (s *MessengerCommunitiesTokenPermissionsSuite) TestImportDecryptedArchiveMe
topic := wakutypes.BytesToTopic(transport.ToTopic(chat.ID))
communityCommonTopic := wakutypes.BytesToTopic(transport.ToTopic(community.UniversalChatID()))
topics := []wakutypes.TopicType{topic, communityCommonTopic}

torrentConfig := params.TorrentConfig{
Enabled: true,
DataDir: os.TempDir() + "/archivedata",
Expand Down
70 changes: 3 additions & 67 deletions protocol/messenger_communities.go
Original file line number Diff line number Diff line change
Expand Up @@ -955,11 +955,6 @@ func (m *Messenger) initCommunityChats(community *communities.Community) ([]*Cha

chats := CreateCommunityChats(community, m.getTimesource())

for _, chat := range chats {
publicFiltersToInit = append(publicFiltersToInit, transport.FiltersToInitialize{ChatID: chat.ID, PubsubTopic: community.PubsubTopic()})

}

// Load transport filters
filters, err := m.transport.InitPublicFilters(publicFiltersToInit)
if err != nil {
Expand Down Expand Up @@ -2402,25 +2397,12 @@ func (m *Messenger) CreateCommunityChat(communityID types.HexBytes, c *protobuf.
response.CommunityChanges = []*communities.CommunityChanges{changes}

var chats []*Chat
var publicFiltersToInit []transport.FiltersToInitialize
for chatID, chat := range changes.ChatsAdded {
c := CreateCommunityChat(changes.Community.IDString(), chatID, chat, m.getTimesource())
chats = append(chats, c)
publicFiltersToInit = append(publicFiltersToInit, transport.FiltersToInitialize{ChatID: c.ID, PubsubTopic: changes.Community.PubsubTopic()})

response.AddChat(c)
}

// Load filters
filters, err := m.transport.InitPublicFilters(publicFiltersToInit)
if err != nil {
return nil, err
}
_, err = m.scheduleSyncFilters(filters)
if err != nil {
return nil, err
}

err = m.saveChats(chats)
if err != nil {
return nil, err
Expand All @@ -2444,24 +2426,12 @@ func (m *Messenger) EditCommunityChat(communityID types.HexBytes, chatID string,
response.CommunityChanges = []*communities.CommunityChanges{changes}

var chats []*Chat
var publicFiltersToInit []transport.FiltersToInitialize
for chatID, change := range changes.ChatsModified {
c := CreateCommunityChat(community.IDString(), chatID, change.ChatModified, m.getTimesource())
chats = append(chats, c)
publicFiltersToInit = append(publicFiltersToInit, transport.FiltersToInitialize{ChatID: c.ID, PubsubTopic: community.PubsubTopic()})
response.AddChat(c)
}

// Load filters
filters, err := m.transport.InitPublicFilters(publicFiltersToInit)
if err != nil {
return nil, err
}
_, err = m.scheduleSyncFilters(filters)
if err != nil {
return nil, err
}

return &response, m.saveChats(chats)
}

Expand Down Expand Up @@ -2494,16 +2464,12 @@ func (m *Messenger) InitCommunityFilters(communityFiltersToInitialize []transpor
func (m *Messenger) DefaultFilters(o *communities.Community) []transport.FiltersToInitialize {
cID := o.IDString()
uncompressedPubKey := common.PubkeyToHex(o.PublicKey())[2:]
updatesChannelID := o.StatusUpdatesChannelID()
mlChannelID := o.MagnetlinkMessageChannelID()
memberUpdateChannelID := o.MemberUpdateChannelID()

communityPubsubTopic := o.PubsubTopic()

filters := []transport.FiltersToInitialize{
{ChatID: cID, PubsubTopic: communityPubsubTopic},
{ChatID: updatesChannelID, PubsubTopic: communityPubsubTopic},
{ChatID: mlChannelID, PubsubTopic: communityPubsubTopic},
{ChatID: memberUpdateChannelID, PubsubTopic: communityPubsubTopic},
{ChatID: uncompressedPubKey, PubsubTopic: shard.DefaultNonProtectedPubsubTopic()},
}
Expand Down Expand Up @@ -2723,15 +2689,6 @@ func (m *Messenger) UpdateCommunityFilters(community *communities.Community) err
return err
}
}
for chatID := range community.Chats() {
communityChatID := community.IDString() + chatID
_, err := m.transport.RemoveFilterByChatID(communityChatID)
if err != nil {
return err
}
publicFiltersToInit = append(publicFiltersToInit, transport.FiltersToInitialize{ChatID: communityChatID, PubsubTopic: community.PubsubTopic()})
}

_, err := m.transport.InitPublicFilters(publicFiltersToInit)
if err != nil {
return err
Expand Down Expand Up @@ -3378,12 +3335,10 @@ func (m *Messenger) handleCommunityResponse(state *ReceivedMessageState, communi
return nil
}

removedChatIDs := make([]string, 0)
for id := range communityResponse.Changes.ChatsRemoved {
chatID := community.ChatID(id)
_, ok := state.AllChats.Load(chatID)
if ok {
removedChatIDs = append(removedChatIDs, chatID)
state.AllChats.Delete(chatID)
err := m.DeleteChat(chatID)
if err != nil {
Expand Down Expand Up @@ -3413,7 +3368,6 @@ func (m *Messenger) handleCommunityResponse(state *ReceivedMessageState, communi
// Update relevant chats names and add new ones
// Currently removal is not supported
chats := CreateCommunityChats(community, state.Timesource)
var publicFiltersToInit []transport.FiltersToInitialize
for i, chat := range chats {

oldChat, ok := state.AllChats.Load(chat.ID)
Expand All @@ -3422,10 +3376,7 @@ func (m *Messenger) handleCommunityResponse(state *ReceivedMessageState, communi
state.AllChats.Store(chat.ID, chats[i])

state.Response.AddChat(chat)
publicFiltersToInit = append(publicFiltersToInit, transport.FiltersToInitialize{
ChatID: chat.ID,
PubsubTopic: community.PubsubTopic(),
})

// Update name, currently is the only field is mutable
} else if oldChat.Name != chat.Name ||
oldChat.Description != chat.Description ||
Expand All @@ -3444,23 +3395,6 @@ func (m *Messenger) handleCommunityResponse(state *ReceivedMessageState, communi
}
}

for _, chatID := range removedChatIDs {
_, err := m.transport.RemoveFilterByChatID(chatID)
if err != nil {
m.logger.Error("couldn't remove filter", zap.Error(err))
}
}

// Load transport filters
filters, err := m.transport.InitPublicFilters(publicFiltersToInit)
if err != nil {
return err
}
_, err = m.scheduleSyncFilters(filters)
if err != nil {
return err
}

for _, requestToJoin := range communityResponse.RequestsToJoin {
// Activity Center notification
notification, err := m.persistence.GetActivityCenterNotificationByID(requestToJoin.ID)
Expand Down Expand Up @@ -3961,6 +3895,8 @@ func (m *Messenger) InitHistoryArchiveTasks(communities []*communities.Community

filters = append(filters, m.transport.FilterByChatID(c.UniversalChatID()))

filters = append(filters, m.transport.FilterByChatID(c.UniversalChatID()))

// First we need to know the timestamp of the latest waku message
// we've received for this community, so we can request messages we've
// possibly missed since then
Expand Down
15 changes: 6 additions & 9 deletions protocol/messenger_filter_init.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,12 +182,12 @@ func (m *Messenger) processSingleChat(chat *Chat, communityInfo map[string]*comm
filters = append(filters, transport.FiltersToInitialize{ChatID: chat.ID})

case ChatTypeCommunityChat:
filter, err := m.processCommunityChat(chat, communityInfo)
// Since universalChatID is being used, no specific filters needs to be registered for all community chats.
// Reasoning: https://github.com/status-im/status-go/pull/5993
err := m.processCommunityChat(chat, communityInfo)
if err != nil {
return nil, nil, err
}
filters = append(filters, filter)

case ChatTypeOneToOne:
pk, err := chat.PublicKey()
if err != nil {
Expand All @@ -209,13 +209,13 @@ func (m *Messenger) processSingleChat(chat *Chat, communityInfo map[string]*comm
return filters, publicKeys, nil
}

func (m *Messenger) processCommunityChat(chat *Chat, communityInfo map[string]*communities.Community) (transport.FiltersToInitialize, error) {
func (m *Messenger) processCommunityChat(chat *Chat, communityInfo map[string]*communities.Community) error {
community, ok := communityInfo[chat.CommunityID]
if !ok {
var err error
community, err = m.communitiesManager.GetByIDString(chat.CommunityID)
if err != nil {
return transport.FiltersToInitialize{}, err
return err
}
communityInfo[chat.CommunityID] = community
}
Expand All @@ -229,10 +229,7 @@ func (m *Messenger) processCommunityChat(chat *Chat, communityInfo map[string]*c
}
}

return transport.FiltersToInitialize{
ChatID: chat.ID,
PubsubTopic: community.PubsubTopic(),
}, nil
return nil
}

func (m *Messenger) processPrivateGroupChat(chat *Chat) ([]*ecdsa.PublicKey, error) {
Expand Down
Loading