4
4
using System ;
5
5
using System . Collections . Concurrent ;
6
6
using System . Data ;
7
+ using System . Diagnostics ;
7
8
using System . Diagnostics . CodeAnalysis ;
8
9
using System . Linq ;
9
10
using System . Runtime . CompilerServices ;
@@ -46,8 +47,8 @@ public void ResetQueue(string queueName, string urlPrefix)
46
47
var key = new QueueKey ( queueName , urlPrefix ) ;
47
48
if ( _queues . TryGetValue ( key , out var queueWeakRef ) && queueWeakRef . TryGetTarget ( out var queue ) )
48
49
{
49
- queue . Detach ( ) ;
50
- Log . QueueReset ( _logger , queueName , urlPrefix ) ;
50
+ var detachedQueueState = queue . Detach ( ) ;
51
+ Log . QueueReset ( _logger , queueName , urlPrefix , detachedQueueState ) ;
51
52
}
52
53
}
53
54
}
@@ -74,7 +75,7 @@ public void DelegateRequest(HttpContext context, DestinationState destination)
74
75
return ;
75
76
}
76
77
77
- Delegate ( context , destination , _serverDelegationFeature , requestDelegationFeature , queue , _logger , reattachIfNeeded : true ) ;
78
+ Delegate ( context , destination , _serverDelegationFeature , requestDelegationFeature , queue , _logger , shouldRetry : true ) ;
78
79
79
80
static void Delegate (
80
81
HttpContext context ,
@@ -83,38 +84,46 @@ static void Delegate(
83
84
IHttpSysRequestDelegationFeature requestDelegationFeature ,
84
85
DelegationQueue queue ,
85
86
ILogger logger ,
86
- bool reattachIfNeeded )
87
+ bool shouldRetry )
87
88
{
88
89
// Opportunistically retry initialization if it failed previously.
89
90
// This helps when the target queue wasn't yet created because
90
91
// the target process hadn't yet started up.
91
92
var queueState = queue . Initialize ( serverDelegationFeature ) ;
92
93
if ( ! queueState . IsInitialized )
93
94
{
94
- Log . QueueNotInitialized ( logger , destination , queueState . InitializationException ) ;
95
+ Log . QueueNotInitialized ( logger , destination , queueState , queueState . InitializationException ) ;
95
96
context . Response . StatusCode = StatusCodes . Status503ServiceUnavailable ;
96
97
context . Features . Set < IForwarderErrorFeature > ( new ForwarderErrorFeature ( ForwarderError . NoAvailableDestinations , queueState . InitializationException ) ) ;
97
98
return ;
98
99
}
99
100
100
101
try
101
102
{
102
- Log . DelegatingRequest ( logger , destination ) ;
103
+ Log . DelegatingRequest ( logger , destination , queueState ) ;
103
104
requestDelegationFeature . DelegateRequest ( queueState . Rule ) ;
104
105
}
105
- catch ( HttpSysException ex ) when ( reattachIfNeeded && ex . ErrorCode == ERROR_OBJECT_NO_LONGER_EXISTS )
106
+ catch ( ObjectDisposedException ) when ( shouldRetry )
106
107
{
107
- Log . QueueNoLongerExists ( logger , destination . GetHttpSysDelegationQueue ( ) , destination . Model ? . Config ? . Address , ex ) ;
108
+ Log . QueueDisposed ( logger , destination . GetHttpSysDelegationQueue ( ) , destination . Model ? . Config ? . Address ) ;
109
+
110
+ // Another thread detached/disposed the queue
111
+ // Attempt to delegate one more time which will to try re-initialize the queue
112
+ Delegate ( context , destination , serverDelegationFeature , requestDelegationFeature , queue , logger , shouldRetry : false ) ;
113
+ }
114
+ catch ( HttpSysException ex ) when ( shouldRetry && ex . ErrorCode == ERROR_OBJECT_NO_LONGER_EXISTS )
115
+ {
116
+ Log . QueueNoLongerExists ( logger , destination . GetHttpSysDelegationQueue ( ) , destination . Model ? . Config ? . Address , queueState , ex ) ;
108
117
109
118
// The target queue is gone. Detach from it so that we can try to re-attach.
110
119
queue . Detach ( queueState ) ;
111
120
112
121
// Attempt to delegate one more time which will try re-initialize the queue
113
- Delegate ( context , destination , serverDelegationFeature , requestDelegationFeature , queue , logger , reattachIfNeeded : false ) ;
122
+ Delegate ( context , destination , serverDelegationFeature , requestDelegationFeature , queue , logger , shouldRetry : false ) ;
114
123
}
115
124
catch ( Exception ex )
116
125
{
117
- Log . DelegationFailed ( logger , destination , ex ) ;
126
+ Log . DelegationFailed ( logger , destination , queueState , ex ) ;
118
127
context . Response . StatusCode = StatusCodes . Status503ServiceUnavailable ;
119
128
context . Features . Set < IForwarderErrorFeature > ( new ForwarderErrorFeature ( ForwarderError . Request , ex ) ) ;
120
129
}
@@ -253,7 +262,7 @@ public DelegationQueueState Initialize(IServerDelegationFeature delegationFeatur
253
262
return state ;
254
263
}
255
264
256
- public void Detach ( DelegationQueueState ? state = null )
265
+ public DelegationQueueState ? Detach ( DelegationQueueState ? state = null )
257
266
{
258
267
if ( state == null || state == _currentState )
259
268
{
@@ -262,10 +271,16 @@ public void Detach(DelegationQueueState? state = null)
262
271
if ( state == null || state == _currentState )
263
272
{
264
273
_currentState . Rule ? . Dispose ( ) ;
274
+
275
+ var oldState = _currentState ;
265
276
_currentState = new DelegationQueueState ( ) ;
277
+
278
+ return oldState ;
266
279
}
267
280
}
268
281
}
282
+
283
+ return null ;
269
284
}
270
285
271
286
public bool Equals ( QueueKey queueKey )
@@ -309,6 +324,13 @@ public DelegationQueueState(Exception ex)
309
324
public DelegationRule ? Rule { get ; }
310
325
311
326
public Exception ? InitializationException { get ; }
327
+
328
+ public string Id { get ; } = Activity . Current switch
329
+ {
330
+ { IdFormat : ActivityIdFormat . W3C } => Activity . Current . SpanId . ToHexString ( ) ,
331
+ { Id : not null } => Activity . Current . Id ,
332
+ _ => ActivitySpanId . CreateRandom ( ) . ToHexString ( ) ,
333
+ } ;
312
334
}
313
335
314
336
private readonly struct QueueKey : IEquatable < QueueKey >
@@ -356,30 +378,35 @@ private static class Log
356
378
EventIds . DelegationQueueNotFound ,
357
379
"Failed to get delegation queue for destination '{destinationId}' with queue name '{queueName}' and url prefix '{urlPrefix}'" ) ;
358
380
359
- private static readonly Action < ILogger , string , string ? , string ? , Exception ? > _queueNotInitialized = LoggerMessage . Define < string , string ? , string ? > (
381
+ private static readonly Action < ILogger , string , string ? , string ? , string , Exception ? > _queueNotInitialized = LoggerMessage . Define < string , string ? , string ? , string > (
360
382
LogLevel . Information ,
361
383
EventIds . DelegationQueueNotInitialized ,
362
- "Delegation queue not initialized for destination '{destinationId}' with queue '{queueName}' and url prefix '{urlPrefix}'." ) ;
384
+ "Delegation queue not initialized for destination '{destinationId}' with queue '{queueName}' and url prefix '{urlPrefix}'. Current state id '{stateId}' " ) ;
363
385
364
- private static readonly Action < ILogger , string ? , string ? , Exception ? > _queueReset = LoggerMessage . Define < string ? , string ? > (
386
+ private static readonly Action < ILogger , string ? , string ? , string ? , Exception ? > _queueReset = LoggerMessage . Define < string ? , string ? , string ? > (
365
387
LogLevel . Information ,
366
388
EventIds . DelegationQueueReset ,
367
- "Detached from queue with name '{queueName}' and url prefix '{urlPrefix}'" ) ;
389
+ "Detached from queue with name '{queueName}' and url prefix '{urlPrefix}'. Detached queue state id '{stateId}' " ) ;
368
390
369
- private static readonly Action < ILogger , string ? , string ? , Exception ? > _queueNoLongerExists = LoggerMessage . Define < string ? , string ? > (
370
- LogLevel . Information ,
391
+ private static readonly Action < ILogger , string ? , string ? , string , Exception ? > _queueNoLongerExists = LoggerMessage . Define < string ? , string ? , string > (
392
+ LogLevel . Debug ,
371
393
EventIds . DelegationQueueNoLongerExists ,
372
- "Destination queue with name '{queueName}' and url prefix '{urlPrefix}' no longer exists. Detaching and attempting to re-initialize." ) ;
394
+ "Destination queue with name '{queueName}' and url prefix '{urlPrefix}' no longer exists. Detaching and attempting to re-initialize. Current state id '{stateId}'" ) ;
395
+
396
+ private static readonly Action < ILogger , string ? , string ? , Exception ? > _queueDisposed = LoggerMessage . Define < string ? , string ? > (
397
+ LogLevel . Debug ,
398
+ EventIds . DelegationQueueDisposed ,
399
+ "Destination queue with name '{queueName}' and url prefix '{urlPrefix}' was disposed. Attempting to re-initialize." ) ;
373
400
374
- private static readonly Action < ILogger , string , string ? , string ? , Exception ? > _delegatingRequest = LoggerMessage . Define < string , string ? , string ? > (
401
+ private static readonly Action < ILogger , string , string ? , string ? , string , Exception ? > _delegatingRequest = LoggerMessage . Define < string , string ? , string ? , string > (
375
402
LogLevel . Information ,
376
403
EventIds . DelegatingRequest ,
377
- "Delegating to destination '{destinationId}' with queue '{queueName}' and url prefix '{urlPrefix}'" ) ;
404
+ "Delegating to destination '{destinationId}' with queue '{queueName}' and url prefix '{urlPrefix}'. Current state id '{stateId}' " ) ;
378
405
379
- private static readonly Action < ILogger , string , string ? , string ? , Exception ? > _delegationFailed = LoggerMessage . Define < string , string ? , string ? > (
406
+ private static readonly Action < ILogger , string , string ? , string ? , string , Exception ? > _delegationFailed = LoggerMessage . Define < string , string ? , string ? , string > (
380
407
LogLevel . Error ,
381
408
EventIds . DelegationFailed ,
382
- "Failed to delegate request for destination '{destinationId}' with queue name '{queueName}' and url prefix '{urlPrefix}'" ) ;
409
+ "Failed to delegate request for destination '{destinationId}' with queue name '{queueName}' and url prefix '{urlPrefix}'. Current state id '{stateId}' " ) ;
383
410
384
411
public static void QueueInitFailed ( ILogger logger , string destinationId , string queueName , string urlPrefix , Exception ? ex )
385
412
{
@@ -391,29 +418,34 @@ public static void QueueNotFound(ILogger logger, DestinationState destination)
391
418
_queueNotFound ( logger , destination . DestinationId , destination . GetHttpSysDelegationQueue ( ) , destination . Model ? . Config ? . Address , null ) ;
392
419
}
393
420
394
- public static void QueueNotInitialized ( ILogger logger , DestinationState destination , Exception ? ex )
421
+ public static void QueueNotInitialized ( ILogger logger , DestinationState destination , DelegationQueueState queueState , Exception ? ex )
422
+ {
423
+ _queueNotInitialized ( logger , destination . DestinationId , destination . GetHttpSysDelegationQueue ( ) , destination . Model ? . Config ? . Address , queueState . Id , ex ) ;
424
+ }
425
+
426
+ public static void QueueReset ( ILogger logger , string queueName , string urlPrefix , DelegationQueueState ? detachedQueueState )
395
427
{
396
- _queueNotInitialized ( logger , destination . DestinationId , destination . GetHttpSysDelegationQueue ( ) , destination . Model ? . Config ? . Address , ex ) ;
428
+ _queueReset ( logger , queueName , urlPrefix , detachedQueueState ? . Id , null ) ;
397
429
}
398
430
399
- public static void QueueReset ( ILogger logger , string queueName , string urlPrefix )
431
+ public static void QueueNoLongerExists ( ILogger logger , string ? queueName , string ? urlPrefix , DelegationQueueState queueState , Exception ? ex )
400
432
{
401
- _queueReset ( logger , queueName , urlPrefix , null ) ;
433
+ _queueNoLongerExists ( logger , queueName , urlPrefix , queueState . Id , ex ) ;
402
434
}
403
435
404
- public static void QueueNoLongerExists ( ILogger logger , string ? queueName , string ? urlPrefix , Exception ? ex )
436
+ public static void QueueDisposed ( ILogger logger , string ? queueName , string ? urlPrefix )
405
437
{
406
- _queueNoLongerExists ( logger , queueName , urlPrefix , ex ) ;
438
+ _queueDisposed ( logger , queueName , urlPrefix , null ) ;
407
439
}
408
440
409
- public static void DelegatingRequest ( ILogger logger , DestinationState destination )
441
+ public static void DelegatingRequest ( ILogger logger , DestinationState destination , DelegationQueueState queueState )
410
442
{
411
- _delegatingRequest ( logger , destination . DestinationId , destination . GetHttpSysDelegationQueue ( ) , destination . Model ? . Config ? . Address , null ) ;
443
+ _delegatingRequest ( logger , destination . DestinationId , destination . GetHttpSysDelegationQueue ( ) , destination . Model ? . Config ? . Address , queueState . Id , null ) ;
412
444
}
413
445
414
- public static void DelegationFailed ( ILogger logger , DestinationState destination , Exception ex )
446
+ public static void DelegationFailed ( ILogger logger , DestinationState destination , DelegationQueueState queueState , Exception ex )
415
447
{
416
- _delegationFailed ( logger , destination . DestinationId , destination . GetHttpSysDelegationQueue ( ) , destination . Model ? . Config ? . Address , ex ) ;
448
+ _delegationFailed ( logger , destination . DestinationId , destination . GetHttpSysDelegationQueue ( ) , destination . Model ? . Config ? . Address , queueState . Id , ex ) ;
417
449
}
418
450
}
419
451
}
0 commit comments