Skip to content

Commit a21b9a2

Browse files
github-actions[bot]BrennanConroycarlossanlop
authored
[release/6.0] Fix cancel in PipeReader.ReadAtLeastAsync (#66870)
* Fix cancel in PipeReader.ReadAtLeastAsync * Add missing GeneratePackageOnBuild and bump version Co-authored-by: Brennan <[email protected]> Co-authored-by: Carlos Sanchez <[email protected]>
1 parent 49c4a15 commit a21b9a2

File tree

3 files changed

+39
-4
lines changed

3 files changed

+39
-4
lines changed

src/libraries/System.IO.Pipelines/src/System.IO.Pipelines.csproj

+2-1
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,8 @@ Commonly Used Types:
99
System.IO.Pipelines.Pipe
1010
System.IO.Pipelines.PipeWriter
1111
System.IO.Pipelines.PipeReader</PackageDescription>
12-
<ServicingVersion>2</ServicingVersion>
12+
<GeneratePackageOnBuild>true</GeneratePackageOnBuild>
13+
<ServicingVersion>3</ServicingVersion>
1314
</PropertyGroup>
1415
<ItemGroup>
1516
<Compile Include="$(CommonPath)System\Threading\Tasks\TaskToApm.cs"

src/libraries/System.IO.Pipelines/src/System/IO/Pipelines/Pipe.cs

+6-3
Original file line numberDiff line numberDiff line change
@@ -347,15 +347,15 @@ internal ValueTask<FlushResult> FlushAsync(CancellationToken cancellationToken)
347347
ValueTask<FlushResult> result;
348348
lock (SyncObj)
349349
{
350-
PrepareFlush(out completionData, out result, cancellationToken);
350+
PrepareFlushUnsynchronized(out completionData, out result, cancellationToken);
351351
}
352352

353353
TrySchedule(ReaderScheduler, completionData);
354354

355355
return result;
356356
}
357357

358-
private void PrepareFlush(out CompletionData completionData, out ValueTask<FlushResult> result, CancellationToken cancellationToken)
358+
private void PrepareFlushUnsynchronized(out CompletionData completionData, out ValueTask<FlushResult> result, CancellationToken cancellationToken)
359359
{
360360
var completeReader = CommitUnsynchronized();
361361

@@ -691,6 +691,9 @@ internal ValueTask<ReadResult> ReadAtLeastAsync(int minimumBytes, CancellationTo
691691

692692
// We also need to flip the reading state off
693693
_operationState.EndRead();
694+
695+
// Begin read again to wire up cancellation token
696+
_readerAwaitable.BeginOperation(token, s_signalReaderAwaitable, this);
694697
}
695698

696699
// If the writer is currently paused and we are about the wait for more data then this would deadlock.
@@ -1057,7 +1060,7 @@ internal ValueTask<FlushResult> WriteAsync(ReadOnlyMemory<byte> source, Cancella
10571060
WriteMultiSegment(source.Span);
10581061
}
10591062

1060-
PrepareFlush(out completionData, out result, cancellationToken);
1063+
PrepareFlushUnsynchronized(out completionData, out result, cancellationToken);
10611064
}
10621065

10631066
TrySchedule(ReaderScheduler, completionData);

src/libraries/System.IO.Pipelines/tests/PipeReaderReadAtLeastAsyncTests.cs

+31
Original file line numberDiff line numberDiff line change
@@ -162,5 +162,36 @@ public async Task WriteAndCancellingPendingReadBeforeReadAtLeastAsync()
162162
Assert.True(result.IsCanceled);
163163
PipeReader.AdvanceTo(buffer.End);
164164
}
165+
166+
[Fact]
167+
public Task ReadAtLeastAsyncCancelableWhenWaitingForMoreData()
168+
{
169+
CancellationTokenSource cts = new CancellationTokenSource();
170+
ValueTask<ReadResult> task = PipeReader.ReadAtLeastAsync(1, cts.Token);
171+
cts.Cancel();
172+
return Assert.ThrowsAsync<OperationCanceledException>(async () => await task);
173+
}
174+
175+
[Fact]
176+
public async Task ReadAtLeastAsyncCancelableAfterReadingSome()
177+
{
178+
CancellationTokenSource cts = new CancellationTokenSource();
179+
await Pipe.WriteAsync(new byte[10], default);
180+
ValueTask<ReadResult> task = PipeReader.ReadAtLeastAsync(11, cts.Token);
181+
cts.Cancel();
182+
await Assert.ThrowsAsync<OperationCanceledException>(async () => await task);
183+
}
184+
185+
[Fact]
186+
public async Task ReadAtLeastAsyncCancelableAfterReadingSomeAndWritingAfterStartingRead()
187+
{
188+
CancellationTokenSource cts = new CancellationTokenSource();
189+
await Pipe.WriteAsync(new byte[10], default);
190+
ValueTask<ReadResult> task = PipeReader.ReadAtLeastAsync(12, cts.Token);
191+
// Write, but not enough to unblock ReadAtLeastAsync
192+
await Pipe.WriteAsync(new byte[1], default);
193+
cts.Cancel();
194+
await Assert.ThrowsAnyAsync<OperationCanceledException>(async () => await task);
195+
}
165196
}
166197
}

0 commit comments

Comments
 (0)