Skip to content

Stream Removal if its Sanity Check Fails #4839

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Feb 20, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 21 additions & 1 deletion p2p/host.go
Original file line number Diff line number Diff line change
Expand Up @@ -633,7 +633,10 @@ func (host *HostV2) ListTopic() []string {
func (host *HostV2) ListPeer(topic string) []libp2p_peer.ID {
host.lock.Lock()
defer host.lock.Unlock()
return host.joined[topic].ListPeers()
if t, ok := host.joined[topic]; ok {
return t.ListPeers()
}
return nil
}

// ListBlockedPeer returns list of blocked peer
Expand Down Expand Up @@ -693,6 +696,23 @@ func (host *HostV2) Connected(net libp2p_network.Network, conn libp2p_network.Co
}
}

// TODO: this function could be used later to get peer topics and filter them based on the topics
// getPeerTopics returns the list of topics a peer is subscribed to
// func (host *HostV2) getPeerTopics(peerID libp2p_peer.ID) map[string]bool {
// topics := make(map[string]bool)
// host.lock.Lock()
// defer host.lock.Unlock()
// for topic, t := range host.joined {
// for _, p := range t.ListPeers() {
// if p == peerID {
// topics[topic] = true
// break
// }
// }
// }
// return topics
// }

// called when a connection closed
func (host *HostV2) Disconnected(net libp2p_network.Network, conn libp2p_network.Conn) {
host.logger.Debug().Interface("node", conn.RemotePeer()).Msg("peer disconnected")
Expand Down
1 change: 1 addition & 0 deletions p2p/stream/common/streammanager/streammanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,7 @@ func (sm *streamManager) loop() {
// NewStream handles a new stream from stream handler protocol
func (sm *streamManager) NewStream(stream sttypes.Stream) error {
if err := sm.sanityCheckStream(stream); err != nil {
stream.Close()
return errors.Wrap(err, "stream sanity check failed")
}
task := addStreamTask{
Expand Down