From ec3c8ebeba31ef71930f7c5a102df41171110b52 Mon Sep 17 00:00:00 2001 From: Alan McGovern Date: Sat, 5 Dec 2020 12:45:27 +0000 Subject: [PATCH] [core] Fix a threading issue in FlushAsync We should ensure we're executing on the IO thread when we're interacting with the filestreams. The 'Tick' method is invoked from the main engine's tick so it'll be on the wrong thread unless we swap here. Also, ensure we take an async lock on the filestream we intend to flush. This ensures there's multi-threaded IO, but each stream is accessed in a single-threaded way. Fixes https://github.com/alanmcgovern/monotorrent/issues/345 --- .../Client/DiskWriterTests.cs | 11 ++++++++-- .../MonoTorrent.Client/FileStreamBuffer.cs | 6 ++++++ .../MonoTorrent.Client/MainLoop.cs | 10 +++++++++- .../Managers/DiskManager.cs | 20 +++++++++---------- .../Managers/ITorrentFileInfo.cs | 11 ++++++++++ 5 files changed, 45 insertions(+), 13 deletions(-) diff --git a/src/MonoTorrent.Tests/Client/DiskWriterTests.cs b/src/MonoTorrent.Tests/Client/DiskWriterTests.cs index 909046a99..5d71b1647 100644 --- a/src/MonoTorrent.Tests/Client/DiskWriterTests.cs +++ b/src/MonoTorrent.Tests/Client/DiskWriterTests.cs @@ -66,6 +66,8 @@ public void Teardown () public async Task CloseFileAsync_Opened () { using var writer = new DiskWriter (); + using var locker = await TorrentFile.Locker.EnterAsync (); + await writer.WriteAsync (TorrentFile, 0, new byte[10], 0, 10); Assert.IsTrue (File.Exists (TorrentFile.FullPath)); @@ -74,10 +76,11 @@ public async Task CloseFileAsync_Opened () } [Test] - public void CloseFileAsync_Unopened() + public async Task CloseFileAsync_Unopened() { using var writer = new DiskWriter (); - Assert.DoesNotThrowAsync (async () => await writer.CloseAsync (TorrentFile)); + using (await TorrentFile.Locker.EnterAsync ()) + Assert.DoesNotThrowAsync (async () => await writer.CloseAsync (TorrentFile)); } [Test] @@ -94,11 +97,15 @@ public async Task ExceedMaxOpenFiles () }; using var writer = new DiskWriter (creator, 1); + using var locker = await TorrentFile.Locker.EnterAsync (); + var writeTask = writer.WriteAsync (TorrentFile, 0, new byte[100], 0, 100); await streamCreated.Task.WithTimeout (); // There's a limit of 1 concurrent read/write. var secondStreamWaiter = streamCreated.Task.AsTask (); + + using var secondLocker = await Others.First ().Locker.EnterAsync (); var secondStream = writer.WriteAsync (Others.First (), 0, new byte[100], 0, 100); Assert.ThrowsAsync (() => secondStreamWaiter.WithTimeout (100)); diff --git a/src/MonoTorrent/MonoTorrent.Client/FileStreamBuffer.cs b/src/MonoTorrent/MonoTorrent.Client/FileStreamBuffer.cs index 5f8735b98..16ad5e5b6 100644 --- a/src/MonoTorrent/MonoTorrent.Client/FileStreamBuffer.cs +++ b/src/MonoTorrent/MonoTorrent.Client/FileStreamBuffer.cs @@ -96,6 +96,8 @@ internal async ReusableTask FlushAsync (ITorrentFileInfo file) internal RentedStream GetStream (ITorrentFileInfo file) { + file.ThrowIfNotLocked (); + if (Streams.TryGetValue (file, out ITorrentFileStream stream)) return new RentedStream (stream); return new RentedStream (null); @@ -103,6 +105,8 @@ internal RentedStream GetStream (ITorrentFileInfo file) internal async ReusableTask GetStreamAsync (ITorrentFileInfo file, FileAccess access) { + file.ThrowIfNotLocked (); + if (!Streams.TryGetValue (file, out ITorrentFileStream s)) s = null; @@ -160,6 +164,8 @@ void Add (ITorrentFileInfo file, ITorrentFileStream stream) void CloseAndRemove (ITorrentFileInfo file, ITorrentFileStream s) { + file.ThrowIfNotLocked (); + logger.InfoFormatted ("Closing and removing: {0}", file.Path); Streams.Remove (file); UsageOrder.Remove (file); diff --git a/src/MonoTorrent/MonoTorrent.Client/MainLoop.cs b/src/MonoTorrent/MonoTorrent.Client/MainLoop.cs index 3f7a8cb9e..5c7bef8ca 100644 --- a/src/MonoTorrent/MonoTorrent.Client/MainLoop.cs +++ b/src/MonoTorrent/MonoTorrent.Client/MainLoop.cs @@ -30,6 +30,7 @@ using System; using System.Collections.Generic; using System.ComponentModel; +using System.Diagnostics; using System.Runtime.CompilerServices; using System.Threading; @@ -203,6 +204,13 @@ public static ThreadSwitcher SwitchThread () return new ThreadSwitcher (); } -#endregion + [Conditional("DEBUG")] + internal void CheckThread () + { + if (Thread.CurrentThread != thread) + throw new InvalidOperationException ($"Missing context switch to the {thread.Name} MainLoop."); + } + + #endregion } } \ No newline at end of file diff --git a/src/MonoTorrent/MonoTorrent.Client/Managers/DiskManager.cs b/src/MonoTorrent/MonoTorrent.Client/Managers/DiskManager.cs index 7ed99ed7c..7b8d03adc 100644 --- a/src/MonoTorrent/MonoTorrent.Client/Managers/DiskManager.cs +++ b/src/MonoTorrent/MonoTorrent.Client/Managers/DiskManager.cs @@ -324,7 +324,8 @@ public async Task FlushAsync (ITorrentData manager, int pieceIndex) await WaitForPendingWrites (); foreach (var file in manager.Files) { if (pieceIndex == -1 || (pieceIndex >= file.StartPieceIndex && pieceIndex <= file.EndPieceIndex)) - await Writer.FlushAsync (file); + using (await file.Locker.EnterAsync ()) + await Writer.FlushAsync (file); } } @@ -432,6 +433,8 @@ internal async ReusableTask WriteAsync (ITorrentData manager, long offset, byte[ async ReusableTask ProcessBufferedIOAsync (bool force = false) { + await IOLoop; + BufferedIO io; while (WriteQueue.Count > 0) { @@ -498,6 +501,8 @@ public static int FindFileIndex (IList files, long offset, int async ReusableTask DoReadAsync (ITorrentData manager, long offset, byte[] buffer, int count) { + IOLoop.CheckThread (); + ReadMonitor.AddDelta (count); if (count < 1) @@ -531,12 +536,8 @@ async ReusableTask DoReadAsync (ITorrentData manager, long offset, byte[] async ReusableTask ReadFromFileAsync (ITorrentFileInfo torrentFile, long offset, byte[] buffer, int bufferOffset, int count) { - await torrentFile.Locker.WaitAsync (); - try { + using (await torrentFile.Locker.EnterAsync ()) return await Writer.ReadAsync (torrentFile, offset, buffer, bufferOffset, count); - } finally { - torrentFile.Locker.Release (); - } } /// @@ -560,10 +561,7 @@ internal void Tick () /// The amount of time, in milliseconds, which has passed /// internal async ReusableTask Tick (int delta) - { - await IOLoop; - await Tick (delta, true); - } + => await Tick (delta, true); ReusableTask Tick (int delta, bool waitForBufferedIO) { @@ -581,6 +579,8 @@ ReusableTask Tick (int delta, bool waitForBufferedIO) async ReusableTask DoWriteAsync (ITorrentData manager, long offset, byte[] buffer, int count) { + IOLoop.CheckThread (); + WriteMonitor.AddDelta (count); if (offset < 0 || offset + count > manager.Size) diff --git a/src/MonoTorrent/MonoTorrent.Client/Managers/ITorrentFileInfo.cs b/src/MonoTorrent/MonoTorrent.Client/Managers/ITorrentFileInfo.cs index aa743136c..7beea5449 100644 --- a/src/MonoTorrent/MonoTorrent.Client/Managers/ITorrentFileInfo.cs +++ b/src/MonoTorrent/MonoTorrent.Client/Managers/ITorrentFileInfo.cs @@ -27,6 +27,7 @@ // +using System.Diagnostics; using System.Threading; namespace MonoTorrent.Client @@ -34,9 +35,12 @@ namespace MonoTorrent.Client public interface ITorrentFileInfo : ITorrentFile { + // FIXME: make BitField readonly. BitField BitField { get; } string FullPath { get; } Priority Priority { get; set; } + + // FIXME: Make this internal. SemaphoreSlim Locker { get; } (int startPiece, int endPiece) GetSelector (); @@ -46,5 +50,12 @@ public static class ITorrentFileInfoExtensions { public static long BytesDownloaded (this ITorrentFileInfo info) => (long) (info.BitField.PercentComplete * info.Length / 100.0); + + [Conditional ("DEBUG")] + internal static void ThrowIfNotLocked(this ITorrentFileInfo info) + { + if (info.Locker.CurrentCount > 0) + throw new System.InvalidOperationException ("File should have been locked before it was accessed"); + } } }