diff --git a/p2p/host.go b/p2p/host.go index dbef233f63..b6be638f6a 100644 --- a/p2p/host.go +++ b/p2p/host.go @@ -633,7 +633,10 @@ func (host *HostV2) ListTopic() []string { func (host *HostV2) ListPeer(topic string) []libp2p_peer.ID { host.lock.Lock() defer host.lock.Unlock() - return host.joined[topic].ListPeers() + if t, ok := host.joined[topic]; ok { + return t.ListPeers() + } + return nil } // ListBlockedPeer returns list of blocked peer @@ -693,6 +696,22 @@ func (host *HostV2) Connected(net libp2p_network.Network, conn libp2p_network.Co } } +// getPeerTopics returns the list of topics a peer is subscribed to +func (host *HostV2) getPeerTopics(peerID libp2p_peer.ID) map[string]bool { + topics := make(map[string]bool) + host.lock.Lock() + defer host.lock.Unlock() + for topic, t := range host.joined { + for _, p := range t.ListPeers() { + if p == peerID { + topics[topic] = true + break + } + } + } + return topics +} + // called when a connection closed func (host *HostV2) Disconnected(net libp2p_network.Network, conn libp2p_network.Conn) { host.logger.Debug().Interface("node", conn.RemotePeer()).Msg("peer disconnected") diff --git a/p2p/stream/common/streammanager/streammanager.go b/p2p/stream/common/streammanager/streammanager.go index 0bfc9bbd5b..9c9ee7608c 100644 --- a/p2p/stream/common/streammanager/streammanager.go +++ b/p2p/stream/common/streammanager/streammanager.go @@ -182,6 +182,7 @@ func (sm *streamManager) loop() { // NewStream handles a new stream from stream handler protocol func (sm *streamManager) NewStream(stream sttypes.Stream) error { if err := sm.sanityCheckStream(stream); err != nil { + stream.Close() return errors.Wrap(err, "stream sanity check failed") } task := addStreamTask{