diff --git a/docs/preview/02-Features/06-general-messaging.md b/docs/preview/02-Features/06-general-messaging.md index 61622fe3..014a3e9b 100644 --- a/docs/preview/02-Features/06-general-messaging.md +++ b/docs/preview/02-Features/06-general-messaging.md @@ -96,9 +96,16 @@ The instances should implement the `ICircuitBreakerEventHandler`, which allows y ```csharp public class MyFirstCircuitBreakerEventHandler : ICircuitBreakerEventHandler { - public Task OnTransitionAsync(MessagePumpCircuitState newState) + public Task OnTransitionAsync(MessagePumpCircuitStateChang change) { - // ... + // The job ID of the message pump that was transitioned. + string jobId = change.JobId; + + // The circuit breaker state transitions. + MessagePumpCircuitState oldState = change.OldState; + MessagePumpCircuitState newState = change.NewState; + + // Process the state change event... } } ``` diff --git a/src/Arcus.Messaging.Pumps.Abstractions/MessagePump.cs b/src/Arcus.Messaging.Pumps.Abstractions/MessagePump.cs index adc1d346..8df22be9 100644 --- a/src/Arcus.Messaging.Pumps.Abstractions/MessagePump.cs +++ b/src/Arcus.Messaging.Pumps.Abstractions/MessagePump.cs @@ -145,9 +145,12 @@ protected async Task WaitMessageRecoveryPeriodAsync(CancellationToken cancellati if (!CircuitState.IsHalfOpen) { - CircuitState = CircuitState.TransitionTo(CircuitBreakerState.HalfOpen); + MessagePumpCircuitState + oldState = CircuitState, + newState = CircuitState.TransitionTo(CircuitBreakerState.HalfOpen); - NotifyCircuitBreakerStateChangedSubscribers(); + CircuitState = newState; + NotifyCircuitBreakerStateChangedSubscribers(oldState, newState); } } @@ -162,9 +165,12 @@ protected async Task WaitMessageIntervalDuringRecoveryAsync(CancellationToken ca if (!CircuitState.IsHalfOpen) { - CircuitState = CircuitState.TransitionTo(CircuitBreakerState.HalfOpen); + MessagePumpCircuitState + oldState = CircuitState, + newState = CircuitState.TransitionTo(CircuitBreakerState.HalfOpen); - NotifyCircuitBreakerStateChangedSubscribers(); + CircuitState = newState; + NotifyCircuitBreakerStateChangedSubscribers(oldState, newState); } } @@ -176,9 +182,12 @@ internal void NotifyPauseReceiveMessages(MessagePumpCircuitBreakerOptions option { Logger.LogDebug("Circuit breaker caused message pump '{JobId}' to transition from a '{CurrentState}' an 'Open' state", JobId, CircuitState); - CircuitState = CircuitState.TransitionTo(CircuitBreakerState.Open, options); + MessagePumpCircuitState + oldState = CircuitState, + newState = CircuitState.TransitionTo(CircuitBreakerState.Open, options); - NotifyCircuitBreakerStateChangedSubscribers(); + CircuitState = newState; + NotifyCircuitBreakerStateChangedSubscribers(oldState, newState); } /// @@ -188,18 +197,21 @@ protected void NotifyResumeRetrievingMessages() { Logger.LogDebug("Circuit breaker caused message pump '{JobId}' to transition back from '{CurrentState}' to a 'Closed' state, retrieving messages is resumed", JobId, CircuitState); - CircuitState = MessagePumpCircuitState.Closed; + MessagePumpCircuitState + oldState = CircuitState, + newState = MessagePumpCircuitState.Closed; - NotifyCircuitBreakerStateChangedSubscribers(); + CircuitState = newState; + NotifyCircuitBreakerStateChangedSubscribers(oldState, newState); } - private void NotifyCircuitBreakerStateChangedSubscribers() + private void NotifyCircuitBreakerStateChangedSubscribers(MessagePumpCircuitState oldState, MessagePumpCircuitState newState) { ICircuitBreakerEventHandler[] eventHandlers = GetEventHandlersForPump(); foreach (var handler in eventHandlers) { - Task.Run(() => handler.OnTransition(CircuitState)); + Task.Run(() => handler.OnTransition(new MessagePumpCircuitStateChangedEventArgs(JobId, oldState, newState))); } } diff --git a/src/Arcus.Messaging.Pumps.Abstractions/Resiliency/IMessagePumpCircuitBreaker.cs b/src/Arcus.Messaging.Pumps.Abstractions/Resiliency/IMessagePumpCircuitBreaker.cs index 66112a34..fb0c9b01 100644 --- a/src/Arcus.Messaging.Pumps.Abstractions/Resiliency/IMessagePumpCircuitBreaker.cs +++ b/src/Arcus.Messaging.Pumps.Abstractions/Resiliency/IMessagePumpCircuitBreaker.cs @@ -33,8 +33,47 @@ public interface ICircuitBreakerEventHandler /// /// Notifies the application on a change in the message pump's circuit breaker state. /// - /// The new circuit breaker state in which the message pump is currently running on. - void OnTransition(MessagePumpCircuitState newState); + /// The change in the circuit breaker state for a message pump. + void OnTransition(MessagePumpCircuitStateChangedEventArgs args); + } + + /// + /// Represents a change event of the in a . + /// + public class MessagePumpCircuitStateChangedEventArgs + { + /// + /// Initializes a new instance of the class. + /// + internal MessagePumpCircuitStateChangedEventArgs( + string jobId, + MessagePumpCircuitState oldState, + MessagePumpCircuitState newState) + { + if (string.IsNullOrWhiteSpace(jobId)) + { + throw new ArgumentException("Requires a non-blank job ID for the circuit breaker event state change registration", nameof(jobId)); + } + + JobId = jobId; + OldState = oldState; + NewState = newState; + } + + /// + /// Gets the unique ID to distinguish the linked message pump that had it circuit breaker state changed. + /// + public string JobId { get; } + + /// + /// Gets the original circuit breaker state the linked message pump was in. + /// + public MessagePumpCircuitState OldState { get; } + + /// + /// Gets the current circuit breaker state the linked message pump is in. + /// + public MessagePumpCircuitState NewState { get; } } /// diff --git a/src/Arcus.Messaging.Tests.Integration/MessagePump/Fixture/MockCircuitBreakerEventHandler.cs b/src/Arcus.Messaging.Tests.Integration/MessagePump/Fixture/MockCircuitBreakerEventHandler.cs index f3208042..19a69e41 100644 --- a/src/Arcus.Messaging.Tests.Integration/MessagePump/Fixture/MockCircuitBreakerEventHandler.cs +++ b/src/Arcus.Messaging.Tests.Integration/MessagePump/Fixture/MockCircuitBreakerEventHandler.cs @@ -1,8 +1,10 @@ -using System.Collections.Generic; +using System; using System.Collections.ObjectModel; -using System.Linq; +using System.Threading.Tasks; using Arcus.Messaging.Pumps.Abstractions.Resiliency; +using Arcus.Testing; using Xunit; +using Xunit.Sdk; namespace Arcus.Messaging.Tests.Integration.MessagePump.Fixture { @@ -11,28 +13,32 @@ namespace Arcus.Messaging.Tests.Integration.MessagePump.Fixture /// internal class MockCircuitBreakerEventHandler : ICircuitBreakerEventHandler { - private readonly Collection _states = new(); + private readonly Collection _states = new(); /// /// Notifies the application on a change in the message pump's circuit breaker state. /// - /// The new circuit breaker state in which the message pump is currently running on. - public void OnTransition(MessagePumpCircuitState newState) + /// The change in the circuit breaker state for a message pump. + public void OnTransition(MessagePumpCircuitStateChangedEventArgs args) { - _states.Add(newState); + _states.Add(args); } - public void ShouldTransitionCorrectly() + /// + /// Verifies that all fired circuit breaker state transitions are happening correctly. + /// + public async Task ShouldTransitionedCorrectlyAsync() { - Assert.NotEmpty(_states); - - MessagePumpCircuitState firstTransition = _states[0]; - Assert.True(firstTransition.IsOpen, $"when the message pump starts up, the first transition should always be from a closed to open state, but got: {firstTransition}"); + await Poll.Target(() => + { + Assert.NotEmpty(_states); + Assert.Equal(3, _states.Count); - IEnumerable<(MessagePumpCircuitState oldState, MessagePumpCircuitState newState)> transitions = - _states.SkipLast(1).Zip(_states.Skip(1)); + }).Every(TimeSpan.FromSeconds(1)) + .Timeout(TimeSpan.FromSeconds(10)) + .FailWith("could not in time find all the fired circuit breaker change events, possibly the message pump did not fired them"); - Assert.All(transitions, t => VerifyCorrectTransition(t.oldState, t.newState)); + Assert.All(_states, t => VerifyCorrectTransition(t.OldState, t.NewState)); } private static void VerifyCorrectTransition( @@ -41,7 +47,7 @@ private static void VerifyCorrectTransition( { if (oldState.IsClosed) { - Assert.True(newState.IsHalfOpen, $"when the message pump comes from a closed state, the next state should always be half-open, but got: {newState}"); + Assert.True(newState.IsOpen, $"when the message pump comes from a closed state, the next state should always be open, but got: {newState}"); } else if (oldState.IsOpen) { diff --git a/src/Arcus.Messaging.Tests.Integration/MessagePump/ServiceBusMessagePump.ResiliencyTests.cs b/src/Arcus.Messaging.Tests.Integration/MessagePump/ServiceBusMessagePump.ResiliencyTests.cs index 3ebc2a33..f44bcd83 100644 --- a/src/Arcus.Messaging.Tests.Integration/MessagePump/ServiceBusMessagePump.ResiliencyTests.cs +++ b/src/Arcus.Messaging.Tests.Integration/MessagePump/ServiceBusMessagePump.ResiliencyTests.cs @@ -62,8 +62,8 @@ public async Task ServiceBusMessageQueuePump_WithUnavailableDependencySystem_Cir await producer.ProduceAsync(messageAfterBreak); await messageSink.ShouldReceiveOrdersAfterBreakAsync(messageAfterBreak.MessageId); - mockEventHandler1.ShouldTransitionCorrectly(); - mockEventHandler2.ShouldTransitionCorrectly(); + await mockEventHandler1.ShouldTransitionedCorrectlyAsync(); + await mockEventHandler2.ShouldTransitionedCorrectlyAsync(); } [Fact] @@ -99,8 +99,8 @@ public async Task ServiceBusMessageTopicPump_WithUnavailableDependencySystem_Cir await producer.ProduceAsync(messageAfterBreak); await messageSink.ShouldReceiveOrdersAfterBreakAsync(messageAfterBreak.MessageId); - mockEventHandler1.ShouldTransitionCorrectly(); - mockEventHandler2.ShouldTransitionCorrectly(); + await mockEventHandler1.ShouldTransitionedCorrectlyAsync(); + await mockEventHandler2.ShouldTransitionedCorrectlyAsync(); } private async Task CreateTopicSubscriptionForMessageAsync(params ServiceBusMessage[] messages)