From dc319770a649801e988928445e7d98a3464820b4 Mon Sep 17 00:00:00 2001 From: Alan McGovern Date: Thu, 3 Nov 2022 23:24:16 +0000 Subject: [PATCH 1/2] [core] Ensure timeouts apply for MetadataMode and PieceHashesMode If a block is requested from a peer when fetching metadata, or when fetching the piece hashes of a bittorrent v2 torrent, we need to ensure the '45 second' timeout is applied to these requests, just like they are applied for regular piece requests. Additionally, whenever a request is enqueued with a peer we should reset the 'LastBlockReceived' timer so that we do not unfairly disconnect a peer for not fulfilling a request within the timeout period simply because we have *literally* just requested the block. --- .../MonoTorrent.Client.Modes/MetadataMode.cs | 7 +++++++ .../MonoTorrent.Client.Modes/Mode.cs | 21 +++++++++++++------ .../PieceHashesMode.cs | 6 ++++++ .../Managers/TorrentManager.cs | 1 + 4 files changed, 29 insertions(+), 6 deletions(-) diff --git a/src/MonoTorrent.Client/MonoTorrent.Client.Modes/MetadataMode.cs b/src/MonoTorrent.Client/MonoTorrent.Client.Modes/MetadataMode.cs index 166d325d9..9e6701ebe 100644 --- a/src/MonoTorrent.Client/MonoTorrent.Client.Modes/MetadataMode.cs +++ b/src/MonoTorrent.Client/MonoTorrent.Client.Modes/MetadataMode.cs @@ -94,6 +94,7 @@ void IMessageEnqueuer.EnqueueRequest (IRequester wrappedPeer, PieceSegment block { var peer = UnwrappedPeers[(IgnoringChokeStateRequester) wrappedPeer]; var message = new LTMetadata (peer.ExtensionSupports, LTMetadata.MessageType.Request, block.BlockIndex); + peer.LastBlockReceived.Restart (); peer.MessageQueue.Enqueue (message); } @@ -156,6 +157,8 @@ public override void Tick (int counter) foreach (PeerId id in Manager.Peers.ConnectedPeers) if (id.SupportsLTMessages && id.ExtensionSupports.Supports (LTMetadata.Support.Name)) RequestNextNeededPiece (id); + + CloseConnectionsForStalePeers (); } async void SendAnnounces () @@ -183,6 +186,10 @@ protected override void HandleLtMetadataMessage (PeerId id, LTMetadata message) if (!Requester.ValidatePiece (RequesterData.Wrap(id), new PieceSegment (0, message.Piece), out bool pieceComplete, new HashSet ())) return; + // If the piece validated correctly we should indicate that this peer is healthy and is providing the data + // we requested + id.LastBlockReceived.Restart (); + message.MetadataPiece.CopyTo (Stream.AsMemory (message.Piece * LTMetadata.BlockSize)); if (pieceComplete) { InfoHash? v1InfoHash; diff --git a/src/MonoTorrent.Client/MonoTorrent.Client.Modes/Mode.cs b/src/MonoTorrent.Client/MonoTorrent.Client.Modes/Mode.cs index 05fd03a78..00d3e7bef 100644 --- a/src/MonoTorrent.Client/MonoTorrent.Client.Modes/Mode.cs +++ b/src/MonoTorrent.Client/MonoTorrent.Client.Modes/Mode.cs @@ -654,14 +654,10 @@ void PostLogicTick (int counter) i--; continue; } - - if (id.LastBlockReceived.Elapsed > Settings.StaleRequestTimeout && id.AmRequestingPiecesCount > 0) { - ConnectionManager.CleanupSocket (Manager, id); - i--; - continue; - } } + CloseConnectionsForStalePeers (); + Manager.PieceManager.AddPieceRequests (Manager.Peers.ConnectedPeers); if (Manager.State == TorrentState.Seeding || Manager.State == TorrentState.Downloading) { @@ -669,6 +665,19 @@ void PostLogicTick (int counter) } } + protected internal void CloseConnectionsForStalePeers () + { + for (int i = 0; i < Manager.Peers.ConnectedPeers.Count; i++) { + var id = Manager.Peers.ConnectedPeers[i]; + + if (id.LastBlockReceived.Elapsed > Settings.StaleRequestTimeout && id.AmRequestingPiecesCount > 0) { + ConnectionManager.CleanupSocket (Manager, id); + i--; + continue; + } + } + } + void DownloadLogic (int counter) { if (ClientEngine.SupportsWebSeed && (DateTime.Now - Manager.StartTime) > Settings.WebSeedDelay && Manager.Monitor.DownloadRate < Settings.WebSeedSpeedTrigger) { diff --git a/src/MonoTorrent.Client/MonoTorrent.Client.Modes/PieceHashesMode.cs b/src/MonoTorrent.Client/MonoTorrent.Client.Modes/PieceHashesMode.cs index ba3427add..32db7f0fd 100644 --- a/src/MonoTorrent.Client/MonoTorrent.Client.Modes/PieceHashesMode.cs +++ b/src/MonoTorrent.Client/MonoTorrent.Client.Modes/PieceHashesMode.cs @@ -96,6 +96,7 @@ void IMessageEnqueuer.EnqueueRequest (IRequester wrappedPeer, PieceSegment block { var peer = UnwrappedPeers[(IgnoringChokeStateRequester) wrappedPeer]; var message = HashRequestMessage.Create (root, totalHashes, actualPieceLength, block.PieceIndex * PieceLength, MaxHashesPerRequest); + peer.LastBlockReceived.Restart (); peer.MessageQueue.Enqueue (message); } @@ -147,6 +148,7 @@ public override void Tick (int counter) { MaybeAnnounce (); MaybeRequestNext (); + CloseConnectionsForStalePeers (); } async void MaybeAnnounce () @@ -220,6 +222,10 @@ protected override void HandleHashesMessage (PeerId id, HashesMessage hashesMess return; } + // If the piece validated correctly we should indicate that this peer is healthy and is providing the data + // we requested + id.LastBlockReceived.Restart (); + // NOTE: Tweak this so we validate the hash in-place, and ensure the data we've been given, provided with the ancestor // hashes, combines to form the `PiecesRoot` value. var success = infoHashes[file].TryAppend (hashesMessage.BaseLayer, hashesMessage.Index, hashesMessage.Length, hashesMessage.Hashes.Span.Slice (0, hashesMessage.Length * 32), hashesMessage.Hashes.Span.Slice (hashesMessage.Length * 32)); diff --git a/src/MonoTorrent.Client/MonoTorrent.Client/Managers/TorrentManager.cs b/src/MonoTorrent.Client/MonoTorrent.Client/Managers/TorrentManager.cs index ce7bb5c7e..d524398ce 100644 --- a/src/MonoTorrent.Client/MonoTorrent.Client/Managers/TorrentManager.cs +++ b/src/MonoTorrent.Client/MonoTorrent.Client/Managers/TorrentManager.cs @@ -1191,6 +1191,7 @@ void IMessageEnqueuer.EnqueueRequests (IRequester peer, Span segme { (var bundle, var releaser) = RequestBundle.Rent (); bundle.Initialize (segments.ToBlockInfo (stackalloc BlockInfo[segments.Length], this)); + ((PeerId) peer).LastBlockReceived.Restart (); ((PeerId) peer).MessageQueue.Enqueue (bundle, releaser); } From 145b5df1cf63679d06e835e829b76ca91c54c62e Mon Sep 17 00:00:00 2001 From: Alan McGovern Date: Fri, 4 Nov 2022 00:33:33 +0000 Subject: [PATCH 2/2] [core] Simplify (re)setting LastBlockReceived This should be accurately tracked in the 3 places it's used. --- .../MonoTorrent.Client.Modes/MetadataMode.cs | 7 +++++-- .../MonoTorrent.Client.Modes/Mode.cs | 15 +++++++++++---- .../MonoTorrent.Client.Modes/PieceHashesMode.cs | 7 +++++-- .../Managers/ConnectionManager.cs | 4 ++-- .../MonoTorrent.Client/Managers/PieceManager.cs | 8 +++++++- .../MonoTorrent.Client/Managers/TorrentManager.cs | 1 - 6 files changed, 30 insertions(+), 12 deletions(-) diff --git a/src/MonoTorrent.Client/MonoTorrent.Client.Modes/MetadataMode.cs b/src/MonoTorrent.Client/MonoTorrent.Client.Modes/MetadataMode.cs index 9e6701ebe..2acd10ffc 100644 --- a/src/MonoTorrent.Client/MonoTorrent.Client.Modes/MetadataMode.cs +++ b/src/MonoTorrent.Client/MonoTorrent.Client.Modes/MetadataMode.cs @@ -94,7 +94,6 @@ void IMessageEnqueuer.EnqueueRequest (IRequester wrappedPeer, PieceSegment block { var peer = UnwrappedPeers[(IgnoringChokeStateRequester) wrappedPeer]; var message = new LTMetadata (peer.ExtensionSupports, LTMetadata.MessageType.Request, block.BlockIndex); - peer.LastBlockReceived.Restart (); peer.MessageQueue.Enqueue (message); } @@ -188,7 +187,11 @@ protected override void HandleLtMetadataMessage (PeerId id, LTMetadata message) // If the piece validated correctly we should indicate that this peer is healthy and is providing the data // we requested - id.LastBlockReceived.Restart (); + if (id.AmRequestingPiecesCount == 0) { + id.LastBlockReceived.Reset (); + } else { + id.LastBlockReceived.Restart (); + } message.MetadataPiece.CopyTo (Stream.AsMemory (message.Piece * LTMetadata.BlockSize)); if (pieceComplete) { diff --git a/src/MonoTorrent.Client/MonoTorrent.Client.Modes/Mode.cs b/src/MonoTorrent.Client/MonoTorrent.Client.Modes/Mode.cs index 00d3e7bef..8ddecf7e5 100644 --- a/src/MonoTorrent.Client/MonoTorrent.Client.Modes/Mode.cs +++ b/src/MonoTorrent.Client/MonoTorrent.Client.Modes/Mode.cs @@ -670,10 +670,17 @@ protected internal void CloseConnectionsForStalePeers () for (int i = 0; i < Manager.Peers.ConnectedPeers.Count; i++) { var id = Manager.Peers.ConnectedPeers[i]; - if (id.LastBlockReceived.Elapsed > Settings.StaleRequestTimeout && id.AmRequestingPiecesCount > 0) { - ConnectionManager.CleanupSocket (Manager, id); - i--; - continue; + if (id.AmRequestingPiecesCount > 0) { + if (!id.LastBlockReceived.IsRunning) + id.LastBlockReceived.Restart (); + + if (id.LastBlockReceived.Elapsed > Settings.StaleRequestTimeout) { + ConnectionManager.CleanupSocket (Manager, id); + i--; + continue; + } + } else { + id.LastBlockReceived.Reset (); } } } diff --git a/src/MonoTorrent.Client/MonoTorrent.Client.Modes/PieceHashesMode.cs b/src/MonoTorrent.Client/MonoTorrent.Client.Modes/PieceHashesMode.cs index 32db7f0fd..a40727e92 100644 --- a/src/MonoTorrent.Client/MonoTorrent.Client.Modes/PieceHashesMode.cs +++ b/src/MonoTorrent.Client/MonoTorrent.Client.Modes/PieceHashesMode.cs @@ -96,7 +96,6 @@ void IMessageEnqueuer.EnqueueRequest (IRequester wrappedPeer, PieceSegment block { var peer = UnwrappedPeers[(IgnoringChokeStateRequester) wrappedPeer]; var message = HashRequestMessage.Create (root, totalHashes, actualPieceLength, block.PieceIndex * PieceLength, MaxHashesPerRequest); - peer.LastBlockReceived.Restart (); peer.MessageQueue.Enqueue (message); } @@ -224,7 +223,11 @@ protected override void HandleHashesMessage (PeerId id, HashesMessage hashesMess // If the piece validated correctly we should indicate that this peer is healthy and is providing the data // we requested - id.LastBlockReceived.Restart (); + if (id.AmRequestingPiecesCount == 0) { + id.LastBlockReceived.Reset (); + } else { + id.LastBlockReceived.Restart (); + } // NOTE: Tweak this so we validate the hash in-place, and ensure the data we've been given, provided with the ancestor // hashes, combines to form the `PiecesRoot` value. diff --git a/src/MonoTorrent.Client/MonoTorrent.Client/Managers/ConnectionManager.cs b/src/MonoTorrent.Client/MonoTorrent.Client/Managers/ConnectionManager.cs index 0b63de664..7649ffc57 100644 --- a/src/MonoTorrent.Client/MonoTorrent.Client/Managers/ConnectionManager.cs +++ b/src/MonoTorrent.Client/MonoTorrent.Client/Managers/ConnectionManager.cs @@ -254,7 +254,7 @@ internal async void ProcessNewOutgoingConnection (TorrentManager manager, PeerId ReceiveMessagesAsync (id.Connection, id.Decryptor, manager.DownloadLimiters, id.Monitor, manager, id); id.WhenConnected.Restart (); - id.LastBlockReceived.Restart (); + id.LastBlockReceived.Reset (); } catch { manager.RaiseConnectionAttemptFailed (new ConnectionAttemptFailedEventArgs (id.Peer.Info, ConnectionFailureReason.Unknown, manager)); CleanupSocket (manager, id); @@ -419,7 +419,7 @@ internal async ReusableTask IncomingConnectionAcceptedAsync (TorrentManage id.WhenConnected.Restart (); // Baseline the time the last block was received - id.LastBlockReceived.Restart (); + id.LastBlockReceived.Reset (); // Send our handshake now that we've decided to keep the connection var handshake = new HandshakeMessage (id.ExpectedInfoHash.Truncate (), manager.Engine!.PeerId, Constants.ProtocolStringV100); diff --git a/src/MonoTorrent.Client/MonoTorrent.Client/Managers/PieceManager.cs b/src/MonoTorrent.Client/MonoTorrent.Client/Managers/PieceManager.cs index 8cbd14528..259cffa89 100644 --- a/src/MonoTorrent.Client/MonoTorrent.Client/Managers/PieceManager.cs +++ b/src/MonoTorrent.Client/MonoTorrent.Client/Managers/PieceManager.cs @@ -69,7 +69,13 @@ internal bool PieceDataReceived (PeerId id, PieceMessage message, out bool piece //FIXME: Ensure piece length is correct? var isValidLength = Manager.Torrent!.BytesPerBlock (message.PieceIndex, message.StartOffset / Constants.BlockSize) == message.RequestLength; if (Initialised && isValidLength && Requester.ValidatePiece (id, new PieceSegment (message.PieceIndex, message.StartOffset / Constants.BlockSize), out pieceComplete, peersInvolved)) { - id.LastBlockReceived.Restart (); + // If the piece validated correctly we should indicate that this peer is healthy and is providing the data + // we requested + if (id.AmRequestingPiecesCount == 0) { + id.LastBlockReceived.Reset (); + } else { + id.LastBlockReceived.Restart (); + } if (pieceComplete) PendingHashCheckPieces[message.PieceIndex] = true; return true; diff --git a/src/MonoTorrent.Client/MonoTorrent.Client/Managers/TorrentManager.cs b/src/MonoTorrent.Client/MonoTorrent.Client/Managers/TorrentManager.cs index d524398ce..ce7bb5c7e 100644 --- a/src/MonoTorrent.Client/MonoTorrent.Client/Managers/TorrentManager.cs +++ b/src/MonoTorrent.Client/MonoTorrent.Client/Managers/TorrentManager.cs @@ -1191,7 +1191,6 @@ void IMessageEnqueuer.EnqueueRequests (IRequester peer, Span segme { (var bundle, var releaser) = RequestBundle.Rent (); bundle.Initialize (segments.ToBlockInfo (stackalloc BlockInfo[segments.Length], this)); - ((PeerId) peer).LastBlockReceived.Restart (); ((PeerId) peer).MessageQueue.Enqueue (bundle, releaser); }