Skip to content

Commit

Permalink
feat: add circuit breaker event handlers (#461)
Browse files Browse the repository at this point in the history
* feat: add circuit breaker event handlers

* pr-fix: move circuit breaker event handler registration to pump project

* pr-fix: xml document circuit breaker event handler registration

* pr-fix: correct usings

* pr-sug: use more test assertion friendly message

* pr-sug: use more test assertion friendly message

* pr-sug: use is-half-open property to determine state transition

* pr-sug: rename event handler registration 'WithCircuitBreakerStateChangedEventHandler'

* pr-fix: order of telemetry test assertions

* pr-fix: correct timeout

* pr-fix: reorder test assertions

* pr-fix: reorder test assertions

* pr-fix: add logging for transaction id

* pr-fix: add logging for transaction id

* pr-fix: add logging for transaction id

* pr-fix: fire and forget event handlers with void

* pr-fix: consistent generic type name

* pr-fix: streamline test

* temp log exception

* temp log exception

* temp remove until conditions

* pr-fix: streamline test assertions

* pr-fix: streamline test assertions

* pr-fix: correct with simplified trace parent

* pr-fix: specific operation name

* pr-fix: log get request telemetry

* pr-fix: log get request telemetry

* pr-fix: order of log registration

* pr-fix: pin piont exact failur

* pr-fix: diff serilog setup

* pr-fix: remove unnecessary event hubs package downgrade
  • Loading branch information
stijnmoreels authored Jan 28, 2025
1 parent 458e8ae commit ef650a7
Show file tree
Hide file tree
Showing 16 changed files with 378 additions and 97 deletions.
25 changes: 25 additions & 0 deletions docs/preview/02-Features/06-general-messaging.md
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,31 @@ Both the recovery period after the circuit is open and the interval between mess
}
```

#### 🔔 Get notified on a circuit breaker state transition
Transitions from circuit breaker states happens internally and automatically. The library supports a notification system that lets you register event handlers that gets called upon a state transition.

After the message pump and/or message handlers are registered, you can add one or more handlers linked to specific transitions to the previously registered pump.

```csharp
using Arcus.Messaging.Pumps.Abstractions;

services.AddServiceBusMessagePump(...)
.WithCircuitBreakerStateChangedEventHandler<MyFirstCircuitBreakerEventHandler>()
.WithCircuitBreakerStateChangedEventHandler<MySecondCircuitBreakerEventHandler>();
```

The instances should implement the `ICircuitBreakerEventHandler`, which allows you to access the new state for the pump.

```csharp
public class MyFirstCircuitBreakerEventHandler : ICircuitBreakerEventHandler
{
public Task OnTransitionAsync(MessagePumpCircuitState newState)
{
// ...
}
}
```

### Pause message processing for a fixed period of time
⚡ If you use one of the message-type specific packages like `Arcus.Messaging.Pumps.EventHubs`, you will automatically get this functionality. If you implement your own message pump, please use the `services.AddMessagePump(...)` extension which makes sure that you also registers this functionality.

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ public AzureEventHubsMessageRouter(IServiceProvider serviceProvider)
/// <param name="serviceProvider">The service provider instance to retrieve all the <see cref="IAzureEventHubsMessageHandler{TMessage}"/> instances.</param>
/// <param name="logger">The logger instance to write diagnostic trace messages during the routing of the message.</param>
/// <exception cref="ArgumentNullException">Thrown when the <paramref name="serviceProvider"/> is <c>null</c>.</exception>
public AzureEventHubsMessageRouter(IServiceProvider serviceProvider, ILogger<AzureEventHubsMessageRouter> logger)
public AzureEventHubsMessageRouter(IServiceProvider serviceProvider, ILogger<AzureEventHubsMessageRouter> logger)
: this(serviceProvider, new AzureEventHubsMessageRouterOptions(), logger)
{
}
Expand All @@ -57,7 +57,7 @@ public AzureEventHubsMessageRouter(IServiceProvider serviceProvider, AzureEventH
/// <param name="options">The consumer-configurable options to change the behavior of the router.</param>
/// <param name="logger">The logger instance to write diagnostic trace messages during the routing of the message.</param>
/// <exception cref="ArgumentNullException">Thrown when the <paramref name="serviceProvider"/> is <c>null</c>.</exception>
public AzureEventHubsMessageRouter(IServiceProvider serviceProvider, AzureEventHubsMessageRouterOptions options, ILogger<AzureEventHubsMessageRouter> logger)
public AzureEventHubsMessageRouter(IServiceProvider serviceProvider, AzureEventHubsMessageRouterOptions options, ILogger<AzureEventHubsMessageRouter> logger)
: this(serviceProvider, options, (ILogger) logger)
{
}
Expand All @@ -68,7 +68,7 @@ public AzureEventHubsMessageRouter(IServiceProvider serviceProvider, AzureEventH
/// <param name="serviceProvider">The service provider instance to retrieve all the <see cref="IAzureEventHubsMessageHandler{TMessage}"/> instances.</param>
/// <param name="logger">The logger instance to write diagnostic trace messages during the routing of the message.</param>
/// <exception cref="ArgumentNullException">Thrown when the <paramref name="serviceProvider"/> is <c>null</c>.</exception>
protected AzureEventHubsMessageRouter(IServiceProvider serviceProvider, ILogger logger)
protected AzureEventHubsMessageRouter(IServiceProvider serviceProvider, ILogger logger)
: this(serviceProvider, new AzureEventHubsMessageRouterOptions(), logger)
{
}
Expand All @@ -80,7 +80,7 @@ protected AzureEventHubsMessageRouter(IServiceProvider serviceProvider, ILogger
/// <param name="options">The consumer-configurable options to change the behavior of the router.</param>
/// <param name="logger">The logger instance to write diagnostic trace messages during the routing of the message.</param>
/// <exception cref="ArgumentNullException">Thrown when the <paramref name="serviceProvider"/> is <c>null</c>.</exception>
protected AzureEventHubsMessageRouter(IServiceProvider serviceProvider, AzureEventHubsMessageRouterOptions options, ILogger logger)
protected AzureEventHubsMessageRouter(IServiceProvider serviceProvider, AzureEventHubsMessageRouterOptions options, ILogger logger)
: base(serviceProvider, options, logger)
{
EventHubsOptions = options ?? new AzureEventHubsMessageRouterOptions();
Expand Down Expand Up @@ -158,8 +158,8 @@ public override async Task RouteMessageAsync<TMessageContext>(
}

Logger.LogEventHubsRequest(
eventHubsNamespace,
consumerGroup,
eventHubsNamespace,
consumerGroup,
eventHubsName,
Options.Telemetry.OperationName,
isSuccessful,
Expand Down
24 changes: 12 additions & 12 deletions src/Arcus.Messaging.Abstractions/MessageHandling/MessageHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ public static IEnumerable<MessageHandler> SubtractFrom(IServiceProvider serviceP
Guard.NotNull(serviceProvider, nameof(serviceProvider), "Requires a collection of services to subtract the message handlers from");
Guard.NotNull(logger, nameof(logger), "Requires a logger instance to write trace messages during the lifetime of the message handlers");

MessageHandler[] registrations =
MessageHandler[] registrations =
serviceProvider.GetServices<MessageHandler>()
.ToArray();

Expand All @@ -98,17 +98,17 @@ public static IEnumerable<MessageHandler> SubtractFrom(IServiceProvider serviceP
/// <exception cref="ArgumentException">Thrown when the <paramref name="jobId"/> is blank.</exception>
internal static MessageHandler Create<TMessage, TMessageContext>(
IMessageHandler<TMessage, TMessageContext> messageHandler,
ILogger<IMessageHandler<TMessage, TMessageContext>> logger,
ILogger logger,
string jobId,
Func<TMessage, bool> messageBodyFilter = null,
Func<TMessageContext, bool> messageContextFilter = null,
IMessageBodySerializer messageBodySerializer = null)
IMessageBodySerializer messageBodySerializer = null)
where TMessageContext : MessageContext
{
Guard.NotNull(messageHandler, nameof(messageHandler), "Requires a message handler implementation to register the handler within the application services");

ProcessMessageAsync processMessageAsync = DetermineMessageImplementation(messageHandler);
logger = logger ?? NullLogger<IMessageHandler<TMessage, TMessageContext>>.Instance;
logger ??= NullLogger.Instance;
Type messageHandlerType = messageHandler.GetType();

Func<object, bool> messageFilter = DetermineMessageBodyFilter(messageBodyFilter, messageHandlerType, logger);
Expand All @@ -125,7 +125,7 @@ internal static MessageHandler Create<TMessage, TMessageContext>(
logger: logger);
}

private static ProcessMessageAsync DetermineMessageImplementation<TMessage, TMessageContext>(IMessageHandler<TMessage, TMessageContext> messageHandler)
private static ProcessMessageAsync DetermineMessageImplementation<TMessage, TMessageContext>(IMessageHandler<TMessage, TMessageContext> messageHandler)
where TMessageContext : MessageContext
{
return async (rawMessage, generalMessageContext, correlationInfo, cancellationToken) =>
Expand All @@ -144,8 +144,8 @@ private static ProcessMessageAsync DetermineMessageImplementation<TMessage, TMes

private static Func<object, bool> DetermineMessageBodyFilter<TMessage>(Func<TMessage, bool> messageBodyFilter, Type messageHandlerType, ILogger logger)
{
return rawMessage =>
{
return rawMessage =>
{
if (messageBodyFilter is null)
{
return true;
Expand All @@ -172,8 +172,8 @@ private static Func<MessageContext, bool> DetermineMessageContextFilter<TMessage
ILogger logger)
where TMessageContext : MessageContext
{
return rawContext =>
{
return rawContext =>
{
if (rawContext is not null && jobId is not null && rawContext.JobId != jobId)
{
return false;
Expand Down Expand Up @@ -219,7 +219,7 @@ public Type GetMessageHandlerType()
/// </summary>
/// <typeparam name="TMessageContext">The type of the message context.</typeparam>
/// <param name="messageContext">The context in which the incoming message is processed.</param>
public bool CanProcessMessageBasedOnContext<TMessageContext>(TMessageContext messageContext)
public bool CanProcessMessageBasedOnContext<TMessageContext>(TMessageContext messageContext)
where TMessageContext : MessageContext
{
Guard.NotNull(messageContext, nameof(messageContext), "Requires an message context instance to determine if the message handler can process the message");
Expand Down Expand Up @@ -341,9 +341,9 @@ public async Task<bool> ProcessMessageAsync<TMessageContext>(
const string methodName = nameof(IMessageHandler<object, MessageContext>.ProcessMessageAsync);
try
{
Task<bool> processMessageAsync =
Task<bool> processMessageAsync =
_messageHandlerImplementation(message, messageContext, correlationInfo, cancellationToken);

if (processMessageAsync is null)
{
throw new InvalidOperationException(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ public MessageHandlerCollection(IServiceCollection services)
/// This ID can be used to get a reference of the previously registered message pump while registering message handlers and other functionality related to the message pump.
/// </summary>
public string JobId { get; set; }

/// <summary>
/// Gets the current available collection of services to register the message handling logic into.
/// </summary>
Expand All @@ -51,7 +51,7 @@ internal void AddMessageHandler<TMessage, TMessageContext>(
where TMessageContext : MessageContext
{
Guard.NotNull(implementationFactory, nameof(implementationFactory), "Requires a function to create a message handler implementation to register the handler within the application services");

Services.AddTransient(
serviceProvider => MessageHandler.Create(
implementationFactory(serviceProvider),
Expand All @@ -71,7 +71,7 @@ internal void AddMessageHandler<TMessage, TMessageContext>(
/// <param name="implementationFactory">The function to create the user-defined fallback message handler instance.</param>
/// <exception cref="ArgumentNullException">Thrown when the <paramref name="implementationFactory"/> is <c>null</c>.</exception>
public void AddFallbackMessageHandler<TMessageHandler, TMessage, TMessageContext>(
Func<IServiceProvider, TMessageHandler> implementationFactory)
Func<IServiceProvider, TMessageHandler> implementationFactory)
where TMessageHandler : IFallbackMessageHandler<TMessage, TMessageContext>
where TMessage : class
where TMessageContext : MessageContext
Expand All @@ -80,7 +80,7 @@ public void AddFallbackMessageHandler<TMessageHandler, TMessage, TMessageContext

Services.AddSingleton(
serviceProvider => FallbackMessageHandler<TMessage, TMessageContext>.Create(
implementationFactory(serviceProvider),
implementationFactory(serviceProvider),
JobId,
serviceProvider.GetService<ILogger<IFallbackMessageHandler<TMessage, TMessageContext>>>()));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,55 @@ public static IServiceCollection AddMessagePump<TMessagePump>(
services.TryAddSingleton<IMessagePumpLifetime, DefaultMessagePumpLifetime>();
services.TryAddSingleton<IMessagePumpCircuitBreaker>(
provider => new DefaultMessagePumpCircuitBreaker(provider, provider.GetService<ILogger<DefaultMessagePumpCircuitBreaker>>()));

return services.AddHostedService(implementationFactory);
}

/// <summary>
/// Adds an <see cref="ICircuitBreakerEventHandler"/> implementation for a specific message pump to the application services.
/// </summary>
/// <typeparam name="TEventHandler">The custom type of the event handler.</typeparam>
/// <param name="services">The application services to register the event handler.</param>
/// <param name="jobId">The unique ID to distinguish the message pump to register this event handler for.</param>
/// <exception cref="ArgumentNullException">Thrown when the <paramref name="services"/> is <c>null</c>.</exception>
/// <exception cref="ArgumentException">Thrown when the <paramref name="jobId"/> is blank.</exception>
public static IServiceCollection AddCircuitBreakerEventHandler<TEventHandler>(this IServiceCollection services, string jobId)
where TEventHandler : ICircuitBreakerEventHandler
{
return AddCircuitBreakerEventHandler(services, jobId, provider => ActivatorUtilities.CreateInstance<TEventHandler>(provider));
}

/// <summary>
/// Adds an <see cref="ICircuitBreakerEventHandler"/> implementation for a specific message pump to the application services.
/// </summary>
/// <typeparam name="TEventHandler">The custom type of the event handler.</typeparam>
/// <param name="services">The application services to register the event handler.</param>
/// <param name="jobId">The unique ID to distinguish the message pump to register this event handler for.</param>
/// <param name="implementationFactory">The factory function to create the custom <see cref="ICircuitBreakerEventHandler"/> implementation.</param>
/// <exception cref="ArgumentNullException">Thrown when the <paramref name="services"/> or <paramref name="implementationFactory"/> is <c>null</c>.</exception>
/// <exception cref="ArgumentException">Thrown when the <paramref name="jobId"/> is blank.</exception>
public static IServiceCollection AddCircuitBreakerEventHandler<TEventHandler>(
this IServiceCollection services,
string jobId,
Func<IServiceProvider, TEventHandler> implementationFactory)
where TEventHandler : ICircuitBreakerEventHandler
{
if (services is null)
{
throw new ArgumentNullException(nameof(services));
}

if (implementationFactory is null)
{
throw new ArgumentNullException(nameof(implementationFactory));
}

if (string.IsNullOrWhiteSpace(jobId))
{
throw new ArgumentException("Requires a non-blank job ID to distinguish the message pump for which this circuit breaker event handler is registered", nameof(jobId));
}

return services.AddTransient(provider => new CircuitBreakerEventHandler(jobId, implementationFactory(provider)));
}
}
}
38 changes: 36 additions & 2 deletions src/Arcus.Messaging.Pumps.Abstractions/MessagePump.cs
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
using System;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Arcus.Messaging.Pumps.Abstractions.Resiliency;
using GuardNet;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;

Expand Down Expand Up @@ -134,7 +136,12 @@ protected async Task WaitMessageRecoveryPeriodAsync(CancellationToken cancellati
Logger.LogDebug("Circuit breaker caused message pump '{JobId}' to wait message recovery period of '{Recovery}' during '{State}' state", JobId, CircuitState.Options.MessageRecoveryPeriod.ToString("g"), CircuitState);
await Task.Delay(CircuitState.Options.MessageRecoveryPeriod, cancellationToken);

CircuitState = CircuitState.TransitionTo(CircuitBreakerState.HalfOpen);
if (!CircuitState.IsHalfOpen)
{
CircuitState = CircuitState.TransitionTo(CircuitBreakerState.HalfOpen);

NotifyCircuitBreakerStateChangedSubscribers();
}
}

/// <summary>
Expand All @@ -146,7 +153,12 @@ protected async Task WaitMessageIntervalDuringRecoveryAsync(CancellationToken ca
Logger.LogDebug("Circuit breaker caused message pump '{JobId}' to wait message interval during recovery of '{Interval}' during the '{State}' state", JobId, CircuitState.Options.MessageIntervalDuringRecovery.ToString("g"), CircuitState);
await Task.Delay(CircuitState.Options.MessageIntervalDuringRecovery, cancellationToken);

CircuitState = CircuitState.TransitionTo(CircuitBreakerState.HalfOpen);
if (!CircuitState.IsHalfOpen)
{
CircuitState = CircuitState.TransitionTo(CircuitBreakerState.HalfOpen);

NotifyCircuitBreakerStateChangedSubscribers();
}
}

/// <summary>
Expand All @@ -158,6 +170,8 @@ internal void NotifyPauseReceiveMessages(MessagePumpCircuitBreakerOptions option
Logger.LogDebug("Circuit breaker caused message pump '{JobId}' to transition from a '{CurrentState}' an 'Open' state", JobId, CircuitState);

CircuitState = CircuitState.TransitionTo(CircuitBreakerState.Open, options);

NotifyCircuitBreakerStateChangedSubscribers();
}

/// <summary>
Expand All @@ -168,6 +182,26 @@ protected void NotifyResumeRetrievingMessages()
Logger.LogDebug("Circuit breaker caused message pump '{JobId}' to transition back from '{CurrentState}' to a 'Closed' state, retrieving messages is resumed", JobId, CircuitState);

CircuitState = MessagePumpCircuitState.Closed;

NotifyCircuitBreakerStateChangedSubscribers();
}

private void NotifyCircuitBreakerStateChangedSubscribers()
{
ICircuitBreakerEventHandler[] eventHandlers = GetEventHandlersForPump();

foreach (var handler in eventHandlers)
{
Task.Run(() => handler.OnTransition(CircuitState));
}
}

private ICircuitBreakerEventHandler[] GetEventHandlersForPump()
{
return ServiceProvider.GetServices<CircuitBreakerEventHandler>()
.Where(registration => registration.JobId == JobId)
.Select(handler => handler.Handler)
.ToArray();
}

/// <summary>
Expand Down
Loading

0 comments on commit ef650a7

Please sign in to comment.