Skip to content
This repository was archived by the owner on Feb 1, 2023. It is now read-only.

Commit 9d9719e

Browse files
authored
Merge pull request #351 from ipfs/refactor/conn-mgmt
Move connection management into networking layer
2 parents 9cafdc2 + e6bf8af commit 9d9719e

12 files changed

+843
-362
lines changed

internal/decision/engine.go

+4-17
Original file line numberDiff line numberDiff line change
@@ -745,32 +745,19 @@ func (e *Engine) MessageSent(p peer.ID, m bsmsg.BitSwapMessage) {
745745
func (e *Engine) PeerConnected(p peer.ID) {
746746
e.lock.Lock()
747747
defer e.lock.Unlock()
748-
l, ok := e.ledgerMap[p]
748+
749+
_, ok := e.ledgerMap[p]
749750
if !ok {
750-
l = newLedger(p)
751-
e.ledgerMap[p] = l
751+
e.ledgerMap[p] = newLedger(p)
752752
}
753-
754-
l.lk.Lock()
755-
defer l.lk.Unlock()
756-
l.ref++
757753
}
758754

759755
// PeerDisconnected is called when a peer disconnects.
760756
func (e *Engine) PeerDisconnected(p peer.ID) {
761757
e.lock.Lock()
762758
defer e.lock.Unlock()
763-
l, ok := e.ledgerMap[p]
764-
if !ok {
765-
return
766-
}
767759

768-
l.lk.Lock()
769-
defer l.lk.Unlock()
770-
l.ref--
771-
if l.ref <= 0 {
772-
delete(e.ledgerMap, p)
773-
}
760+
delete(e.ledgerMap, p)
774761
}
775762

776763
// If the want is a want-have, and it's below a certain size, send the full

internal/decision/ledger.go

-4
Original file line numberDiff line numberDiff line change
@@ -43,10 +43,6 @@ type ledger struct {
4343
// wantList is a (bounded, small) set of keys that Partner desires.
4444
wantList *wl.Wantlist
4545

46-
// ref is the reference count for this ledger; it's used to ensure we
47-
// don't drop the reference to this ledger in multi-connection scenarios
48-
ref int
49-
5046
lk sync.RWMutex
5147
}
5248

internal/messagequeue/messagequeue.go

+51-115
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,8 @@ const (
2525
defaultRebroadcastInterval = 30 * time.Second
2626
// maxRetries is the number of times to attempt to send a message before
2727
// giving up
28-
maxRetries = 10
28+
maxRetries = 3
29+
sendTimeout = 30 * time.Second
2930
// maxMessageSize is the maximum message size in bytes
3031
maxMessageSize = 1024 * 1024 * 2
3132
// sendErrorBackoff is the time to wait before retrying to connect after
@@ -46,7 +47,7 @@ const (
4647
// sender.
4748
type MessageNetwork interface {
4849
ConnectTo(context.Context, peer.ID) error
49-
NewMessageSender(context.Context, peer.ID) (bsnet.MessageSender, error)
50+
NewMessageSender(context.Context, peer.ID, *bsnet.MessageSenderOpts) (bsnet.MessageSender, error)
5051
Latency(peer.ID) time.Duration
5152
Ping(context.Context, peer.ID) ping.Result
5253
Self() peer.ID
@@ -55,14 +56,14 @@ type MessageNetwork interface {
5556
// MessageQueue implements queue of want messages to send to peers.
5657
type MessageQueue struct {
5758
ctx context.Context
59+
shutdown func()
5860
p peer.ID
5961
network MessageNetwork
6062
dhTimeoutMgr DontHaveTimeoutManager
6163
maxMessageSize int
6264
sendErrorBackoff time.Duration
6365

6466
outgoingWork chan time.Time
65-
done chan struct{}
6667

6768
// Take lock whenever any of these variables are modified
6869
wllock sync.Mutex
@@ -169,8 +170,10 @@ func New(ctx context.Context, p peer.ID, network MessageNetwork, onDontHaveTimeo
169170
func newMessageQueue(ctx context.Context, p peer.ID, network MessageNetwork,
170171
maxMsgSize int, sendErrorBackoff time.Duration, dhTimeoutMgr DontHaveTimeoutManager) *MessageQueue {
171172

173+
ctx, cancel := context.WithCancel(ctx)
172174
mq := &MessageQueue{
173175
ctx: ctx,
176+
shutdown: cancel,
174177
p: p,
175178
network: network,
176179
dhTimeoutMgr: dhTimeoutMgr,
@@ -179,7 +182,6 @@ func newMessageQueue(ctx context.Context, p peer.ID, network MessageNetwork,
179182
peerWants: newRecallWantList(),
180183
cancels: cid.NewSet(),
181184
outgoingWork: make(chan time.Time, 1),
182-
done: make(chan struct{}),
183185
rebroadcastInterval: defaultRebroadcastInterval,
184186
sendErrorBackoff: sendErrorBackoff,
185187
priority: maxPriority,
@@ -300,12 +302,17 @@ func (mq *MessageQueue) Startup() {
300302

301303
// Shutdown stops the processing of messages for a message queue.
302304
func (mq *MessageQueue) Shutdown() {
303-
close(mq.done)
305+
mq.shutdown()
304306
}
305307

306308
func (mq *MessageQueue) onShutdown() {
307309
// Shut down the DONT_HAVE timeout manager
308310
mq.dhTimeoutMgr.Shutdown()
311+
312+
// Reset the streamMessageSender
313+
if mq.sender != nil {
314+
_ = mq.sender.Reset()
315+
}
309316
}
310317

311318
func (mq *MessageQueue) runQueue() {
@@ -351,15 +358,7 @@ func (mq *MessageQueue) runQueue() {
351358
// in sendMessageDebounce. Send immediately.
352359
workScheduled = time.Time{}
353360
mq.sendIfReady()
354-
case <-mq.done:
355-
if mq.sender != nil {
356-
mq.sender.Close()
357-
}
358-
return
359361
case <-mq.ctx.Done():
360-
if mq.sender != nil {
361-
_ = mq.sender.Reset()
362-
}
363362
return
364363
}
365364
}
@@ -409,12 +408,12 @@ func (mq *MessageQueue) sendIfReady() {
409408
}
410409

411410
func (mq *MessageQueue) sendMessage() {
412-
err := mq.initializeSender()
411+
sender, err := mq.initializeSender()
413412
if err != nil {
414-
log.Infof("cant open message sender to peer %s: %s", mq.p, err)
415-
// TODO: cant connect, what now?
416-
// TODO: should we stop using this connection and clear the want list
417-
// to avoid using up memory?
413+
// If we fail to initialize the sender, the networking layer will
414+
// emit a Disconnect event and the MessageQueue will get cleaned up
415+
log.Infof("Could not open message sender to peer %s: %s", mq.p, err)
416+
mq.Shutdown()
418417
return
419418
}
420419

@@ -423,7 +422,7 @@ func (mq *MessageQueue) sendMessage() {
423422
mq.dhTimeoutMgr.Start()
424423

425424
// Convert want lists to a Bitswap Message
426-
message, onSent := mq.extractOutgoingMessage(mq.sender.SupportsHave())
425+
message := mq.extractOutgoingMessage(mq.sender.SupportsHave())
427426

428427
// After processing the message, clear out its fields to save memory
429428
defer mq.msg.Reset(false)
@@ -435,23 +434,22 @@ func (mq *MessageQueue) sendMessage() {
435434
wantlist := message.Wantlist()
436435
mq.logOutgoingMessage(wantlist)
437436

438-
// Try to send this message repeatedly
439-
for i := 0; i < maxRetries; i++ {
440-
if mq.attemptSendAndRecovery(message) {
441-
// We were able to send successfully.
442-
onSent()
437+
if err := sender.SendMsg(mq.ctx, message); err != nil {
438+
// If the message couldn't be sent, the networking layer will
439+
// emit a Disconnect event and the MessageQueue will get cleaned up
440+
log.Infof("Could not send message to peer %s: %s", mq.p, err)
441+
mq.Shutdown()
442+
return
443+
}
443444

444-
mq.simulateDontHaveWithTimeout(wantlist)
445+
// Set a timer to wait for responses
446+
mq.simulateDontHaveWithTimeout(wantlist)
445447

446-
// If the message was too big and only a subset of wants could be
447-
// sent, schedule sending the rest of the wants in the next
448-
// iteration of the event loop.
449-
if mq.hasPendingWork() {
450-
mq.signalWorkReady()
451-
}
452-
453-
return
454-
}
448+
// If the message was too big and only a subset of wants could be
449+
// sent, schedule sending the rest of the wants in the next
450+
// iteration of the event loop.
451+
if mq.hasPendingWork() {
452+
mq.signalWorkReady()
455453
}
456454
}
457455

@@ -540,7 +538,7 @@ func (mq *MessageQueue) pendingWorkCount() int {
540538
}
541539

542540
// Convert the lists of wants into a Bitswap message
543-
func (mq *MessageQueue) extractOutgoingMessage(supportsHave bool) (bsmsg.BitSwapMessage, func()) {
541+
func (mq *MessageQueue) extractOutgoingMessage(supportsHave bool) bsmsg.BitSwapMessage {
544542
mq.wllock.Lock()
545543
defer mq.wllock.Unlock()
546544

@@ -567,7 +565,6 @@ func (mq *MessageQueue) extractOutgoingMessage(supportsHave bool) (bsmsg.BitSwap
567565
}
568566

569567
// Add each regular want-have / want-block to the message
570-
peerSent := peerEntries[:0]
571568
for _, e := range peerEntries {
572569
if msgSize >= mq.maxMessageSize {
573570
break
@@ -579,12 +576,13 @@ func (mq *MessageQueue) extractOutgoingMessage(supportsHave bool) (bsmsg.BitSwap
579576
mq.peerWants.RemoveType(e.Cid, pb.Message_Wantlist_Have)
580577
} else {
581578
msgSize += mq.msg.AddEntry(e.Cid, e.Priority, e.WantType, true)
582-
peerSent = append(peerSent, e)
579+
580+
// Move the key from pending to sent
581+
mq.peerWants.MarkSent(e)
583582
}
584583
}
585584

586585
// Add each broadcast want-have to the message
587-
bcstSent := bcstEntries[:0]
588586
for _, e := range bcstEntries {
589587
if msgSize >= mq.maxMessageSize {
590588
break
@@ -600,89 +598,27 @@ func (mq *MessageQueue) extractOutgoingMessage(supportsHave bool) (bsmsg.BitSwap
600598
}
601599

602600
msgSize += mq.msg.AddEntry(e.Cid, e.Priority, wantType, false)
603-
bcstSent = append(bcstSent, e)
604-
}
605601

606-
// Called when the message has been successfully sent.
607-
onMessageSent := func() {
608-
mq.wllock.Lock()
609-
defer mq.wllock.Unlock()
610-
611-
// Move the keys from pending to sent
612-
for _, e := range bcstSent {
613-
mq.bcstWants.MarkSent(e)
614-
}
615-
for _, e := range peerSent {
616-
mq.peerWants.MarkSent(e)
617-
}
602+
// Move the key from pending to sent
603+
mq.bcstWants.MarkSent(e)
618604
}
619605

620-
return mq.msg, onMessageSent
621-
}
622-
623-
func (mq *MessageQueue) initializeSender() error {
624-
if mq.sender != nil {
625-
return nil
626-
}
627-
nsender, err := openSender(mq.ctx, mq.network, mq.p)
628-
if err != nil {
629-
return err
630-
}
631-
mq.sender = nsender
632-
return nil
606+
return mq.msg
633607
}
634608

635-
func (mq *MessageQueue) attemptSendAndRecovery(message bsmsg.BitSwapMessage) bool {
636-
err := mq.sender.SendMsg(mq.ctx, message)
637-
if err == nil {
638-
return true
639-
}
640-
641-
log.Infof("bitswap send error: %s", err)
642-
_ = mq.sender.Reset()
643-
mq.sender = nil
644-
645-
select {
646-
case <-mq.done:
647-
return true
648-
case <-mq.ctx.Done():
649-
return true
650-
case <-time.After(mq.sendErrorBackoff):
651-
// wait 100ms in case disconnect notifications are still propagating
652-
log.Warn("SendMsg errored but neither 'done' nor context.Done() were set")
653-
}
654-
655-
err = mq.initializeSender()
656-
if err != nil {
657-
log.Infof("couldnt open sender again after SendMsg(%s) failed: %s", mq.p, err)
658-
return true
659-
}
660-
661-
// TODO: Is this the same instance for the remote peer?
662-
// If its not, we should resend our entire wantlist to them
663-
/*
664-
if mq.sender.InstanceID() != mq.lastSeenInstanceID {
665-
wlm = mq.getFullWantlistMessage()
609+
func (mq *MessageQueue) initializeSender() (bsnet.MessageSender, error) {
610+
if mq.sender == nil {
611+
opts := &bsnet.MessageSenderOpts{
612+
MaxRetries: maxRetries,
613+
SendTimeout: sendTimeout,
614+
SendErrorBackoff: sendErrorBackoff,
615+
}
616+
nsender, err := mq.network.NewMessageSender(mq.ctx, mq.p, opts)
617+
if err != nil {
618+
return nil, err
666619
}
667-
*/
668-
return false
669-
}
670-
671-
func openSender(ctx context.Context, network MessageNetwork, p peer.ID) (bsnet.MessageSender, error) {
672-
// allow ten minutes for connections; this includes looking them up in the
673-
// dht, dialing them, and handshaking
674-
conctx, cancel := context.WithTimeout(ctx, time.Minute*10)
675-
defer cancel()
676-
677-
err := network.ConnectTo(conctx, p)
678-
if err != nil {
679-
return nil, err
680-
}
681620

682-
nsender, err := network.NewMessageSender(ctx, p)
683-
if err != nil {
684-
return nil, err
621+
mq.sender = nsender
685622
}
686-
687-
return nsender, nil
623+
return mq.sender, nil
688624
}

0 commit comments

Comments (0)