diff --git a/docs/preview/02-Features/06-general-messaging.md b/docs/preview/02-Features/06-general-messaging.md index bcdbb89c..61622fe3 100644 --- a/docs/preview/02-Features/06-general-messaging.md +++ b/docs/preview/02-Features/06-general-messaging.md @@ -78,6 +78,31 @@ Both the recovery period after the circuit is open and the interval between mess } ``` +#### 🔔 Get notified on a circuit breaker state transition +Transitions from circuit breaker states happens internally and automatically. The library supports a notification system that lets you register event handlers that gets called upon a state transition. + +After the message pump and/or message handlers are registered, you can add one or more handlers linked to specific transitions to the previously registered pump. + +```csharp +using Arcus.Messaging.Pumps.Abstractions; + +services.AddServiceBusMessagePump(...) + .WithCircuitBreakerStateChangedEventHandler() + .WithCircuitBreakerStateChangedEventHandler(); +``` + +The instances should implement the `ICircuitBreakerEventHandler`, which allows you to access the new state for the pump. + +```csharp +public class MyFirstCircuitBreakerEventHandler : ICircuitBreakerEventHandler +{ + public Task OnTransitionAsync(MessagePumpCircuitState newState) + { + // ... + } +} +``` + ### Pause message processing for a fixed period of time ⚡ If you use one of the message-type specific packages like `Arcus.Messaging.Pumps.EventHubs`, you will automatically get this functionality. If you implement your own message pump, please use the `services.AddMessagePump(...)` extension which makes sure that you also registers this functionality. diff --git a/src/Arcus.Messaging.Abstractions.EventHubs/MessageHandling/AzureEventHubsMessageRouter.cs b/src/Arcus.Messaging.Abstractions.EventHubs/MessageHandling/AzureEventHubsMessageRouter.cs index 8b4ce369..8c3da9a4 100644 --- a/src/Arcus.Messaging.Abstractions.EventHubs/MessageHandling/AzureEventHubsMessageRouter.cs +++ b/src/Arcus.Messaging.Abstractions.EventHubs/MessageHandling/AzureEventHubsMessageRouter.cs @@ -34,7 +34,7 @@ public AzureEventHubsMessageRouter(IServiceProvider serviceProvider) /// The service provider instance to retrieve all the instances. /// The logger instance to write diagnostic trace messages during the routing of the message. /// Thrown when the is null. - public AzureEventHubsMessageRouter(IServiceProvider serviceProvider, ILogger logger) + public AzureEventHubsMessageRouter(IServiceProvider serviceProvider, ILogger logger) : this(serviceProvider, new AzureEventHubsMessageRouterOptions(), logger) { } @@ -57,7 +57,7 @@ public AzureEventHubsMessageRouter(IServiceProvider serviceProvider, AzureEventH /// The consumer-configurable options to change the behavior of the router. /// The logger instance to write diagnostic trace messages during the routing of the message. /// Thrown when the is null. - public AzureEventHubsMessageRouter(IServiceProvider serviceProvider, AzureEventHubsMessageRouterOptions options, ILogger logger) + public AzureEventHubsMessageRouter(IServiceProvider serviceProvider, AzureEventHubsMessageRouterOptions options, ILogger logger) : this(serviceProvider, options, (ILogger) logger) { } @@ -68,7 +68,7 @@ public AzureEventHubsMessageRouter(IServiceProvider serviceProvider, AzureEventH /// The service provider instance to retrieve all the instances. /// The logger instance to write diagnostic trace messages during the routing of the message. /// Thrown when the is null. - protected AzureEventHubsMessageRouter(IServiceProvider serviceProvider, ILogger logger) + protected AzureEventHubsMessageRouter(IServiceProvider serviceProvider, ILogger logger) : this(serviceProvider, new AzureEventHubsMessageRouterOptions(), logger) { } @@ -80,7 +80,7 @@ protected AzureEventHubsMessageRouter(IServiceProvider serviceProvider, ILogger /// The consumer-configurable options to change the behavior of the router. /// The logger instance to write diagnostic trace messages during the routing of the message. /// Thrown when the is null. - protected AzureEventHubsMessageRouter(IServiceProvider serviceProvider, AzureEventHubsMessageRouterOptions options, ILogger logger) + protected AzureEventHubsMessageRouter(IServiceProvider serviceProvider, AzureEventHubsMessageRouterOptions options, ILogger logger) : base(serviceProvider, options, logger) { EventHubsOptions = options ?? new AzureEventHubsMessageRouterOptions(); @@ -158,8 +158,8 @@ public override async Task RouteMessageAsync( } Logger.LogEventHubsRequest( - eventHubsNamespace, - consumerGroup, + eventHubsNamespace, + consumerGroup, eventHubsName, Options.Telemetry.OperationName, isSuccessful, diff --git a/src/Arcus.Messaging.Abstractions/MessageHandling/MessageHandler.cs b/src/Arcus.Messaging.Abstractions/MessageHandling/MessageHandler.cs index 8153c770..b38c8593 100644 --- a/src/Arcus.Messaging.Abstractions/MessageHandling/MessageHandler.cs +++ b/src/Arcus.Messaging.Abstractions/MessageHandling/MessageHandler.cs @@ -76,7 +76,7 @@ public static IEnumerable SubtractFrom(IServiceProvider serviceP Guard.NotNull(serviceProvider, nameof(serviceProvider), "Requires a collection of services to subtract the message handlers from"); Guard.NotNull(logger, nameof(logger), "Requires a logger instance to write trace messages during the lifetime of the message handlers"); - MessageHandler[] registrations = + MessageHandler[] registrations = serviceProvider.GetServices() .ToArray(); @@ -98,17 +98,17 @@ public static IEnumerable SubtractFrom(IServiceProvider serviceP /// Thrown when the is blank. internal static MessageHandler Create( IMessageHandler messageHandler, - ILogger> logger, + ILogger logger, string jobId, Func messageBodyFilter = null, Func messageContextFilter = null, - IMessageBodySerializer messageBodySerializer = null) + IMessageBodySerializer messageBodySerializer = null) where TMessageContext : MessageContext { Guard.NotNull(messageHandler, nameof(messageHandler), "Requires a message handler implementation to register the handler within the application services"); ProcessMessageAsync processMessageAsync = DetermineMessageImplementation(messageHandler); - logger = logger ?? NullLogger>.Instance; + logger ??= NullLogger.Instance; Type messageHandlerType = messageHandler.GetType(); Func messageFilter = DetermineMessageBodyFilter(messageBodyFilter, messageHandlerType, logger); @@ -125,7 +125,7 @@ internal static MessageHandler Create( logger: logger); } - private static ProcessMessageAsync DetermineMessageImplementation(IMessageHandler messageHandler) + private static ProcessMessageAsync DetermineMessageImplementation(IMessageHandler messageHandler) where TMessageContext : MessageContext { return async (rawMessage, generalMessageContext, correlationInfo, cancellationToken) => @@ -144,8 +144,8 @@ private static ProcessMessageAsync DetermineMessageImplementation DetermineMessageBodyFilter(Func messageBodyFilter, Type messageHandlerType, ILogger logger) { - return rawMessage => - { + return rawMessage => + { if (messageBodyFilter is null) { return true; @@ -172,8 +172,8 @@ private static Func DetermineMessageContextFilter - { + return rawContext => + { if (rawContext is not null && jobId is not null && rawContext.JobId != jobId) { return false; @@ -219,7 +219,7 @@ public Type GetMessageHandlerType() /// /// The type of the message context. /// The context in which the incoming message is processed. - public bool CanProcessMessageBasedOnContext(TMessageContext messageContext) + public bool CanProcessMessageBasedOnContext(TMessageContext messageContext) where TMessageContext : MessageContext { Guard.NotNull(messageContext, nameof(messageContext), "Requires an message context instance to determine if the message handler can process the message"); @@ -341,9 +341,9 @@ public async Task ProcessMessageAsync( const string methodName = nameof(IMessageHandler.ProcessMessageAsync); try { - Task processMessageAsync = + Task processMessageAsync = _messageHandlerImplementation(message, messageContext, correlationInfo, cancellationToken); - + if (processMessageAsync is null) { throw new InvalidOperationException( diff --git a/src/Arcus.Messaging.Abstractions/MessageHandling/MessageHandlerCollection.cs b/src/Arcus.Messaging.Abstractions/MessageHandling/MessageHandlerCollection.cs index a4001035..5be9ed0d 100644 --- a/src/Arcus.Messaging.Abstractions/MessageHandling/MessageHandlerCollection.cs +++ b/src/Arcus.Messaging.Abstractions/MessageHandling/MessageHandlerCollection.cs @@ -27,7 +27,7 @@ public MessageHandlerCollection(IServiceCollection services) /// This ID can be used to get a reference of the previously registered message pump while registering message handlers and other functionality related to the message pump. /// public string JobId { get; set; } - + /// /// Gets the current available collection of services to register the message handling logic into. /// @@ -51,7 +51,7 @@ internal void AddMessageHandler( where TMessageContext : MessageContext { Guard.NotNull(implementationFactory, nameof(implementationFactory), "Requires a function to create a message handler implementation to register the handler within the application services"); - + Services.AddTransient( serviceProvider => MessageHandler.Create( implementationFactory(serviceProvider), @@ -71,7 +71,7 @@ internal void AddMessageHandler( /// The function to create the user-defined fallback message handler instance. /// Thrown when the is null. public void AddFallbackMessageHandler( - Func implementationFactory) + Func implementationFactory) where TMessageHandler : IFallbackMessageHandler where TMessage : class where TMessageContext : MessageContext @@ -80,7 +80,7 @@ public void AddFallbackMessageHandler FallbackMessageHandler.Create( - implementationFactory(serviceProvider), + implementationFactory(serviceProvider), JobId, serviceProvider.GetService>>())); } diff --git a/src/Arcus.Messaging.Pumps.Abstractions/Extensions/IServiceCollectionExtensions.cs b/src/Arcus.Messaging.Pumps.Abstractions/Extensions/IServiceCollectionExtensions.cs index 425ba682..e3fb1753 100644 --- a/src/Arcus.Messaging.Pumps.Abstractions/Extensions/IServiceCollectionExtensions.cs +++ b/src/Arcus.Messaging.Pumps.Abstractions/Extensions/IServiceCollectionExtensions.cs @@ -32,8 +32,55 @@ public static IServiceCollection AddMessagePump( services.TryAddSingleton(); services.TryAddSingleton( provider => new DefaultMessagePumpCircuitBreaker(provider, provider.GetService>())); - + return services.AddHostedService(implementationFactory); } + + /// + /// Adds an implementation for a specific message pump to the application services. + /// + /// The custom type of the event handler. + /// The application services to register the event handler. + /// The unique ID to distinguish the message pump to register this event handler for. + /// Thrown when the is null. + /// Thrown when the is blank. + public static IServiceCollection AddCircuitBreakerEventHandler(this IServiceCollection services, string jobId) + where TEventHandler : ICircuitBreakerEventHandler + { + return AddCircuitBreakerEventHandler(services, jobId, provider => ActivatorUtilities.CreateInstance(provider)); + } + + /// + /// Adds an implementation for a specific message pump to the application services. + /// + /// The custom type of the event handler. + /// The application services to register the event handler. + /// The unique ID to distinguish the message pump to register this event handler for. + /// The factory function to create the custom implementation. + /// Thrown when the or is null. + /// Thrown when the is blank. + public static IServiceCollection AddCircuitBreakerEventHandler( + this IServiceCollection services, + string jobId, + Func implementationFactory) + where TEventHandler : ICircuitBreakerEventHandler + { + if (services is null) + { + throw new ArgumentNullException(nameof(services)); + } + + if (implementationFactory is null) + { + throw new ArgumentNullException(nameof(implementationFactory)); + } + + if (string.IsNullOrWhiteSpace(jobId)) + { + throw new ArgumentException("Requires a non-blank job ID to distinguish the message pump for which this circuit breaker event handler is registered", nameof(jobId)); + } + + return services.AddTransient(provider => new CircuitBreakerEventHandler(jobId, implementationFactory(provider))); + } } } diff --git a/src/Arcus.Messaging.Pumps.Abstractions/MessagePump.cs b/src/Arcus.Messaging.Pumps.Abstractions/MessagePump.cs index 12424ee1..bb0c0e21 100644 --- a/src/Arcus.Messaging.Pumps.Abstractions/MessagePump.cs +++ b/src/Arcus.Messaging.Pumps.Abstractions/MessagePump.cs @@ -1,10 +1,12 @@ using System; +using System.Linq; using System.Text; using System.Threading; using System.Threading.Tasks; using Arcus.Messaging.Pumps.Abstractions.Resiliency; using GuardNet; using Microsoft.Extensions.Configuration; +using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Hosting; using Microsoft.Extensions.Logging; @@ -134,7 +136,12 @@ protected async Task WaitMessageRecoveryPeriodAsync(CancellationToken cancellati Logger.LogDebug("Circuit breaker caused message pump '{JobId}' to wait message recovery period of '{Recovery}' during '{State}' state", JobId, CircuitState.Options.MessageRecoveryPeriod.ToString("g"), CircuitState); await Task.Delay(CircuitState.Options.MessageRecoveryPeriod, cancellationToken); - CircuitState = CircuitState.TransitionTo(CircuitBreakerState.HalfOpen); + if (!CircuitState.IsHalfOpen) + { + CircuitState = CircuitState.TransitionTo(CircuitBreakerState.HalfOpen); + + NotifyCircuitBreakerStateChangedSubscribers(); + } } /// @@ -146,7 +153,12 @@ protected async Task WaitMessageIntervalDuringRecoveryAsync(CancellationToken ca Logger.LogDebug("Circuit breaker caused message pump '{JobId}' to wait message interval during recovery of '{Interval}' during the '{State}' state", JobId, CircuitState.Options.MessageIntervalDuringRecovery.ToString("g"), CircuitState); await Task.Delay(CircuitState.Options.MessageIntervalDuringRecovery, cancellationToken); - CircuitState = CircuitState.TransitionTo(CircuitBreakerState.HalfOpen); + if (!CircuitState.IsHalfOpen) + { + CircuitState = CircuitState.TransitionTo(CircuitBreakerState.HalfOpen); + + NotifyCircuitBreakerStateChangedSubscribers(); + } } /// @@ -158,6 +170,8 @@ internal void NotifyPauseReceiveMessages(MessagePumpCircuitBreakerOptions option Logger.LogDebug("Circuit breaker caused message pump '{JobId}' to transition from a '{CurrentState}' an 'Open' state", JobId, CircuitState); CircuitState = CircuitState.TransitionTo(CircuitBreakerState.Open, options); + + NotifyCircuitBreakerStateChangedSubscribers(); } /// @@ -168,6 +182,26 @@ protected void NotifyResumeRetrievingMessages() Logger.LogDebug("Circuit breaker caused message pump '{JobId}' to transition back from '{CurrentState}' to a 'Closed' state, retrieving messages is resumed", JobId, CircuitState); CircuitState = MessagePumpCircuitState.Closed; + + NotifyCircuitBreakerStateChangedSubscribers(); + } + + private void NotifyCircuitBreakerStateChangedSubscribers() + { + ICircuitBreakerEventHandler[] eventHandlers = GetEventHandlersForPump(); + + foreach (var handler in eventHandlers) + { + Task.Run(() => handler.OnTransition(CircuitState)); + } + } + + private ICircuitBreakerEventHandler[] GetEventHandlersForPump() + { + return ServiceProvider.GetServices() + .Where(registration => registration.JobId == JobId) + .Select(handler => handler.Handler) + .ToArray(); } /// diff --git a/src/Arcus.Messaging.Pumps.Abstractions/Resiliency/IMessagePumpCircuitBreaker.cs b/src/Arcus.Messaging.Pumps.Abstractions/Resiliency/IMessagePumpCircuitBreaker.cs index 9ac687a1..66112a34 100644 --- a/src/Arcus.Messaging.Pumps.Abstractions/Resiliency/IMessagePumpCircuitBreaker.cs +++ b/src/Arcus.Messaging.Pumps.Abstractions/Resiliency/IMessagePumpCircuitBreaker.cs @@ -25,6 +25,54 @@ public interface IMessagePumpCircuitBreaker Task PauseMessageProcessingAsync(string jobId, Action configureOptions); } + /// + /// Represents an instance to notify changes in circuit breaker states for given message pumps. + /// + public interface ICircuitBreakerEventHandler + { + /// + /// Notifies the application on a change in the message pump's circuit breaker state. + /// + /// The new circuit breaker state in which the message pump is currently running on. + void OnTransition(MessagePumpCircuitState newState); + } + + /// + /// Represents a registration of an instance in the application services, + /// specifically linked to a message pump. + /// + internal sealed class CircuitBreakerEventHandler + { + /// + /// Initializes a new instance of the class. + /// + public CircuitBreakerEventHandler(string jobId, ICircuitBreakerEventHandler handler) + { + if (string.IsNullOrWhiteSpace(jobId)) + { + throw new ArgumentException("Requires a non-blank job ID for the circuit breaker event handler registration", nameof(jobId)); + } + + if (handler is null) + { + throw new ArgumentNullException(nameof(handler)); + } + + JobId = jobId; + Handler = handler; + } + + /// + /// Gets the unique ID to distinguish the linked message pump. + /// + public string JobId { get; } + + /// + /// Gets the event handler implementation to trigger on transition changes in the linked message pump. + /// + public ICircuitBreakerEventHandler Handler { get; } + } + /// /// Represents the available states in the in which the message pump can transition into. /// diff --git a/src/Arcus.Messaging.Pumps.ServiceBus/AzureServiceBusMessagePump.cs b/src/Arcus.Messaging.Pumps.ServiceBus/AzureServiceBusMessagePump.cs index 8bf17b2a..ece782c4 100644 --- a/src/Arcus.Messaging.Pumps.ServiceBus/AzureServiceBusMessagePump.cs +++ b/src/Arcus.Messaging.Pumps.ServiceBus/AzureServiceBusMessagePump.cs @@ -8,7 +8,6 @@ using Arcus.Messaging.Abstractions.ServiceBus; using Arcus.Messaging.Abstractions.ServiceBus.MessageHandling; using Arcus.Messaging.Pumps.Abstractions; -using Arcus.Messaging.Pumps.Abstractions.Resiliency; using Arcus.Messaging.Pumps.ServiceBus.Configuration; using Azure.Messaging.ServiceBus; using Azure.Messaging.ServiceBus.Administration; diff --git a/src/Arcus.Messaging.Pumps.ServiceBus/Extensions/ServiceBusMessageHandlerCollectionExtensions.cs b/src/Arcus.Messaging.Pumps.ServiceBus/Extensions/ServiceBusMessageHandlerCollectionExtensions.cs new file mode 100644 index 00000000..957be780 --- /dev/null +++ b/src/Arcus.Messaging.Pumps.ServiceBus/Extensions/ServiceBusMessageHandlerCollectionExtensions.cs @@ -0,0 +1,53 @@ +using System; +using Arcus.Messaging.Abstractions.ServiceBus.MessageHandling; +using Arcus.Messaging.Pumps.Abstractions.Resiliency; + +// ReSharper disable once CheckNamespace +namespace Microsoft.Extensions.DependencyInjection +{ + /// + /// Extensions on the to add an 's implementations. + /// + // ReSharper disable once InconsistentNaming + public static class ServiceBusMessageHandlerCollectionExtensions + { + /// + /// Adds an implementation for a specific message pump to the application services. + /// + /// The custom type of the event handler. + /// The application services to register the event handler. + /// Thrown when the is null. + public static ServiceBusMessageHandlerCollection WithCircuitBreakerStateChangedEventHandler( + this ServiceBusMessageHandlerCollection collection) + where TEventHandler : ICircuitBreakerEventHandler + { + return WithCircuitBreakerStateChangedEventHandler(collection, provider => ActivatorUtilities.CreateInstance(provider)); + } + + /// + /// Adds an implementation for a specific message pump to the application services. + /// + /// The custom type of the event handler. + /// The application services to register the event handler. + /// The factory function to create the custom implementation. + /// Thrown when the or is null. + public static ServiceBusMessageHandlerCollection WithCircuitBreakerStateChangedEventHandler( + this ServiceBusMessageHandlerCollection collection, + Func implementationFactory) + where TEventHandler : ICircuitBreakerEventHandler + { + if (collection is null) + { + throw new ArgumentNullException(nameof(collection)); + } + + if (implementationFactory is null) + { + throw new ArgumentNullException(nameof(implementationFactory)); + } + + collection.Services.AddCircuitBreakerEventHandler(collection.JobId, implementationFactory); + return collection; + } + } +} diff --git a/src/Arcus.Messaging.Tests.Integration/Fixture/WorkerOptions.cs b/src/Arcus.Messaging.Tests.Integration/Fixture/WorkerOptions.cs index 282a91dd..f9bcb4cc 100644 --- a/src/Arcus.Messaging.Tests.Integration/Fixture/WorkerOptions.cs +++ b/src/Arcus.Messaging.Tests.Integration/Fixture/WorkerOptions.cs @@ -1,4 +1,4 @@ -using System; +using System; using System.Collections; using System.Collections.Generic; using System.Collections.ObjectModel; @@ -53,8 +53,8 @@ public WorkerOptions AddXunitTestLogging(ITestOutputHelper logger) { Guard.NotNull(logger, nameof(logger), "Requires a logger instance to write diagnostic trace messages to the test output"); - _outputWriter = logger; - return this; + _outputWriter = logger; + return this; } /// @@ -67,6 +67,7 @@ public WorkerOptions ConfigureSerilog(Action configure) Guard.NotNull(configure, nameof(configure)); _additionalSerilogConfigOptions.Add(configure); + return this; } @@ -78,8 +79,25 @@ public WorkerOptions ConfigureSerilog(Action configure) internal void ApplyOptions(IHostBuilder hostBuilder) { Guard.NotNull(hostBuilder, nameof(hostBuilder), "Requires a host builder instance to apply the worker options to"); - - hostBuilder.ConfigureAppConfiguration(config => config.AddInMemoryCollection(Configuration)) + + LoggerConfiguration config = + new LoggerConfiguration() + .MinimumLevel.Verbose() + .Enrich.FromLogContext(); + + if (_outputWriter != null) + { + config.WriteTo.XunitTestLogging(_outputWriter); + } + + foreach (Action configure in _additionalSerilogConfigOptions) + { + configure(config); + } + + Log.Logger = config.CreateLogger(); + + hostBuilder.ConfigureAppConfiguration(builder => builder.AddInMemoryCollection(Configuration)) .ConfigureServices(services => { foreach (ServiceDescriptor service in Services) @@ -87,21 +105,7 @@ internal void ApplyOptions(IHostBuilder hostBuilder) services.Add(service); } }) - .UseSerilog((context, config) => - { - config.MinimumLevel.Verbose() - .Enrich.FromLogContext(); - - if (_outputWriter != null) - { - config.WriteTo.XunitTestLogging(_outputWriter); - } - - foreach (Action configure in _additionalSerilogConfigOptions) - { - configure(config); - } - }); + .UseSerilog(Log.Logger); foreach (Action additionalHostOption in _additionalHostOptions) { diff --git a/src/Arcus.Messaging.Tests.Integration/MessagePump/EventHubsMessagePump.ConnectivityTests.cs b/src/Arcus.Messaging.Tests.Integration/MessagePump/EventHubsMessagePump.ConnectivityTests.cs index 5621c666..43ff7475 100644 --- a/src/Arcus.Messaging.Tests.Integration/MessagePump/EventHubsMessagePump.ConnectivityTests.cs +++ b/src/Arcus.Messaging.Tests.Integration/MessagePump/EventHubsMessagePump.ConnectivityTests.cs @@ -5,6 +5,7 @@ using System.Threading.Tasks; using Arcus.Messaging.Pumps.Abstractions; using Arcus.Messaging.Pumps.EventHubs; +using Arcus.Messaging.Tests.Core.Correlation; using Arcus.Messaging.Tests.Core.Events.v1; using Arcus.Messaging.Tests.Core.Messages.v1; using Arcus.Messaging.Tests.Integration.Fixture; @@ -34,7 +35,7 @@ await TestEventHubsMessageHandlingAsync(options => [storageAccountConnectionStringSecretName] = _eventHubsConfig.Storage.ConnectionString })); - options.AddEventHubsMessagePump(EventHubsName, eventHubsConnectionStringSecretName, ContainerName, storageAccountConnectionStringSecretName) + options.AddEventHubsMessagePump(EventHubsName, eventHubsConnectionStringSecretName, ContainerName, storageAccountConnectionStringSecretName) .WithEventHubsMessageHandler(); }); } @@ -47,11 +48,11 @@ public async Task RestartedEventHubsMessagePump_PublishMessage_MessageSuccessful AddEventHubsMessagePump(options) .WithEventHubsMessageHandler(); - EventData expected = CreateSensorEventDataForW3C(); + EventData expected = CreateSensorEventDataForW3C(traceParent: TraceParent.Generate()); TestEventHubsMessageProducer producer = CreateEventHubsMessageProducer(); await using var worker = await Worker.StartNewAsync(options); - + IEnumerable messagePumps = worker.Services.GetServices() .OfType(); @@ -78,11 +79,11 @@ public async Task EventHubsMessagePump_PausesViaLifetime_RestartsAgain() AddEventHubsMessagePump(options, opt => opt.JobId = jobId) .WithEventHubsMessageHandler(); - EventData expected = CreateSensorEventDataForW3C(); + EventData expected = CreateSensorEventDataForW3C(traceParent: TraceParent.Generate()); TestEventHubsMessageProducer producer = CreateEventHubsMessageProducer(); - + await using var worker = await Worker.StartNewAsync(options); - + var lifetime = worker.Services.GetRequiredService(); await lifetime.PauseProcessingMessagesAsync(jobId, TimeSpan.FromSeconds(5), CancellationToken.None); diff --git a/src/Arcus.Messaging.Tests.Integration/MessagePump/EventHubsMessagePump.TelemetryTests.cs b/src/Arcus.Messaging.Tests.Integration/MessagePump/EventHubsMessagePump.TelemetryTests.cs index 11b6342a..71188d9d 100644 --- a/src/Arcus.Messaging.Tests.Integration/MessagePump/EventHubsMessagePump.TelemetryTests.cs +++ b/src/Arcus.Messaging.Tests.Integration/MessagePump/EventHubsMessagePump.TelemetryTests.cs @@ -1,6 +1,4 @@ using System; -using System.Collections.Generic; -using System.Linq; using System.Threading.Tasks; using Arcus.Messaging.Abstractions.MessageHandling; using Arcus.Messaging.Tests.Core.Correlation; @@ -92,7 +90,7 @@ await TestEventHubsMessageHandlingAsync(options, message, async () => }); } - [Fact] + [Fact] public async Task EventHubsMessagePump_WithW3CCorrelationFormat_AutomaticallyTracksMicrosoftDependencies() { // Arrange @@ -107,40 +105,37 @@ public async Task EventHubsMessagePump_WithW3CCorrelationFormat_AutomaticallyTra string operationName = $"operation-{Guid.NewGuid()}"; AddEventHubsMessagePump(options, opt => opt.Routing.Telemetry.OperationName = operationName) - .WithEventHubsMessageHandler( - messageBodyFilter: msg => msg.SensorId == eventData.MessageId); + .WithEventHubsMessageHandler(); TestEventHubsMessageProducer producer = CreateEventHubsMessageProducer(); - await using var worker = await Worker.StartNewAsync(options); - worker.Services.GetRequiredService().TelemetryChannel = spyChannel; + await using (var worker = await Worker.StartNewAsync(options)) + { + worker.Services.GetRequiredService().TelemetryChannel = spyChannel; - // Act - await producer.ProduceAsync(eventData); + // Act + await producer.ProduceAsync(eventData); - // Assert - TimeSpan timeout = TimeSpan.FromMinutes(2); + // Assert + RequestTelemetry requestViaArcusEventHubs = + await Poll.Target(() => GetRequestFrom(spySink.Telemetries, r => r.Context.Operation.Name == operationName)) + .Timeout(TimeSpan.FromMinutes(2)) + .FailWith("missing request telemetry with operation name in spied sink"); - DependencyTelemetry dependencyViaArcusKeyVault = - await Poll.Target(() => GetDependencyFrom(spySink.Telemetries, d => d.Type == "Azure key vault")) - .Until(d => d.Context.Operation.Id == traceParent.TransactionId) - .Timeout(timeout) - .FailWith("missing Key vault dependency telemetry tracking via Arcus with W3C format in spied sink"); + DependencyTelemetry dependencyViaArcusKeyVault = + await Poll.Target(() => GetDependencyFrom(spySink.Telemetries, d => d.Type == "Azure key vault")) + .Until(d => d.Context.Operation.ParentId == requestViaArcusEventHubs.Id) + .FailWith("missing Key vault dependency telemetry tracking via Arcus in spied sink"); - DependencyTelemetry dependencyViaMicrosoftSql = - await Poll.Target(() => GetDependencyFrom(spyChannel.Telemetries, d => d.Type == "SQL")) - .Until(d => d.Context.Operation.Id == traceParent.TransactionId) - .Timeout(timeout) - .FailWith("missing SQL dependency telemetry tracking via Microsoft with W3C format in spied channel"); - - RequestTelemetry requestViaArcusEventHubs = - await Poll.Target(() => GetRequestFrom(spySink.Telemetries, r => r.Name == operationName)) - .Until(r => r.Context.Operation.Id == traceParent.TransactionId) - .Timeout(timeout) - .FailWith("missing request telemetry tracking with W3C format in spied sink"); - - Assert.Equal(requestViaArcusEventHubs.Id, dependencyViaArcusKeyVault.Context.Operation.ParentId); - Assert.Equal(requestViaArcusEventHubs.Id, dependencyViaMicrosoftSql.Context.Operation.ParentId); + DependencyTelemetry dependencyViaMicrosoftSql = + await Poll.Target(() => GetDependencyFrom(spyChannel.Telemetries, d => d.Type == "SQL")) + .Until(d => d.Context.Operation.ParentId == requestViaArcusEventHubs.Id) + .FailWith("missing SQL dependency telemetry racking via Microsoft on spied channel"); + + Assert.Equal(requestViaArcusEventHubs.Context.Operation.Id, traceParent.TransactionId); + Assert.Equal(dependencyViaArcusKeyVault.Context.Operation.Id, traceParent.TransactionId); + Assert.Equal(dependencyViaMicrosoftSql.Context.Operation.Id, traceParent.TransactionId); + } } [Fact] @@ -168,7 +163,7 @@ public async Task EventHubsMessagePump_WithW3CCorrelationFormatForNewParent_Auto await producer.ProduceAsync(eventData); // Assert - RequestTelemetry requestViaArcusEventHubs = + RequestTelemetry requestViaArcusEventHubs = await Poll.Target(() => GetRequestFrom(spySink.Telemetries, r => r.Name == operationName)) .Timeout(TimeSpan.FromMinutes(2)) .FailWith("missing request telemetry with operation name in spied sink"); @@ -177,7 +172,6 @@ await Poll.Target(() => GetDependencyFrom(spySink.Telemetries, d => d.Type == "A .Until(d => d.Context.Operation.ParentId == requestViaArcusEventHubs.Id) .FailWith("missing Key vault dependency telemetry tracking via Arcus in spied sink"); - await Poll.Target(() => GetDependencyFrom(spyChannel.Telemetries, d => d.Type == "SQL")) .Until(d => d.Context.Operation.ParentId == requestViaArcusEventHubs.Id) .FailWith("missing SQL dependency telemetry racking via Microsoft on spied channel"); diff --git a/src/Arcus.Messaging.Tests.Integration/MessagePump/EventHubsMessagePumpTests.cs b/src/Arcus.Messaging.Tests.Integration/MessagePump/EventHubsMessagePumpTests.cs index 4985b5b2..9f2f0a30 100644 --- a/src/Arcus.Messaging.Tests.Integration/MessagePump/EventHubsMessagePumpTests.cs +++ b/src/Arcus.Messaging.Tests.Integration/MessagePump/EventHubsMessagePumpTests.cs @@ -44,10 +44,10 @@ public EventHubsMessagePumpTests(EventHubsEntityFixture fixture, ITestOutputHelp { _outputWriter = outputWriter; _logger = new XunitTestLogger(outputWriter); - + _config = TestConfig.Create(); _eventHubsConfig = _config.GetEventHubs(); - + EventHubsName = fixture.HubName; } @@ -76,13 +76,13 @@ private EventHubsMessageHandlerCollection AddEventHubsMessagePump(WorkerOptions } private async Task TestEventHubsMessageHandlingAsync( - Action configureOptions, + Action configureOptions, MessageCorrelationFormat format = MessageCorrelationFormat.W3C, [CallerMemberName] string memberName = null) { EventData message = format switch { - MessageCorrelationFormat.W3C => CreateSensorEventDataForW3C(), + MessageCorrelationFormat.W3C => CreateSensorEventDataForW3C(traceParent: TraceParent.Generate()), MessageCorrelationFormat.Hierarchical => CreateSensorEventDataForHierarchical(), }; @@ -97,11 +97,11 @@ await TestEventHubsMessageHandlingAsync(options, message, async () => case MessageCorrelationFormat.W3C: AssertReceivedSensorEventDataForW3C(message, eventData); break; - + case MessageCorrelationFormat.Hierarchical: AssertReceivedSensorEventDataForHierarchical(message, eventData); break; - + default: throw new ArgumentOutOfRangeException(nameof(format), format, null); } @@ -151,7 +151,6 @@ private static EventData CreateSensorEventDataForHierarchical( private static EventData CreateSensorEventDataForW3C(Encoding encoding = null, TraceParent traceParent = null) { encoding ??= Encoding.UTF8; - traceParent ??= TraceParent.Generate(); SensorReading reading = SensorReadingGenerator.Generate(); string json = JsonConvert.SerializeObject(reading); @@ -167,7 +166,9 @@ private static EventData CreateSensorEventDataForW3C(Encoding encoding = null, T } }; - return message.WithDiagnosticId(traceParent); + return traceParent is null + ? message + : message.WithDiagnosticId(traceParent); } private static void AssertReceivedSensorEventDataForHierarchical( diff --git a/src/Arcus.Messaging.Tests.Integration/MessagePump/Fixture/AssertX.cs b/src/Arcus.Messaging.Tests.Integration/MessagePump/Fixture/AssertX.cs index dbe0c3cc..62267717 100644 --- a/src/Arcus.Messaging.Tests.Integration/MessagePump/Fixture/AssertX.cs +++ b/src/Arcus.Messaging.Tests.Integration/MessagePump/Fixture/AssertX.cs @@ -34,6 +34,8 @@ public static RequestTelemetry GetRequestFrom( IEnumerable telemetries, Predicate filter) { + Assert.NotEmpty(telemetries); + ITelemetry[] result = telemetries.Where(t => t is RequestTelemetry r && filter(r)).ToArray(); Assert.True(result.Length > 0, "Should find at least a single request telemetry, but got none"); @@ -44,6 +46,8 @@ public static DependencyTelemetry GetDependencyFrom( IEnumerable telemetries, Predicate filter) { + Assert.NotEmpty(telemetries); + ITelemetry[] result = telemetries.Where(t => t is DependencyTelemetry r && filter(r)).ToArray(); Assert.True(result.Length > 0, "Should find at least a single dependency telemetry, but got none"); diff --git a/src/Arcus.Messaging.Tests.Integration/MessagePump/Fixture/MockCircuitBreakerEventHandler.cs b/src/Arcus.Messaging.Tests.Integration/MessagePump/Fixture/MockCircuitBreakerEventHandler.cs new file mode 100644 index 00000000..f3208042 --- /dev/null +++ b/src/Arcus.Messaging.Tests.Integration/MessagePump/Fixture/MockCircuitBreakerEventHandler.cs @@ -0,0 +1,56 @@ +using System.Collections.Generic; +using System.Collections.ObjectModel; +using System.Linq; +using Arcus.Messaging.Pumps.Abstractions.Resiliency; +using Xunit; + +namespace Arcus.Messaging.Tests.Integration.MessagePump.Fixture +{ + /// + /// Represents an implementation that verifies if the circuit breaker transitions states are correct. + /// + internal class MockCircuitBreakerEventHandler : ICircuitBreakerEventHandler + { + private readonly Collection _states = new(); + + /// + /// Notifies the application on a change in the message pump's circuit breaker state. + /// + /// The new circuit breaker state in which the message pump is currently running on. + public void OnTransition(MessagePumpCircuitState newState) + { + _states.Add(newState); + } + + public void ShouldTransitionCorrectly() + { + Assert.NotEmpty(_states); + + MessagePumpCircuitState firstTransition = _states[0]; + Assert.True(firstTransition.IsOpen, $"when the message pump starts up, the first transition should always be from a closed to open state, but got: {firstTransition}"); + + IEnumerable<(MessagePumpCircuitState oldState, MessagePumpCircuitState newState)> transitions = + _states.SkipLast(1).Zip(_states.Skip(1)); + + Assert.All(transitions, t => VerifyCorrectTransition(t.oldState, t.newState)); + } + + private static void VerifyCorrectTransition( + MessagePumpCircuitState oldState, + MessagePumpCircuitState newState) + { + if (oldState.IsClosed) + { + Assert.True(newState.IsHalfOpen, $"when the message pump comes from a closed state, the next state should always be half-open, but got: {newState}"); + } + else if (oldState.IsOpen) + { + Assert.True(newState.IsHalfOpen, $"when the message pump comes from an open state, the next state should always be half-open, but got: {newState}"); + } + else if (oldState.IsHalfOpen) + { + Assert.True(newState.IsClosed, $"when the message pump comes from a half-open state, the next state should always be closed, but got: {newState}"); + } + } + } +} diff --git a/src/Arcus.Messaging.Tests.Integration/MessagePump/ServiceBusMessagePump.ResiliencyTests.cs b/src/Arcus.Messaging.Tests.Integration/MessagePump/ServiceBusMessagePump.ResiliencyTests.cs index 8fbed1ff..3ebc2a33 100644 --- a/src/Arcus.Messaging.Tests.Integration/MessagePump/ServiceBusMessagePump.ResiliencyTests.cs +++ b/src/Arcus.Messaging.Tests.Integration/MessagePump/ServiceBusMessagePump.ResiliencyTests.cs @@ -12,6 +12,7 @@ using Arcus.Messaging.Tests.Core.Events.v1; using Arcus.Messaging.Tests.Core.Messages.v1; using Arcus.Messaging.Tests.Integration.Fixture; +using Arcus.Messaging.Tests.Integration.MessagePump.Fixture; using Arcus.Messaging.Tests.Integration.MessagePump.ServiceBus; using Arcus.Messaging.Tests.Workers.MessageHandlers; using Arcus.Testing; @@ -33,6 +34,8 @@ public async Task ServiceBusMessageQueuePump_WithUnavailableDependencySystem_Cir { // Arrange var messageSink = new OrderMessageSink(); + var mockEventHandler1 = new MockCircuitBreakerEventHandler(); + var mockEventHandler2 = new MockCircuitBreakerEventHandler(); ServiceBusMessage messageBeforeBreak = CreateOrderServiceBusMessageForW3C(); ServiceBusMessage messageAfterBreak = CreateOrderServiceBusMessageForW3C(); @@ -42,8 +45,10 @@ public async Task ServiceBusMessageQueuePump_WithUnavailableDependencySystem_Cir .AddSingleton(messageSink) .AddServiceBusQueueMessagePumpUsingManagedIdentity(QueueName, HostName) .WithServiceBusMessageHandler( - messageContextFilter: ctx => ctx.MessageId == messageBeforeBreak.MessageId - || ctx.MessageId == messageAfterBreak.MessageId); + messageContextFilter: ctx => ctx.MessageId == messageBeforeBreak.MessageId + || ctx.MessageId == messageAfterBreak.MessageId) + .WithCircuitBreakerStateChangedEventHandler(_ => mockEventHandler1) + .WithCircuitBreakerStateChangedEventHandler(_ => mockEventHandler2); var producer = new TestServiceBusMessageProducer(QueueName, _config.GetServiceBus()); await using var worker = await Worker.StartNewAsync(options); @@ -56,6 +61,9 @@ public async Task ServiceBusMessageQueuePump_WithUnavailableDependencySystem_Cir await producer.ProduceAsync(messageAfterBreak); await messageSink.ShouldReceiveOrdersAfterBreakAsync(messageAfterBreak.MessageId); + + mockEventHandler1.ShouldTransitionCorrectly(); + mockEventHandler2.ShouldTransitionCorrectly(); } [Fact] @@ -64,8 +72,10 @@ public async Task ServiceBusMessageTopicPump_WithUnavailableDependencySystem_Cir // Arrange ServiceBusMessage messageBeforeBreak = CreateOrderServiceBusMessageForW3C(); ServiceBusMessage messageAfterBreak = CreateOrderServiceBusMessageForW3C(); - + var messageSink = new OrderMessageSink(); + var mockEventHandler1 = new MockCircuitBreakerEventHandler(); + var mockEventHandler2 = new MockCircuitBreakerEventHandler(); await using TemporaryTopicSubscription subscription = await CreateTopicSubscriptionForMessageAsync(messageBeforeBreak, messageAfterBreak); var options = new WorkerOptions(); @@ -73,7 +83,9 @@ public async Task ServiceBusMessageTopicPump_WithUnavailableDependencySystem_Cir .ConfigureSerilog(logging => logging.MinimumLevel.Verbose()) .AddSingleton(messageSink) .AddServiceBusTopicMessagePumpUsingManagedIdentity(TopicName, subscription.Name, HostName) - .WithServiceBusMessageHandler(); + .WithServiceBusMessageHandler() + .WithCircuitBreakerStateChangedEventHandler(_ => mockEventHandler1) + .WithCircuitBreakerStateChangedEventHandler(_ => mockEventHandler2); var producer = new TestServiceBusMessageProducer(TopicName, _config.GetServiceBus()); await using var worker = await Worker.StartNewAsync(options); @@ -83,9 +95,12 @@ public async Task ServiceBusMessageTopicPump_WithUnavailableDependencySystem_Cir // Assert await messageSink.ShouldReceiveOrdersDuringBreakAsync(messageBeforeBreak.MessageId); - + await producer.ProduceAsync(messageAfterBreak); await messageSink.ShouldReceiveOrdersAfterBreakAsync(messageAfterBreak.MessageId); + + mockEventHandler1.ShouldTransitionCorrectly(); + mockEventHandler2.ShouldTransitionCorrectly(); } private async Task CreateTopicSubscriptionForMessageAsync(params ServiceBusMessage[] messages)