Skip to content

Commit 7966f25

Browse files
authored
Dispatch connection execution (#12265)
- Dispatch connection initialization to the thread pool to avoid executing the first read on the IO thread (longer explanation in the bug).
1 parent 8ce68d0 commit 7966f25

File tree

4 files changed

+88
-70
lines changed

4 files changed

+88
-70
lines changed

src/Servers/Kestrel/Core/src/Internal/ConnectionDispatcher.cs

Lines changed: 3 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,9 @@ async Task AcceptConnectionsAsync()
5151
break;
5252
}
5353

54-
_ = Execute(new KestrelConnection(connection, _serviceContext.Log));
54+
var id = Interlocked.Increment(ref _lastConnectionId);
55+
var kestrelConnection = new KestrelConnection(id, _serviceContext, _connectionDelegate, connection, _serviceContext.Log);
56+
ThreadPool.UnsafeQueueUserWorkItem(kestrelConnection, preferLocal: false);
5557
}
5658
}
5759
catch (Exception ex)
@@ -65,55 +67,5 @@ async Task AcceptConnectionsAsync()
6567
}
6668
}
6769
}
68-
69-
internal async Task Execute(KestrelConnection connection)
70-
{
71-
var id = Interlocked.Increment(ref _lastConnectionId);
72-
var connectionContext = connection.TransportConnection;
73-
74-
try
75-
{
76-
_serviceContext.ConnectionManager.AddConnection(id, connection);
77-
78-
Log.ConnectionStart(connectionContext.ConnectionId);
79-
KestrelEventSource.Log.ConnectionStart(connectionContext);
80-
81-
using (BeginConnectionScope(connectionContext))
82-
{
83-
try
84-
{
85-
await _connectionDelegate(connectionContext);
86-
}
87-
catch (Exception ex)
88-
{
89-
Log.LogError(0, ex, "Unhandled exception while processing {ConnectionId}.", connectionContext.ConnectionId);
90-
}
91-
}
92-
}
93-
finally
94-
{
95-
await connection.FireOnCompletedAsync();
96-
97-
Log.ConnectionStop(connectionContext.ConnectionId);
98-
KestrelEventSource.Log.ConnectionStop(connectionContext);
99-
100-
// Dispose the transport connection, this needs to happen before removing it from the
101-
// connection manager so that we only signal completion of this connection after the transport
102-
// is properly torn down.
103-
await connection.TransportConnection.DisposeAsync();
104-
105-
_serviceContext.ConnectionManager.RemoveConnection(id);
106-
}
107-
}
108-
109-
private IDisposable BeginConnectionScope(ConnectionContext connectionContext)
110-
{
111-
if (Log.IsEnabled(LogLevel.Critical))
112-
{
113-
return Log.BeginScope(new ConnectionLogScope(connectionContext.ConnectionId));
114-
}
115-
116-
return null;
117-
}
11870
}
11971
}

src/Servers/Kestrel/Core/src/Internal/Infrastructure/KestrelConnection.cs

Lines changed: 68 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@
1111

1212
namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Infrastructure
1313
{
14-
internal class KestrelConnection : IConnectionHeartbeatFeature, IConnectionCompleteFeature, IConnectionLifetimeNotificationFeature
14+
internal class KestrelConnection : IConnectionHeartbeatFeature, IConnectionCompleteFeature, IConnectionLifetimeNotificationFeature, IThreadPoolWorkItem
1515
{
1616
private List<(Action<object> handler, object state)> _heartbeatHandlers;
1717
private readonly object _heartbeatLock = new object();
@@ -21,9 +21,19 @@ internal class KestrelConnection : IConnectionHeartbeatFeature, IConnectionCompl
2121

2222
private readonly CancellationTokenSource _connectionClosingCts = new CancellationTokenSource();
2323
private readonly TaskCompletionSource<object> _completionTcs = new TaskCompletionSource<object>(TaskCreationOptions.RunContinuationsAsynchronously);
24-
25-
public KestrelConnection(ConnectionContext connectionContext, ILogger logger)
24+
private readonly long _id;
25+
private readonly ServiceContext _serviceContext;
26+
private readonly ConnectionDelegate _connectionDelegate;
27+
28+
public KestrelConnection(long id,
29+
ServiceContext serviceContext,
30+
ConnectionDelegate connectionDelegate,
31+
ConnectionContext connectionContext,
32+
IKestrelTrace logger)
2633
{
34+
_id = id;
35+
_serviceContext = serviceContext;
36+
_connectionDelegate = connectionDelegate;
2737
Logger = logger;
2838
TransportConnection = connectionContext;
2939

@@ -33,7 +43,7 @@ public KestrelConnection(ConnectionContext connectionContext, ILogger logger)
3343
ConnectionClosedRequested = _connectionClosingCts.Token;
3444
}
3545

36-
private ILogger Logger { get; }
46+
private IKestrelTrace Logger { get; }
3747

3848
public ConnectionContext TransportConnection { get; set; }
3949
public CancellationToken ConnectionClosedRequested { get; set; }
@@ -164,5 +174,59 @@ public void Complete()
164174

165175
_connectionClosingCts.Dispose();
166176
}
177+
178+
void IThreadPoolWorkItem.Execute()
179+
{
180+
_ = ExecuteAsync();
181+
}
182+
183+
internal async Task ExecuteAsync()
184+
{
185+
var connectionContext = TransportConnection;
186+
187+
try
188+
{
189+
_serviceContext.ConnectionManager.AddConnection(_id, this);
190+
191+
Logger.ConnectionStart(connectionContext.ConnectionId);
192+
KestrelEventSource.Log.ConnectionStart(connectionContext);
193+
194+
using (BeginConnectionScope(connectionContext))
195+
{
196+
try
197+
{
198+
await _connectionDelegate(connectionContext);
199+
}
200+
catch (Exception ex)
201+
{
202+
Logger.LogError(0, ex, "Unhandled exception while processing {ConnectionId}.", connectionContext.ConnectionId);
203+
}
204+
}
205+
}
206+
finally
207+
{
208+
await FireOnCompletedAsync();
209+
210+
Logger.ConnectionStop(connectionContext.ConnectionId);
211+
KestrelEventSource.Log.ConnectionStop(connectionContext);
212+
213+
// Dispose the transport connection, this needs to happen before removing it from the
214+
// connection manager so that we only signal completion of this connection after the transport
215+
// is properly torn down.
216+
await TransportConnection.DisposeAsync();
217+
218+
_serviceContext.ConnectionManager.RemoveConnection(_id);
219+
}
220+
}
221+
222+
private IDisposable BeginConnectionScope(ConnectionContext connectionContext)
223+
{
224+
if (Logger.IsEnabled(LogLevel.Critical))
225+
{
226+
return Logger.BeginScope(new ConnectionLogScope(connectionContext.ConnectionId));
227+
}
228+
229+
return null;
230+
}
167231
}
168232
}

src/Servers/Kestrel/Core/test/ConnectionDispatcherTests.cs

Lines changed: 13 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -21,17 +21,17 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Tests
2121
public class ConnectionDispatcherTests
2222
{
2323
[Fact]
24-
public void OnConnectionCreatesLogScopeWithConnectionId()
24+
public async Task OnConnectionCreatesLogScopeWithConnectionId()
2525
{
2626
var serviceContext = new TestServiceContext();
2727
// This needs to run inline
2828
var tcs = new TaskCompletionSource<object>();
29-
var dispatcher = new ConnectionDispatcher(serviceContext, _ => tcs.Task);
3029

3130
var connection = new Mock<DefaultConnectionContext> { CallBase = true }.Object;
3231
connection.ConnectionClosed = new CancellationToken(canceled: true);
32+
var kestrelConnection = new KestrelConnection(0, serviceContext, _ => tcs.Task, connection, serviceContext.Log);
3333

34-
_ = dispatcher.Execute(new KestrelConnection(connection, Mock.Of<ILogger>()));
34+
var task = kestrelConnection.ExecuteAsync();
3535

3636
// The scope should be created
3737
var scopeObjects = ((TestKestrelTrace)serviceContext.Log)
@@ -47,6 +47,8 @@ public void OnConnectionCreatesLogScopeWithConnectionId()
4747

4848
tcs.TrySetResult(null);
4949

50+
await task;
51+
5052
// Verify the scope was disposed after request processing completed
5153
Assert.True(((TestKestrelTrace)serviceContext.Log).Logger.Scopes.IsEmpty);
5254
}
@@ -73,19 +75,18 @@ public async Task StartAcceptingConnectionsAsyncLogsIfAcceptAsyncThrows()
7375
public async Task OnConnectionFiresOnCompleted()
7476
{
7577
var serviceContext = new TestServiceContext();
76-
var dispatcher = new ConnectionDispatcher(serviceContext, _ => Task.CompletedTask);
7778

7879
var connection = new Mock<DefaultConnectionContext> { CallBase = true }.Object;
7980
connection.ConnectionClosed = new CancellationToken(canceled: true);
80-
var kestrelConnection = new KestrelConnection(connection, Mock.Of<ILogger>());
81+
var kestrelConnection = new KestrelConnection(0, serviceContext, _ => Task.CompletedTask, connection, serviceContext.Log);
8182
var completeFeature = kestrelConnection.TransportConnection.Features.Get<IConnectionCompleteFeature>();
8283

8384
Assert.NotNull(completeFeature);
8485
object stateObject = new object();
8586
object callbackState = null;
8687
completeFeature.OnCompleted(state => { callbackState = state; return Task.CompletedTask; }, stateObject);
8788

88-
await dispatcher.Execute(kestrelConnection);
89+
await kestrelConnection.ExecuteAsync();
8990

9091
Assert.Equal(stateObject, callbackState);
9192
}
@@ -94,25 +95,23 @@ public async Task OnConnectionFiresOnCompleted()
9495
public async Task OnConnectionOnCompletedExceptionCaught()
9596
{
9697
var serviceContext = new TestServiceContext();
97-
var dispatcher = new ConnectionDispatcher(serviceContext, _ => Task.CompletedTask);
98-
98+
var logger = ((TestKestrelTrace)serviceContext.Log).Logger;
9999
var connection = new Mock<DefaultConnectionContext> { CallBase = true }.Object;
100100
connection.ConnectionClosed = new CancellationToken(canceled: true);
101-
var mockLogger = new Mock<ILogger>();
102-
var kestrelConnection = new KestrelConnection(connection, mockLogger.Object);
101+
var kestrelConnection = new KestrelConnection(0, serviceContext, _ => Task.CompletedTask, connection, serviceContext.Log);
103102
var completeFeature = kestrelConnection.TransportConnection.Features.Get<IConnectionCompleteFeature>();
104103

105104
Assert.NotNull(completeFeature);
106105
object stateObject = new object();
107106
object callbackState = null;
108107
completeFeature.OnCompleted(state => { callbackState = state; throw new InvalidTimeZoneException(); }, stateObject);
109108

110-
await dispatcher.Execute(kestrelConnection);
109+
await kestrelConnection.ExecuteAsync();
111110

112111
Assert.Equal(stateObject, callbackState);
113-
var log = mockLogger.Invocations.First();
114-
Assert.Equal("An error occured running an IConnectionCompleteFeature.OnCompleted callback.", log.Arguments[2].ToString());
115-
Assert.IsType<InvalidTimeZoneException>(log.Arguments[3]);
112+
var errors = logger.Messages.Where(e => e.LogLevel >= LogLevel.Error).ToArray();
113+
Assert.Single(errors);
114+
Assert.Equal("An error occured running an IConnectionCompleteFeature.OnCompleted callback.", errors[0].Message);
116115
}
117116

118117
private class ThrowingListener : IConnectionListener

src/Servers/Kestrel/Core/test/HttpConnectionManagerTests.cs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,10 @@
33

44
using System;
55
using System.Runtime.CompilerServices;
6+
using System.Threading.Tasks;
67
using Microsoft.AspNetCore.Connections;
78
using Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Infrastructure;
9+
using Microsoft.AspNetCore.Testing;
810
using Microsoft.Extensions.Logging;
911
using Moq;
1012
using Xunit;
@@ -39,9 +41,10 @@ private void UnrootedConnectionsGetRemovedFromHeartbeatInnerScope(
3941
ConnectionManager httpConnectionManager,
4042
Mock<IKestrelTrace> trace)
4143
{
44+
var serviceContext = new TestServiceContext();
4245
var mock = new Mock<DefaultConnectionContext>() { CallBase = true };
4346
mock.Setup(m => m.ConnectionId).Returns(connectionId);
44-
var httpConnection = new KestrelConnection(mock.Object, Mock.Of<ILogger>());
47+
var httpConnection = new KestrelConnection(0, serviceContext, _ => Task.CompletedTask, mock.Object, Mock.Of<IKestrelTrace>());
4548

4649
httpConnectionManager.AddConnection(0, httpConnection);
4750

0 commit comments

Comments
 (0)