Skip to content

Commit

Permalink
[core] Tweak ratelimiting a little better for low rates (#618)
Browse files Browse the repository at this point in the history
* [core] Tweak ratelimiting a little better for low rates

Fix another issue with a ratelimit of 1 byte/second. Hopefully
no-one was actually trying to use a limit this low.

Additionally, simplify this logic now. Rate limits are much
easier than they used to be as the engine works in bytes, not
'blocks' or 'chunks'.

Back in the day there had to be estimates made for the amount
of data which would be received for a given 'ReceiveAsync' call.
Now we're just counting the raw number of bytes sent/received,
or read/written, so we don't need complicated math to account
for over/under-shoot.
  • Loading branch information
alanmcgovern authored Jan 31, 2023
1 parent 629ddcf commit a8c7a6c
Show file tree
Hide file tree
Showing 12 changed files with 59 additions and 147 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,13 +31,6 @@ namespace MonoTorrent.Client.RateLimiters
{
interface IRateLimiter
{
/// <summary>
/// When this returns <see langword="null"/> there is no preference on
/// how large each chunk of work should be. Otherwise, work should be processed
/// in chunks of this size.
/// </summary>
int? PreferredChunkSize { get; }

/// <summary>
/// Returns true if there is sufficient capacity left in the rate limiter to
/// process the specified amount of data. Also returns true if
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,48 +27,26 @@
//


using System;
using System.Threading;

namespace MonoTorrent.Client.RateLimiters
{
sealed class RateLimiter : IRateLimiter
{
long savedError;
long chunks;

public int? PreferredChunkSize { get; private set; }

public bool Unlimited { get; set; }

public RateLimiter ()
{
UpdateChunks (0, 0, null);
UpdateChunks (0);
}

public void UpdateChunks (long maxRate, long actualRate, int? preferredChunkSize)
public void UpdateChunks (long maxRate)
{
PreferredChunkSize = preferredChunkSize;
Unlimited = maxRate == 0;
if (Unlimited)
return;

// From experimentation, I found that increasing by 5% gives more accurate rate limiting
// for peer communications. For disk access and whatnot, a 5% overshoot is fine.
maxRate = (long) (maxRate * 1.05);
long errorRateDown = maxRate - actualRate;
long delta = (long) (0.4 * errorRateDown + 0.6 * savedError);
savedError = errorRateDown;

long increaseAmount = maxRate + delta;
Interlocked.Add (ref chunks, increaseAmount);
if (chunks > (maxRate * 1.2))
Interlocked.Exchange (ref chunks, (int) (maxRate * 1.2));

if (chunks < (maxRate / 2))
Interlocked.Exchange (ref chunks, (maxRate / 2));

if (maxRate == 0)
chunks = 0;
Interlocked.Exchange(ref chunks, maxRate);
}

public bool TryProcess (long amount)
Expand All @@ -79,7 +57,7 @@ public bool TryProcess (long amount)
long c;
do {
c = Interlocked.Read (ref chunks);
if (c < 0)
if (c <= 0)
return false;

} while (Interlocked.CompareExchange (ref chunks, c - amount, c) != c);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,16 +37,6 @@ sealed class RateLimiterGroup : IRateLimiter, IEnumerable<IRateLimiter>
{
readonly List<IRateLimiter> limiters;

public int? PreferredChunkSize {
get {
int? preferredChunkSize = null;
for (int i = 0; i < limiters.Count; i++)
if (limiters[i].PreferredChunkSize.HasValue)
preferredChunkSize = preferredChunkSize.HasValue ? Math.Min (limiters[i].PreferredChunkSize!.Value, preferredChunkSize.Value) : limiters[i].PreferredChunkSize!.Value;
return preferredChunkSize;
}
}

public bool Unlimited {
get {
for (int i = 0; i < limiters.Count; i++)
Expand Down
28 changes: 2 additions & 26 deletions src/MonoTorrent.Client/MonoTorrent.Client/ClientEngine.cs
Original file line number Diff line number Diff line change
Expand Up @@ -773,37 +773,13 @@ public async Task StopAllAsync (TimeSpan timeout)

#region Private/Internal methods


internal static int? PreferredChunkSize (int maxSpeedEngine, int maxSpeedTorrent)
{
// Unlimited
if (maxSpeedEngine == 0 && maxSpeedTorrent == 0)
return null;

int maxSpeed;
if (maxSpeedEngine != 0 && maxSpeedTorrent != 0)
maxSpeed = Math.Min (maxSpeedEngine, maxSpeedTorrent);
else
maxSpeed = Math.Max (maxSpeedEngine, maxSpeedTorrent);

// The max we transmit for a single socket call is 16kB as that is the size of a
// single block. If the transfer rate is unlimited, or we can transfer greater
// than 256kB/sec then continue using 'unlimited' sized chunks. Otherwise restrict
// individual calls to 4kB to try and keep things reasonably evenly distributed.
if (maxSpeed == 0 || maxSpeed > 16 * 16 * 1024)
return null;

// If the limit is below 256 kB/sec then we can communicate in 4kB chunks
return 4096 + 32;
}

void LogicTick ()
{
tickCount++;

if (tickCount % 2 == 0) {
downloadLimiter.UpdateChunks (Settings.MaximumDownloadRate, TotalDownloadRate, PreferredChunkSize (Settings.MaximumDownloadRate, 0));
uploadLimiter.UpdateChunks (Settings.MaximumUploadRate, TotalUploadRate, PreferredChunkSize (Settings.MaximumUploadRate, 0));
downloadLimiter.UpdateChunks (Settings.MaximumDownloadRate);
uploadLimiter.UpdateChunks (Settings.MaximumUploadRate);
}

ConnectionManager.CancelPendingConnects ();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -661,8 +661,8 @@ ReusableTask Tick (int delta, bool waitForBufferedIO)
WriterReadMonitor.Tick (delta);
WriterWriteMonitor.Tick (delta);

WriteLimiter.UpdateChunks (Settings.MaximumDiskWriteRate, WriteRate, null);
ReadLimiter.UpdateChunks (Settings.MaximumDiskReadRate, ReadRate, null);
WriteLimiter.UpdateChunks (Settings.MaximumDiskWriteRate);
ReadLimiter.UpdateChunks (Settings.MaximumDiskReadRate);

ReusableTask processTask = ProcessBufferedIOAsync ();
return waitForBufferedIO ? processTask : ReusableTask.CompletedTask;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -602,7 +602,7 @@ public async Task MoveFileAsync (ITorrentManagerFile file, string path)
throw new TorrentException ("Cannot move files when the torrent is active");

try {
var paths = TorrentFileInfo.GetNewPaths (Path.GetFullPath (path), Engine.Settings.UsePartialFiles, file.Path == file.DownloadCompleteFullPath);
var paths = TorrentFileInfo.GetNewPaths (Path.GetFullPath (path), Engine!.Settings.UsePartialFiles, file.Path == file.DownloadCompleteFullPath);
await Engine!.DiskManager.MoveFileAsync (file, paths);
} catch (Exception ex) {
TrySetError (Reason.WriteFailure, ex);
Expand Down Expand Up @@ -1034,8 +1034,8 @@ internal void RaiseConnectionAttemptFailed (ConnectionAttemptFailedEventArgs arg
internal void UpdateLimiters ()
{
if (Engine != null) {
DownloadLimiter.UpdateChunks (Settings.MaximumDownloadRate, Monitor.DownloadRate, ClientEngine.PreferredChunkSize (Engine.Settings.MaximumDownloadRate, Settings.MaximumDownloadRate));
UploadLimiter.UpdateChunks (Settings.MaximumUploadRate, Monitor.UploadRate, ClientEngine.PreferredChunkSize (Engine.Settings.MaximumUploadRate, Settings.MaximumUploadRate));
DownloadLimiter.UpdateChunks (Settings.MaximumDownloadRate);
UploadLimiter.UpdateChunks (Settings.MaximumUploadRate);
}
}
#endregion Internal Methods
Expand Down
12 changes: 5 additions & 7 deletions src/MonoTorrent.Client/MonoTorrent.Client/NetworkIO.cs
Original file line number Diff line number Diff line change
Expand Up @@ -129,15 +129,14 @@ public static async ReusableTask ReceiveAsync (IPeerConnection connection, Memor
while (buffer.Length > 0) {
int transferred;
bool unlimited = rateLimiter?.Unlimited ?? true;
int shouldRead = unlimited || !rateLimiter!.PreferredChunkSize.HasValue ? buffer.Length : Math.Min (rateLimiter.PreferredChunkSize.Value, buffer.Length);

if (rateLimiter != null && !unlimited && !rateLimiter.TryProcess (shouldRead)) {
if (rateLimiter != null && !unlimited && !rateLimiter.TryProcess (buffer.Length)) {
var tcs = new ReusableTaskCompletionSource<int> ();
lock (receiveQueue)
receiveQueue.Enqueue (new QueuedIO (connection, buffer, rateLimiter, tcs));
transferred = await tcs.Task.ConfigureAwait (false);
} else {
transferred = await connection.ReceiveAsync (buffer.Slice (0, shouldRead)).ConfigureAwait (false);
transferred = await connection.ReceiveAsync (buffer).ConfigureAwait (false);
}

if (transferred == 0)
Expand All @@ -162,15 +161,14 @@ public static async ReusableTask SendAsync (IPeerConnection connection, Memory<b
while (buffer.Length > 0) {
int transferred;
bool unlimited = rateLimiter?.Unlimited ?? true;
int shouldRead = unlimited || !rateLimiter!.PreferredChunkSize.HasValue ? buffer.Length : Math.Min (rateLimiter.PreferredChunkSize.Value, buffer.Length);

if (rateLimiter != null && !unlimited && !rateLimiter.TryProcess (shouldRead)) {
if (rateLimiter != null && !unlimited && !rateLimiter.TryProcess (buffer.Length)) {
var tcs = new ReusableTaskCompletionSource<int> ();
lock (sendQueue)
sendQueue.Enqueue (new QueuedIO (connection, buffer.Slice (0, shouldRead), rateLimiter, tcs));
sendQueue.Enqueue (new QueuedIO (connection, buffer, rateLimiter, tcs));
transferred = await tcs.Task.ConfigureAwait (false);
} else {
transferred = await connection.SendAsync (buffer.Slice (0, shouldRead)).ConfigureAwait (false);
transferred = await connection.SendAsync (buffer).ConfigureAwait (false);
}

if (transferred == 0)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,7 @@ public List<Peer> GetPeers (AddressFamily addressFamily)
/// <param name="response">The bencoded dictionary to add the peers to</param>
/// <param name="count">The number of peers to add</param>
/// <param name="compact">True if the peers should be in compact form</param>
/// <param name="addressFamily">The address family of the peers to add</param>
internal void GetPeers (BEncodedDictionary response, int count, bool compact, AddressFamily addressFamily)
{
byte[]? compactResponse = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -191,8 +191,7 @@ public bool Add (ITrackable trackable, IPeerComparer comparer)
/// <summary>
/// Adds the trackable to the server
/// </summary>
/// <param name="trackable">The trackable to add</param>
/// <param name="comparer">The comparer used to decide whether two peers are the same.</param>
/// <param name="manager">The torrent manager to add to the server</param>
/// <returns></returns>
internal bool Add (SimpleTorrentManager manager)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ public class RateLimiterTests
public void ChunkSizeLargerThanRateLimit ()
{
var rateLimiter = new RateLimiter ();
rateLimiter.UpdateChunks (10, 10, 10);
rateLimiter.UpdateChunks (10);

// We can process any size chunk as long as there's some rate limit left
Assert.IsTrue (rateLimiter.TryProcess (11));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -220,21 +220,29 @@ public async Task ExceedReadRate ()
await diskManager.UpdateSettingsAsync (new EngineSettingsBuilder { MaximumDiskReadRate = 1 }.ToSettings ());
await diskManager.Tick (1000).WithTimeout ();

// Queue up 6 reads, none should process.
// Queue up 7 reads, 1 should process.
var buffer = new byte[Constants.BlockSize];
int count = 6;
var tasks = new List<Task> ();
for (int i = 0; i < count; i++)
for (int i = 0; i < 7 + 1; i++)
tasks.Add (diskManager.ReadAsync (fileData, new BlockInfo (0, 0, buffer.Length), buffer).AsTask ());

Assert.AreEqual (buffer.Length * count, diskManager.PendingReadBytes, "#1");
// Wait for the first task to complete.
var doneTask = await Task.WhenAny (tasks).WithTimeout ();
tasks.Remove (doneTask);
await doneTask;

// We should still process none.
Assert.AreEqual (buffer.Length * tasks.Count, diskManager.PendingReadBytes, "#1");

// This should process one too.
await diskManager.Tick (1000).WithTimeout ();
Assert.AreEqual (buffer.Length * count, diskManager.PendingReadBytes, "#2");
doneTask = await Task.WhenAny (tasks).WithTimeout ();
tasks.Remove (doneTask);
await doneTask;

// Give a proper max read rate.
await diskManager.UpdateSettingsAsync (new EngineSettingsBuilder { MaximumDiskReadRate = Constants.BlockSize * 2 }.ToSettings ());
Assert.AreEqual (buffer.Length * tasks.Count, diskManager.PendingReadBytes, "#2");

// Give a max read rate which allows at least 2 blocks to read.
await diskManager.UpdateSettingsAsync (new EngineSettingsBuilder { MaximumDiskReadRate = (int)(Constants.BlockSize * 1.8) }.ToSettings ());
for (int i = 0; i < 2; i++) {
await diskManager.Tick (1000).WithTimeout ();

Expand All @@ -245,17 +253,15 @@ public async Task ExceedReadRate ()
}
Assert.IsFalse (tasks.Any (t => t.IsCompleted));

count -= 2;
Assert.AreEqual (buffer.Length * count, diskManager.PendingReadBytes, "#3." + i);
Assert.AreEqual (buffer.Length * tasks.Count, diskManager.PendingReadBytes, "#3." + i);
}

// If we add more reads after we used up our allowance they still won't process.
for (int i = 0; i < 2; i++) {
count++;
tasks.Add (diskManager.ReadAsync (fileData, new BlockInfo (0, 0, buffer.Length), buffer).AsTask ());
}
Assert.AreEqual (buffer.Length * count, diskManager.PendingReadBytes, "#4." + count);
while (count > 0) {
Assert.AreEqual (buffer.Length * tasks.Count, diskManager.PendingReadBytes, "#4");
while (tasks.Count > 0) {
await diskManager.Tick (1000).WithTimeout ();

for (int t = 0; t < 2; t++) {
Expand All @@ -265,8 +271,7 @@ public async Task ExceedReadRate ()
}
Assert.IsFalse (tasks.Any (t => t.IsCompleted));

count -= 2;
Assert.AreEqual (buffer.Length * count, diskManager.PendingReadBytes, "#5." + count);
Assert.AreEqual (buffer.Length * tasks.Count, diskManager.PendingReadBytes, "#5");
}
}

Expand All @@ -277,19 +282,26 @@ public async Task ExceedWriteRate ()
await diskManager.UpdateSettingsAsync (new EngineSettingsBuilder { MaximumDiskWriteRate = 1, DiskCacheBytes = 0 }.ToSettings ());
await diskManager.Tick (1000);

// Queue up 6 reads, none should process.
// Queue up 8 writes, 1 should process.
var buffer = new byte[Constants.BlockSize];
int count = 6;
var tasks = new List<Task> ();
for (int i = 0; i < count; i++)
for (int i = 0; i < 8; i++)
tasks.Add (diskManager.WriteAsync (fileData, new BlockInfo (i / 3, Constants.BlockSize * (i % 3), Constants.BlockSize), buffer).AsTask ());

Assert.AreEqual (buffer.Length * count, diskManager.PendingWriteBytes, "#1");
// Wait for the first task to complete.
var doneTask = await Task.WhenAny (tasks).WithTimeout ();
tasks.Remove (doneTask);
await doneTask;

// We should still process none.
Assert.AreEqual (buffer.Length * tasks.Count, diskManager.PendingWriteBytes, "#1");

// We should still process one.
await diskManager.Tick (1000);
doneTask = await Task.WhenAny (tasks).WithTimeout ();
tasks.Remove (doneTask);
await doneTask;

Assert.AreEqual (buffer.Length * count, diskManager.PendingWriteBytes, "#2");
Assert.AreEqual (buffer.Length * tasks.Count, diskManager.PendingWriteBytes, "#2");

// Give a proper max read rate.
await diskManager.UpdateSettingsAsync (new EngineSettingsBuilder { MaximumDiskWriteRate = Constants.BlockSize * 2, DiskCacheBytes = 0 }.ToSettings ());
Expand All @@ -303,16 +315,14 @@ public async Task ExceedWriteRate ()
}
Assert.IsFalse (tasks.Any (t => t.IsCompleted));

count -= 2;
Assert.AreEqual (buffer.Length * count, diskManager.PendingWriteBytes, "#3." + i);
Assert.AreEqual (buffer.Length * tasks.Count, diskManager.PendingWriteBytes, "#3." + i);
}

// If we add more writes after we used up our allowance they still won't process.
for (int i = 0; i < 2; i++) {
count++;
tasks.Add (diskManager.WriteAsync (fileData, new BlockInfo (0, Constants.BlockSize * i, Constants.BlockSize), buffer).AsTask ());
}
Assert.AreEqual (buffer.Length * count, diskManager.PendingWriteBytes, "#4");
Assert.AreEqual (buffer.Length * tasks.Count, diskManager.PendingWriteBytes, "#4");

while (diskManager.PendingWriteBytes > 0) {
await diskManager.Tick (1000);
Expand All @@ -323,8 +333,7 @@ public async Task ExceedWriteRate ()
}
Assert.IsFalse (tasks.Any (t => t.IsCompleted));

count -= 2;
Assert.AreEqual (buffer.Length * count, diskManager.PendingWriteBytes, "#5." + diskManager.PendingWriteBytes);
Assert.AreEqual (buffer.Length * tasks.Count, diskManager.PendingWriteBytes, "#5." + diskManager.PendingWriteBytes);
}
}

Expand Down Expand Up @@ -466,12 +475,13 @@ public async Task ReadPieceTwo ()
public async Task ReadRate ()
{
var buffer = new byte[Constants.BlockSize];
await diskManager.UpdateSettingsAsync (new EngineSettingsBuilder { MaximumDiskReadRate = Constants.BlockSize, DiskCacheBytes = 0 }.ToSettings ());
await diskManager.UpdateSettingsAsync (new EngineSettingsBuilder { MaximumDiskReadRate = 1, DiskCacheBytes = 0 }.ToSettings ());
await diskManager.Tick (1000);

var tasks = new List<Task> ();
for (int i = 0; i < SpeedMonitor.DefaultAveragePeriod + 1; i++)
tasks.Add (diskManager.ReadAsync (fileData, new BlockInfo (0, 0, Constants.BlockSize), buffer).AsTask ());

while (diskManager.PendingReadBytes > 0) {
await diskManager.Tick (1000);
var done = await Task.WhenAny (tasks).WithTimeout ();
Expand Down Expand Up @@ -694,7 +704,7 @@ public async Task WritePiece_ReverseOrder ()
public async Task WriteRate ()
{
var buffer = new byte[Constants.BlockSize];
await diskManager.UpdateSettingsAsync (new EngineSettingsBuilder { MaximumDiskWriteRate = Constants.BlockSize, DiskCacheBytes = 0 }.ToSettings ());
await diskManager.UpdateSettingsAsync (new EngineSettingsBuilder { MaximumDiskWriteRate = 1, DiskCacheBytes = 0 }.ToSettings ());
await diskManager.Tick (1000);

var tasks = new List<Task> ();
Expand Down
Loading

0 comments on commit a8c7a6c

Please sign in to comment.