@@ -478,7 +478,7 @@ func (p *PubsubValueStore) handleSubscription(ctx context.Context, ti *topicInfo
478
478
case <- ctx .Done ():
479
479
return
480
480
default :
481
- log .Errorf ("PubsubPeerJoin: error interacting with new peer" , err )
481
+ log .Errorf ("PubsubPeerJoin: error interacting with new peer: %s " , err )
482
482
}
483
483
}
484
484
}
@@ -524,29 +524,29 @@ func (p *PubsubValueStore) handleNewMsgs(ctx context.Context, sub *pubsub.Subscr
524
524
}
525
525
526
526
func (p * PubsubValueStore ) handleNewPeer (ctx context.Context , peerEvtHandler * pubsub.TopicEventHandler , key string ) ([]byte , error ) {
527
- select {
528
- case <- ctx .Done ():
529
- return nil , ctx .Err ()
530
- default :
531
- }
532
-
533
- var pid peer.ID
534
-
535
- for {
527
+ tried := make (map [peer.ID ]bool )
528
+ for ctx .Err () == nil {
536
529
peerEvt , err := peerEvtHandler .NextPeerEvent (ctx )
537
530
if err != nil {
538
531
if err != context .Canceled {
539
532
log .Warningf ("PubsubNewPeer: subscription error in %s: %s" , key , err .Error ())
540
533
}
541
534
return nil , err
542
535
}
543
- if peerEvt .Type == pubsub .PeerJoin {
544
- pid = peerEvt .Peer
545
- break
536
+
537
+ pid := peerEvt .Peer
538
+ if peerEvt .Type != pubsub .PeerJoin || tried [pid ] {
539
+ continue
546
540
}
547
- }
541
+ tried [ pid ] = true
548
542
549
- return p .fetch .Fetch (ctx , pid , key )
543
+ value , err := p .fetch .Fetch (ctx , pid , key )
544
+ if err == nil {
545
+ return value , nil
546
+ }
547
+ log .Debugf ("failed to fetch latest pubsub value for key '%s' from peer '%s': %s" , key , pid , err )
548
+ }
549
+ return nil , ctx .Err ()
550
550
}
551
551
552
552
func (p * PubsubValueStore ) notifyWatchers (key string , data []byte ) {
0 commit comments