Skip to content

Commit bafd867

Browse files
authored
Rent buffers used in SFTP reads (#1738)
An SFTP download performs several reads from the server in parallel, allocating an array to store each result until it's ready to be consumed. Since these buffers are short-lived and normally of the same large-ish size (32KB), it seems like a good candidate for pooling.
1 parent 330e933 commit bafd867

File tree

11 files changed

+162
-33
lines changed

11 files changed

+162
-33
lines changed

src/Renci.SshNet/Common/Extensions.cs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -417,6 +417,14 @@ async Task<T> WaitCore()
417417
return await completedTask.ConfigureAwait(false);
418418
}
419419
}
420+
421+
extension(Task t)
422+
{
423+
internal bool IsCompletedSuccessfully
424+
{
425+
get { return t.Status == TaskStatus.RanToCompletion; }
426+
}
427+
}
420428
#endif
421429
}
422430
}
Lines changed: 87 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,87 @@
1+
#nullable enable
2+
using System;
3+
using System.Buffers;
4+
using System.Diagnostics;
5+
using System.Net;
6+
7+
namespace Renci.SshNet.Common
8+
{
9+
/// <summary>
10+
/// A type representing ownership of a rented, read-only buffer.
11+
/// </summary>
12+
internal sealed class ReadOnlyMemoryOwner : IMemoryOwner<byte>
13+
{
14+
private ArrayBuffer _buffer;
15+
16+
public ReadOnlyMemoryOwner(ArrayBuffer buffer)
17+
{
18+
_buffer = buffer;
19+
20+
AssertValid();
21+
}
22+
23+
[Conditional("DEBUG")]
24+
private void AssertValid()
25+
{
26+
Debug.Assert(
27+
_buffer.ActiveLength > 0 || _buffer.AvailableLength == 0,
28+
"If the buffer is empty, then it should have been returned to the pool.");
29+
}
30+
31+
public int Length
32+
{
33+
get
34+
{
35+
AssertValid();
36+
return _buffer.ActiveLength;
37+
}
38+
}
39+
40+
public bool IsEmpty
41+
{
42+
get
43+
{
44+
AssertValid();
45+
return _buffer.ActiveLength == 0;
46+
}
47+
}
48+
49+
public ReadOnlySpan<byte> Span
50+
{
51+
get
52+
{
53+
AssertValid();
54+
return _buffer.ActiveReadOnlySpan;
55+
}
56+
}
57+
58+
Memory<byte> IMemoryOwner<byte>.Memory
59+
{
60+
get
61+
{
62+
AssertValid();
63+
return _buffer.ActiveMemory;
64+
}
65+
}
66+
67+
public void Slice(int start)
68+
{
69+
AssertValid();
70+
71+
_buffer.Discard(start);
72+
73+
if (_buffer.ActiveLength == 0)
74+
{
75+
// Return the rented buffer as soon as it's no longer in use.
76+
_buffer.ClearAndReturnBuffer();
77+
}
78+
}
79+
80+
public void Dispose()
81+
{
82+
AssertValid();
83+
84+
_buffer.ClearAndReturnBuffer();
85+
}
86+
}
87+
}

src/Renci.SshNet/Sftp/ISftpSession.cs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
using System.Threading;
44
using System.Threading.Tasks;
55

6+
using Renci.SshNet.Common;
67
using Renci.SshNet.Sftp.Responses;
78

89
namespace Renci.SshNet.Sftp
@@ -198,7 +199,7 @@ internal interface ISftpSession : ISubsystemSession
198199
/// its <see cref="Task{Task}.Result"/> contains the data read from the file, or an empty
199200
/// array when the end of the file is reached.
200201
/// </returns>
201-
Task<byte[]> RequestReadAsync(byte[] handle, ulong offset, uint length, CancellationToken cancellationToken);
202+
Task<ReadOnlyMemoryOwner> RequestReadAsync(byte[] handle, ulong offset, uint length, CancellationToken cancellationToken);
202203

203204
/// <summary>
204205
/// Performs a <c>SSH_FXP_READDIR</c> request.
Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,6 @@
1-
namespace Renci.SshNet.Sftp.Responses
1+
using System;
2+
3+
namespace Renci.SshNet.Sftp.Responses
24
{
35
internal sealed class SftpDataResponse : SftpResponse
46
{
@@ -7,7 +9,7 @@ public override SftpMessageTypes SftpMessageType
79
get { return SftpMessageTypes.Data; }
810
}
911

10-
public byte[] Data { get; set; }
12+
public ArraySegment<byte> Data { get; set; }
1113

1214
public SftpDataResponse(uint protocolVersion)
1315
: base(protocolVersion)
@@ -18,14 +20,14 @@ protected override void LoadData()
1820
{
1921
base.LoadData();
2022

21-
Data = ReadBinary();
23+
Data = ReadBinarySegment();
2224
}
2325

2426
protected override void SaveData()
2527
{
2628
base.SaveData();
2729

28-
WriteBinary(Data, 0, Data.Length);
30+
WriteBinary(Data.Array, Data.Offset, Data.Count);
2931
}
3032
}
3133
}

src/Renci.SshNet/Sftp/SftpFileReader.cs

Lines changed: 14 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -6,9 +6,7 @@
66
using System.Threading;
77
using System.Threading.Tasks;
88

9-
#if !NET
109
using Renci.SshNet.Common;
11-
#endif
1210

1311
namespace Renci.SshNet.Sftp
1412
{
@@ -58,7 +56,7 @@ public SftpFileReader(byte[] handle, ISftpSession sftpSession, int chunkSize, lo
5856
_cts = new CancellationTokenSource();
5957
}
6058

61-
public async Task<byte[]> ReadAsync(CancellationToken cancellationToken)
59+
public async Task<ReadOnlyMemoryOwner> ReadAsync(CancellationToken cancellationToken)
6260
{
6361
_exception?.Throw();
6462

@@ -172,14 +170,21 @@ public void Dispose()
172170

173171
if (_requests.Count > 0)
174172
{
175-
// Cancel outstanding requests and observe the exception on them
176-
// as an effort to prevent unhandled exceptions.
177-
178173
_cts.Cancel();
179174

180175
foreach (var request in _requests.Values)
181176
{
182-
_ = request.Task.Exception;
177+
// Return rented buffers to the pool, or observe exception on
178+
// the task as an effort to prevent unhandled exceptions.
179+
180+
if (request.Task.IsCompletedSuccessfully)
181+
{
182+
request.Task.GetAwaiter().GetResult().Dispose();
183+
}
184+
else
185+
{
186+
_ = request.Task.Exception;
187+
}
183188
}
184189

185190
_requests.Clear();
@@ -190,7 +195,7 @@ public void Dispose()
190195

191196
private sealed class Request
192197
{
193-
public Request(ulong offset, uint count, Task<byte[]> task)
198+
public Request(ulong offset, uint count, Task<ReadOnlyMemoryOwner> task)
194199
{
195200
Offset = offset;
196201
Count = count;
@@ -199,7 +204,7 @@ public Request(ulong offset, uint count, Task<byte[]> task)
199204

200205
public ulong Offset { get; }
201206
public uint Count { get; }
202-
public Task<byte[]> Task { get; }
207+
public Task<ReadOnlyMemoryOwner> Task { get; }
203208
}
204209
}
205210
}

src/Renci.SshNet/Sftp/SftpFileStream.cs

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ public sealed partial class SftpFileStream : Stream
2626
private readonly int _readBufferSize;
2727

2828
private SftpFileReader? _sftpFileReader;
29-
private ReadOnlyMemory<byte> _readBuffer;
29+
private ReadOnlyMemoryOwner _readBuffer;
3030
private System.Net.ArrayBuffer _writeBuffer;
3131

3232
private long _position;
@@ -153,6 +153,7 @@ private SftpFileStream(
153153
_readBufferSize = readBufferSize;
154154
_position = position;
155155
_writeBuffer = new System.Net.ArrayBuffer(writeBufferSize);
156+
_readBuffer = new ReadOnlyMemoryOwner(new System.Net.ArrayBuffer(0, usePool: true));
156157
_sftpFileReader = initialReader;
157158
}
158159

@@ -390,7 +391,7 @@ await _session.RequestWriteAsync(
390391

391392
private void InvalidateReads()
392393
{
393-
_readBuffer = ReadOnlyMemory<byte>.Empty;
394+
_readBuffer.Dispose();
394395
_sftpFileReader?.Dispose();
395396
_sftpFileReader = null;
396397
}
@@ -441,7 +442,7 @@ private int Read(Span<byte> buffer)
441442
var bytesRead = Math.Min(buffer.Length, _readBuffer.Length);
442443

443444
_readBuffer.Span.Slice(0, bytesRead).CopyTo(buffer);
444-
_readBuffer = _readBuffer.Slice(bytesRead);
445+
_readBuffer.Slice(bytesRead);
445446

446447
_position += bytesRead;
447448

@@ -494,8 +495,8 @@ private async ValueTask<int> ReadAsync(Memory<byte> buffer, CancellationToken ca
494495

495496
var bytesRead = Math.Min(buffer.Length, _readBuffer.Length);
496497

497-
_readBuffer.Slice(0, bytesRead).CopyTo(buffer);
498-
_readBuffer = _readBuffer.Slice(bytesRead);
498+
_readBuffer.Span.Slice(0, bytesRead).CopyTo(buffer.Span);
499+
_readBuffer.Slice(bytesRead);
499500

500501
_position += bytesRead;
501502

@@ -649,7 +650,7 @@ public override long Seek(long offset, SeekOrigin origin)
649650

650651
if (readBufferStart <= newPosition && newPosition <= readBufferEnd)
651652
{
652-
_readBuffer = _readBuffer.Slice((int)(newPosition - readBufferStart));
653+
_readBuffer.Slice((int)(newPosition - readBufferStart));
653654
}
654655
else
655656
{

src/Renci.SshNet/Sftp/SftpSession.cs

Lines changed: 22 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
using System.Collections.Generic;
44
using System.Diagnostics;
55
using System.Globalization;
6+
using System.Net;
67
using System.Text;
78
using System.Threading;
89
using System.Threading.Tasks;
@@ -24,7 +25,7 @@ internal sealed class SftpSession : SubsystemSession, ISftpSession
2425
private readonly Dictionary<uint, SftpRequest> _requests = new Dictionary<uint, SftpRequest>();
2526
private readonly ISftpResponseFactory _sftpResponseFactory;
2627
private readonly Encoding _encoding;
27-
private System.Net.ArrayBuffer _buffer = new(32 * 1024);
28+
private ArrayBuffer _buffer = new(32 * 1024);
2829
private EventWaitHandle _sftpVersionConfirmed = new AutoResetEvent(initialState: false);
2930
private IDictionary<string, string> _supportedExtensions;
3031

@@ -495,7 +496,7 @@ public byte[] RequestRead(byte[] handle, ulong offset, uint length)
495496
length,
496497
response =>
497498
{
498-
data = response.Data;
499+
data = response.Data.ToArray();
499500
wait.SetIgnoringObjectDisposed();
500501
},
501502
response =>
@@ -526,28 +527,42 @@ public byte[] RequestRead(byte[] handle, ulong offset, uint length)
526527
}
527528

528529
/// <inheritdoc/>
529-
public Task<byte[]> RequestReadAsync(byte[] handle, ulong offset, uint length, CancellationToken cancellationToken)
530+
public Task<ReadOnlyMemoryOwner> RequestReadAsync(byte[] handle, ulong offset, uint length, CancellationToken cancellationToken)
530531
{
531532
Debug.Assert(length > 0, "This implementation cannot distinguish between EOF and zero-length reads");
532533

533534
if (cancellationToken.IsCancellationRequested)
534535
{
535-
return Task.FromCanceled<byte[]>(cancellationToken);
536+
return Task.FromCanceled<ReadOnlyMemoryOwner>(cancellationToken);
536537
}
537538

538-
var tcs = new TaskCompletionSource<byte[]>(TaskCreationOptions.RunContinuationsAsynchronously);
539+
var tcs = new TaskCompletionSource<ReadOnlyMemoryOwner>(TaskCreationOptions.RunContinuationsAsynchronously);
539540

540541
SendRequest(new SftpReadRequest(ProtocolVersion,
541542
NextRequestId,
542543
handle,
543544
offset,
544545
length,
545-
response => tcs.TrySetResult(response.Data),
546+
response =>
547+
{
548+
ArrayBuffer buffer = new(response.Data.Count, usePool: true);
549+
550+
response.Data.AsSpan().CopyTo(buffer.AvailableSpan);
551+
552+
buffer.Commit(response.Data.Count);
553+
554+
ReadOnlyMemoryOwner owner = new(buffer);
555+
556+
if (!tcs.TrySetResult(owner))
557+
{
558+
owner.Dispose();
559+
}
560+
},
546561
response =>
547562
{
548563
if (response.StatusCode == StatusCode.Eof)
549564
{
550-
_ = tcs.TrySetResult(Array.Empty<byte>());
565+
_ = tcs.TrySetResult(new(new(0, usePool: true)));
551566
}
552567
else
553568
{

test/Renci.SshNet.Tests/Classes/Sftp/Responses/SftpDataResponseTest.cs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ public void Constructor()
3232
{
3333
var target = new SftpDataResponse(_protocolVersion);
3434

35-
Assert.IsNull(target.Data);
35+
Assert.AreEqual(default, target.Data);
3636
Assert.AreEqual(_protocolVersion, target.ProtocolVersion);
3737
Assert.AreEqual((uint)0, target.ResponseId);
3838
Assert.AreEqual(SftpMessageTypes.Data, target.SftpMessageType);
@@ -52,7 +52,6 @@ public void Load()
5252

5353
target.Load(sshData);
5454

55-
Assert.IsNotNull(target.Data);
5655
Assert.IsTrue(target.Data.SequenceEqual(_data));
5756
Assert.AreEqual(_protocolVersion, target.ProtocolVersion);
5857
Assert.AreEqual(_responseId, target.ResponseId);

test/Renci.SshNet.Tests/Classes/Sftp/SftpDataResponseBuilder.cs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,6 @@
1-
using Renci.SshNet.Sftp.Responses;
1+
using System;
2+
3+
using Renci.SshNet.Sftp.Responses;
24

35
namespace Renci.SshNet.Tests.Classes.Sftp
46
{
@@ -31,7 +33,7 @@ public SftpDataResponse Build()
3133
return new SftpDataResponse(_protocolVersion)
3234
{
3335
ResponseId = _responseId,
34-
Data = _data
36+
Data = new ArraySegment<byte>(_data)
3537
};
3638
}
3739
}

0 commit comments

Comments
 (0)