Skip to content

Commit a05d555

Browse files
author
John Luo
authored
Revert Work around potential race in PipeWriter (#10315)
* Remove Flaky attributes * Revert "Work around potential race in PipeWriter (#10165)" This reverts commit bff1d0e.
1 parent 76b7366 commit a05d555

File tree

6 files changed

+38
-242
lines changed

6 files changed

+38
-242
lines changed

src/ProjectTemplates/test/SpaTemplateTest/AngularTemplateTest.cs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@ public AngularTemplateTest(ProjectFactoryFixture projectFactory, BrowserFixture
1717
: base(projectFactory, browserFixture, output) { }
1818

1919
[Fact]
20-
[Flaky("https://github.com/aspnet/AspNetCore-Internal/issues/2422", FlakyOn.All)]
2120
public Task AngularTemplate_Works()
2221
=> SpaTemplateImplAsync("angularnoauth", "angular", useLocalDb: false, usesAuth: false);
2322

src/Servers/Kestrel/Core/src/Internal/Infrastructure/TimingPipeFlusher.cs

Lines changed: 19 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -51,39 +51,35 @@ public ValueTask<FlushResult> FlushAsync(MinDataRate minRate, long count)
5151

5252
public ValueTask<FlushResult> FlushAsync(MinDataRate minRate, long count, IHttpOutputAborter outputAborter, CancellationToken cancellationToken)
5353
{
54-
// https://github.com/dotnet/corefxlab/issues/1334
55-
// Pipelines don't support multiple awaiters on flush.
56-
lock (_flushLock)
57-
{
58-
if (_lastFlushTask != null && !_lastFlushTask.IsCompleted)
59-
{
60-
_lastFlushTask = AwaitLastFlushAndTimeFlushAsync(_lastFlushTask, minRate, count, outputAborter, cancellationToken);
61-
return new ValueTask<FlushResult>(_lastFlushTask);
62-
}
63-
64-
return TimeFlushAsync(minRate, count, outputAborter, cancellationToken);
65-
}
66-
}
67-
68-
private ValueTask<FlushResult> TimeFlushAsync(MinDataRate minRate, long count, IHttpOutputAborter outputAborter, CancellationToken cancellationToken)
69-
{
70-
var pipeFlushTask = _writer.FlushAsync(cancellationToken);
54+
var flushValueTask = _writer.FlushAsync(cancellationToken);
7155

7256
if (minRate != null)
7357
{
7458
_timeoutControl.BytesWrittenToBuffer(minRate, count);
7559
}
7660

77-
if (pipeFlushTask.IsCompletedSuccessfully)
61+
if (flushValueTask.IsCompletedSuccessfully)
7862
{
79-
return new ValueTask<FlushResult>(pipeFlushTask.Result);
63+
return new ValueTask<FlushResult>(flushValueTask.Result);
8064
}
8165

82-
_lastFlushTask = TimeFlushAsyncAwaited(pipeFlushTask, minRate, count, outputAborter, cancellationToken);
83-
return new ValueTask<FlushResult>(_lastFlushTask);
66+
// https://github.com/dotnet/corefxlab/issues/1334
67+
// Pipelines don't support multiple awaiters on flush.
68+
// While it's acceptable to call PipeWriter.FlushAsync again before the last FlushAsync completes,
69+
// it is not acceptable to attach a new continuation (via await, AsTask(), etc..). In this case,
70+
// we find previous flush Task which still accounts for any newly committed bytes and await that.
71+
lock (_flushLock)
72+
{
73+
if (_lastFlushTask == null || _lastFlushTask.IsCompleted)
74+
{
75+
_lastFlushTask = flushValueTask.AsTask();
76+
}
77+
78+
return TimeFlushAsync(minRate, count, outputAborter, cancellationToken);
79+
}
8480
}
8581

86-
private async Task<FlushResult> TimeFlushAsyncAwaited(ValueTask<FlushResult> pipeFlushTask, MinDataRate minRate, long count, IHttpOutputAborter outputAborter, CancellationToken cancellationToken)
82+
private async ValueTask<FlushResult> TimeFlushAsync(MinDataRate minRate, long count, IHttpOutputAborter outputAborter, CancellationToken cancellationToken)
8783
{
8884
if (minRate != null)
8985
{
@@ -92,7 +88,7 @@ private async Task<FlushResult> TimeFlushAsyncAwaited(ValueTask<FlushResult> pip
9288

9389
try
9490
{
95-
return await pipeFlushTask;
91+
return await _lastFlushTask;
9692
}
9793
catch (OperationCanceledException ex) when (outputAborter != null)
9894
{
@@ -115,11 +111,5 @@ private async Task<FlushResult> TimeFlushAsyncAwaited(ValueTask<FlushResult> pip
115111

116112
return default;
117113
}
118-
119-
private async Task<FlushResult> AwaitLastFlushAndTimeFlushAsync(Task lastFlushTask, MinDataRate minRate, long count, IHttpOutputAborter outputAborter, CancellationToken cancellationToken)
120-
{
121-
await lastFlushTask;
122-
return await TimeFlushAsync(minRate, count, outputAborter, cancellationToken);
123-
}
124114
}
125115
}

src/Servers/Kestrel/Core/test/TimingPipeFlusherTests.cs

Lines changed: 0 additions & 67 deletions
This file was deleted.

src/Servers/Kestrel/Transport.Libuv/test/LibuvOutputConsumerTests.cs

Lines changed: 16 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -553,15 +553,15 @@ await Task.Run(async () =>
553553
Assert.False(task1Waits.IsFaulted);
554554

555555
// following tasks should wait.
556-
var task2Canceled = outputProducer.WriteDataAsync(fullBuffer, cancellationToken: abortedSource.Token);
556+
var task3Canceled = outputProducer.WriteDataAsync(fullBuffer, cancellationToken: abortedSource.Token);
557557

558558
// Give time for tasks to percolate
559559
await _mockLibuv.OnPostTask;
560560

561-
// Second task is not completed
562-
Assert.False(task2Canceled.IsCompleted);
563-
Assert.False(task2Canceled.IsCanceled);
564-
Assert.False(task2Canceled.IsFaulted);
561+
// Third task is not completed
562+
Assert.False(task3Canceled.IsCompleted);
563+
Assert.False(task3Canceled.IsCanceled);
564+
Assert.False(task3Canceled.IsFaulted);
565565

566566
abortedSource.Cancel();
567567

@@ -571,29 +571,29 @@ await Task.Run(async () =>
571571
await _libuvThread.PostAsync(cb => cb(0), triggerNextCompleted);
572572
}
573573

574-
// First task is completed
574+
// First task is completed
575575
Assert.True(task1Waits.IsCompleted);
576576
Assert.False(task1Waits.IsCanceled);
577577
Assert.False(task1Waits.IsFaulted);
578578

579-
// Second task is now canceled
580-
await Assert.ThrowsAsync<OperationCanceledException>(() => task2Canceled);
581-
Assert.True(task2Canceled.IsCanceled);
579+
// A final write guarantees that the error is observed by OutputProducer,
580+
// but doesn't return a canceled/faulted task.
581+
var task4Success = outputProducer.WriteDataAsync(fullBuffer);
582+
Assert.True(task4Success.IsCompleted);
583+
Assert.False(task4Success.IsCanceled);
584+
Assert.False(task4Success.IsFaulted);
582585

583-
// A final write can still succeed.
584-
var task3Success = outputProducer.WriteDataAsync(fullBuffer);
586+
// Third task is now canceled
587+
await Assert.ThrowsAsync<OperationCanceledException>(() => task3Canceled);
588+
Assert.True(task3Canceled.IsCanceled);
585589

586590
await _mockLibuv.OnPostTask;
587591

588-
// Complete the 3rd write
592+
// Complete the 4th write
589593
while (completeQueue.TryDequeue(out var triggerNextCompleted))
590594
{
591595
await _libuvThread.PostAsync(cb => cb(0), triggerNextCompleted);
592596
}
593-
594-
Assert.True(task3Success.IsCompleted);
595-
Assert.False(task3Success.IsCanceled);
596-
Assert.False(task3Success.IsFaulted);;
597597
}
598598
});
599599
}

src/Servers/Kestrel/perf/Kestrel.Performance/Http1LargeWritingBenchmark.cs

Lines changed: 0 additions & 118 deletions
This file was deleted.

src/Servers/Kestrel/perf/Kestrel.Performance/Http1WritingBenchmark.cs

Lines changed: 3 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,6 @@ public class Http1WritingBenchmark
2929
private TestHttp1Connection _http1Connection;
3030
private DuplexPipe.DuplexPipePair _pair;
3131
private MemoryPool<byte> _memoryPool;
32-
private Task _consumeResponseBodyTask;
3332

3433
private readonly byte[] _writeData = Encoding.ASCII.GetBytes("Hello, World!");
3534

@@ -38,7 +37,6 @@ public void GlobalSetup()
3837
{
3938
_memoryPool = KestrelMemoryPool.Create();
4039
_http1Connection = MakeHttp1Connection();
41-
_consumeResponseBodyTask = ConsumeResponseBody();
4240
}
4341

4442
[Params(true, false)]
@@ -127,18 +125,14 @@ private TestHttp1Connection MakeHttp1Connection()
127125
return http1Connection;
128126
}
129127

130-
private async Task ConsumeResponseBody()
128+
[IterationCleanup]
129+
public void Cleanup()
131130
{
132131
var reader = _pair.Application.Input;
133-
var readResult = await reader.ReadAsync();
134-
135-
while (!readResult.IsCompleted)
132+
if (reader.TryRead(out var readResult))
136133
{
137134
reader.AdvanceTo(readResult.Buffer.End);
138-
readResult = await reader.ReadAsync();
139135
}
140-
141-
reader.Complete();
142136
}
143137

144138
public enum Startup
@@ -151,8 +145,6 @@ public enum Startup
151145
[GlobalCleanup]
152146
public void Dispose()
153147
{
154-
_pair.Transport.Output.Complete();
155-
_consumeResponseBodyTask.GetAwaiter().GetResult();
156148
_memoryPool?.Dispose();
157149
}
158150
}

0 commit comments

Comments
 (0)