Skip to content

Commit 7233923

Browse files
authored
Merge pull request #572 from alanmcgovern/fix-tracker-announce-concurrency
[core] Refactor how announce requests are handled.
2 parents 420cb93 + b78a360 commit 7233923

File tree

3 files changed

+129
-52
lines changed

3 files changed

+129
-52
lines changed

src/MonoTorrent.Client/MonoTorrent.Client/Managers/TrackerManager.cs

Lines changed: 52 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -44,10 +44,12 @@ namespace MonoTorrent.Trackers
4444
/// </summary>
4545
class TrackerManager : ITrackerManager
4646
{
47+
static readonly int MaxConcurrentAnnounces = 15;
48+
4749
public event EventHandler<AnnounceResponseEventArgs>? AnnounceComplete;
4850
public event EventHandler<ScrapeResponseEventArgs>? ScrapeComplete;
4951

50-
public SemaphoreSlim AnnounceLimiter { get; }
52+
ReusableExclusiveSemaphore AnnounceLimiter { get; }
5153

5254
Factories Factories { get; }
5355

@@ -77,7 +79,7 @@ class TrackerManager : ITrackerManager
7779
/// <param name="isPrivate">True if adding/removing tracker should be disallowed.</param>
7880
internal TrackerManager (Factories factories, ITrackerRequestFactory requestFactory, IEnumerable<IEnumerable<string>> announces, bool isPrivate)
7981
{
80-
AnnounceLimiter = new SemaphoreSlim (10);
82+
AnnounceLimiter = new ReusableExclusiveSemaphore ();
8183
Factories = factories;
8284
RequestFactory = requestFactory;
8385
Private = isPrivate;
@@ -154,49 +156,64 @@ public async ReusableTask<bool> RemoveTrackerAsync (ITracker tracker)
154156
public ReusableTask AnnounceAsync (CancellationToken token)
155157
=> AnnounceAsync (TorrentEvent.None, token);
156158

157-
public async ReusableTask AnnounceAsync (TorrentEvent clientEvent, CancellationToken token)
158-
{
159-
// If the user initiates an Announce we need to go to the correct thread to process it.
160-
await ClientEngine.MainLoop;
159+
public ReusableTask AnnounceAsync (TorrentEvent clientEvent, CancellationToken token)
160+
=> AnnounceAsync (clientEvent, null, token);
161161

162-
var args = RequestFactory.CreateAnnounce (clientEvent);
163-
var announces = new List<Task> ();
164-
for (int i = 0; i < Tiers.Count; i++) {
165-
var task = AnnounceTierAsync (Tiers[i], args, token);
166-
if (task.IsCompleted)
167-
await task;
168-
else
169-
announces.Add (task.AsTask ());
170-
}
162+
public ReusableTask AnnounceAsync (ITracker tracker, CancellationToken token)
163+
=> AnnounceAsync (TorrentEvent.None, tracker ?? throw new ArgumentNullException (nameof (tracker)), token);
171164

172-
if (announces.Count > 0)
173-
await Task.WhenAll (announces);
174-
}
175-
async ReusableTask AnnounceTierAsync (TrackerTier tier, AnnounceRequest args, CancellationToken token)
165+
async ReusableTask AnnounceAsync (TorrentEvent clientEvent, ITracker? tracker, CancellationToken token)
176166
{
177-
using (await AnnounceLimiter.EnterAsync ())
178-
await tier.AnnounceAsync (args, token);
179-
}
180-
181-
public async ReusableTask AnnounceAsync (ITracker tracker, CancellationToken token)
182-
{
183-
Check.Tracker (tracker);
184-
185167
// If the user initiates an Announce we need to go to the correct thread to process it.
186168
await ClientEngine.MainLoop;
187169

188-
try {
170+
// Check if there are any in-progress announce requests being handled.
171+
if (!AnnounceLimiter.TryEnter (out ReusableExclusiveSemaphore.Releaser releaser)) {
172+
// There is an in-progress announce, and this is a regular recurring announce attempt, so just bail out.
173+
if (tracker is null && clientEvent == TorrentEvent.None)
174+
return;
175+
176+
// If we get here it means there's an in-progress announce *and* this is a special event. Either the user
177+
// has announced to a specific tracker, or we have a special 'TorrentEvent' to announce to the tracker.
178+
// Wait for the in-progress announce to complete, then run this one!
179+
releaser = await AnnounceLimiter.EnterAsync ();
180+
}
181+
182+
using var autorelease = releaser;
183+
token.ThrowIfCancellationRequested ();
184+
185+
// This is either a regularly scheduled announce, or a specific TorrentEvent is being sent to the tracker.
186+
if (tracker is null) {
187+
// Create and re-use the announce args for all trackers and tiers
188+
var args = RequestFactory.CreateAnnounce (clientEvent);
189+
190+
// Capture a list of Tiers in case the user adds/removes any mid-announce.
191+
var pendingTiers = new Queue<TrackerTier> (Tiers);
192+
var activeAnnounces = new List<Task> ();
193+
194+
while (pendingTiers.Count > 0) {
195+
if (activeAnnounces.Count == MaxConcurrentAnnounces) {
196+
var completed = await Task.WhenAny (activeAnnounces);
197+
activeAnnounces.Remove (completed);
198+
await completed;
199+
}
200+
201+
// The announce *might* fast-path and exit immediately if that
202+
// tracker has been announced to recently.
203+
var task = pendingTiers.Dequeue ().AnnounceAsync (args, token);
204+
if (task.IsCompleted)
205+
await task;
206+
else
207+
activeAnnounces.Add (task.AsTask ());
208+
}
209+
await Task.WhenAll (activeAnnounces);
210+
} else {
211+
// This only occurs when the user has manually announced to a specific tracker.
189212
var trackerTier = Tiers.First (t => t.Trackers.Contains (tracker));
190213
AnnounceRequest args = RequestFactory.CreateAnnounce (TorrentEvent.None);
191-
await AnnounceTrackerAsync (trackerTier, args, tracker, token);
192-
} catch {
214+
await trackerTier.AnnounceAsync (args, tracker, token);
193215
}
194216
}
195-
async ReusableTask AnnounceTrackerAsync (TrackerTier tier, AnnounceRequest args, ITracker tracker, CancellationToken token)
196-
{
197-
using (await AnnounceLimiter.EnterAsync ())
198-
await tier.AnnounceAsync (args, tracker, token);
199-
}
200217

201218
public async ReusableTask ScrapeAsync (CancellationToken token)
202219
{

src/Tests/Tests.MonoTorrent.Client/MonoTorrent.Client/TestRig.cs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -121,7 +121,7 @@ public ReusableTask SetMaximumOpenFilesAsync (int maximumOpenFiles)
121121
}
122122
}
123123

124-
class CustomTracker : ITrackerConnection
124+
class CustomTrackerConnection : ITrackerConnection
125125
{
126126
public List<DateTime> AnnouncedAt = new List<DateTime> ();
127127
public List<AnnounceRequest> AnnounceParameters = new List<AnnounceRequest> ();
@@ -135,7 +135,7 @@ class CustomTracker : ITrackerConnection
135135

136136
readonly List<Peer> peers = new List<Peer> ();
137137

138-
public CustomTracker (Uri uri)
138+
public CustomTrackerConnection (Uri uri)
139139
{
140140
Uri = uri;
141141
}
@@ -419,7 +419,7 @@ public async Task RecreateManager ()
419419
.WithDhtListenerCreator (port => new NullDhtListener ())
420420
.WithLocalPeerDiscoveryCreator (() => new ManualLocalPeerListener ())
421421
.WithPeerConnectionListenerCreator (endpoint => new CustomListener ())
422-
.WithTrackerCreator ("custom", uri => new Tracker (new CustomTracker (uri)))
422+
.WithTrackerCreator ("custom", uri => new Tracker (new CustomTrackerConnection (uri)))
423423
;
424424

425425
Engine = new ClientEngine (EngineSettingsBuilder.CreateForTests (

src/Tests/Tests.MonoTorrent.Client/MonoTorrent.Client/TrackerManagerTests.cs

Lines changed: 74 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -43,35 +43,54 @@
4343

4444
namespace MonoTorrent.Client
4545
{
46-
class TrackerWithConnection : Tracker
46+
class CustomTracker : Tracker
4747
{
48-
public CustomTracker Connection { get; }
48+
public CustomTrackerConnection Connection { get; }
4949

50-
public TrackerWithConnection (ITrackerConnection connection)
50+
public CustomTracker (CustomTrackerConnection connection)
5151
: base (connection)
5252
{
53-
Connection = (CustomTracker) connection;
53+
Connection = connection;
5454
}
5555
}
56-
class DefaultTracker : ITrackerConnection
56+
57+
class RateLimitingTracker : Tracker
58+
{
59+
public RateLimitingTrackerConnection Connection { get; }
60+
61+
public RateLimitingTracker (RateLimitingTrackerConnection connection)
62+
: base (connection)
63+
{
64+
Connection = (RateLimitingTrackerConnection) connection;
65+
}
66+
}
67+
68+
class RateLimitingTrackerConnection : ITrackerConnection
5769
{
5870
public bool CanScrape => true;
5971

6072
public Uri Uri => new Uri ("http://tracker:5353/announce");
6173

62-
public DefaultTracker ()
74+
public List<TaskCompletionSource<TrackerState>> PendingAnnounces { get; } = new List<TaskCompletionSource<TrackerState>> ();
75+
public List<TaskCompletionSource<TrackerState>> PendingScrapes { get; } = new List<TaskCompletionSource<TrackerState>> ();
76+
77+
public RateLimitingTrackerConnection ()
6378
{
6479

6580
}
6681

67-
public ReusableTask<AnnounceResponse> AnnounceAsync (AnnounceRequest parameters, CancellationToken token)
82+
public async ReusableTask<AnnounceResponse> AnnounceAsync (AnnounceRequest parameters, CancellationToken token)
6883
{
69-
return ReusableTask.FromResult (new AnnounceResponse (TrackerState.Ok));
84+
var tcs = new TaskCompletionSource<TrackerState> ();
85+
PendingAnnounces.Add (tcs);
86+
return new AnnounceResponse (await tcs.Task);
7087
}
7188

72-
public ReusableTask<ScrapeResponse> ScrapeAsync (ScrapeRequest parameters, CancellationToken token)
89+
public async ReusableTask<ScrapeResponse> ScrapeAsync (ScrapeRequest parameters, CancellationToken token)
7390
{
74-
return ReusableTask.FromResult (new ScrapeResponse (TrackerState.Ok));
91+
var tcs = new TaskCompletionSource<TrackerState> ();
92+
PendingScrapes.Add (tcs);
93+
return new ScrapeResponse (await tcs.Task);
7594
}
7695
}
7796

@@ -110,17 +129,17 @@ public ScrapeRequest CreateScrape ()
110129
};
111130

112131
TrackerManager trackerManager;
113-
IList<List<TrackerWithConnection>> trackers;
132+
IList<List<CustomTracker>> trackers;
114133

115134
Factories Factories { get; set; }
116135

117136
[SetUp]
118137
public void Setup ()
119138
{
120139
Factories = Factories.Default
121-
.WithTrackerCreator ("custom", uri => new TrackerWithConnection (new CustomTracker (uri)));
140+
.WithTrackerCreator ("custom", uri => new CustomTracker (new CustomTrackerConnection (uri)));
122141
trackerManager = new TrackerManager (Factories, new RequestFactory (), trackerUrls, true);
123-
trackers = trackerManager.Tiers.Select (t => t.Trackers.Cast<TrackerWithConnection> ().ToList ()).ToList ();
142+
trackers = trackerManager.Tiers.Select (t => t.Trackers.Cast<CustomTracker> ().ToList ()).ToList ();
124143
}
125144

126145
[Test]
@@ -231,6 +250,47 @@ public async Task AnnounceFailed ()
231250
Assert.AreEqual (0, trackers[1][1].Connection.AnnouncedAt.Count, "#8");
232251
}
233252

253+
[Test]
254+
public async Task Announce_RateLimitedAnnounceAttempts ()
255+
{
256+
var factories = Factories.Default
257+
.WithTrackerCreator ("custom", uri => new RateLimitingTracker (new RateLimitingTrackerConnection ()));
258+
259+
var tier = new[] { new[] { $"custom://tracker/announce" } };
260+
var trackerManager = new TrackerManager (factories, new RequestFactory (), tier, true);
261+
var trackers = trackerManager.Tiers.Select (t => t.Trackers.Cast<RateLimitingTracker> ().ToList ()).ToList ();
262+
263+
// only 1 concurrent regular announce can run at a time.
264+
var announce = trackerManager.AnnounceAsync (CancellationToken.None);
265+
266+
// These should all early-exit
267+
for (int i = 0; i < 3; i++)
268+
await trackerManager.AnnounceAsync (CancellationToken.None).WithTimeout (TimeSpan.FromSeconds (10));
269+
for (int i = 0; i < 3; i++)
270+
await trackerManager.AnnounceAsync (TorrentEvent.None, CancellationToken.None).WithTimeout (TimeSpan.FromSeconds (10));
271+
272+
Assert.IsFalse (announce.IsCompleted);
273+
}
274+
275+
[Test]
276+
public void Announce_RateLimitedTierAnnounces ()
277+
{
278+
var factories = Factories.Default
279+
.WithTrackerCreator ("custom", uri => new RateLimitingTracker (new RateLimitingTrackerConnection ()));
280+
281+
// Create 100 tracker tiers.
282+
var urls = Enumerable.Range (0, 100).Select (t => new[] { $"custom://tracker{t}/announce" }).ToArray ();
283+
var trackerManager = new TrackerManager (factories, new RequestFactory (), urls, true);
284+
var trackers = trackerManager.Tiers.Select (t => t.Trackers.Cast<RateLimitingTracker> ().ToList ()).ToList ();
285+
286+
var cts = new CancellationTokenSource (TimeSpan.FromSeconds (10));
287+
var announce = trackerManager.AnnounceAsync (CancellationToken.None);
288+
while (trackers.SelectMany (t => t).Where (t => t.Connection.PendingAnnounces.Count == 1).Count () != 15) {
289+
Thread.Sleep (1);
290+
cts.Token.ThrowIfCancellationRequested ();
291+
}
292+
}
293+
234294
[Test]
235295
public async Task CurrentTracker ()
236296
{
@@ -273,7 +333,7 @@ public async Task AnnounceTwice_SendStartedOnce ()
273333
[Test]
274334
public void Defaults ()
275335
{
276-
var tracker = new Tracker (new DefaultTracker ());
336+
var tracker = new CustomTracker (new CustomTrackerConnection (new Uri("http://tester/announce")));
277337
Assert.AreEqual (TimeSpan.FromMinutes (3), tracker.MinUpdateInterval, "#1");
278338
Assert.AreEqual (TimeSpan.FromMinutes (30), tracker.UpdateInterval, "#2");
279339
Assert.IsNotNull (tracker.WarningMessage, "#3");

0 commit comments

Comments
 (0)