Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add circuit breaker event handlers #461

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
cb15d71
feat: add circuit breaker event handlers
stijnmoreels Dec 27, 2024
db49498
pr-fix: move circuit breaker event handler registration to pump project
stijnmoreels Dec 27, 2024
aa4820e
pr-fix: xml document circuit breaker event handler registration
stijnmoreels Dec 27, 2024
2107fee
pr-fix: correct usings
stijnmoreels Dec 27, 2024
5fc8367
pr-sug: use more test assertion friendly message
stijnmoreels Dec 27, 2024
a5a445b
pr-sug: use more test assertion friendly message
stijnmoreels Dec 27, 2024
6a24ad8
pr-sug: use is-half-open property to determine state transition
stijnmoreels Jan 14, 2025
3ebfe3c
pr-sug: rename event handler registration 'WithCircuitBreakerStateCha…
stijnmoreels Jan 17, 2025
72ffe81
pr-fix: order of telemetry test assertions
stijnmoreels Jan 17, 2025
e406104
pr-fix: correct timeout
stijnmoreels Jan 17, 2025
1b09575
pr-fix: reorder test assertions
stijnmoreels Jan 17, 2025
9b084b2
pr-fix: reorder test assertions
stijnmoreels Jan 17, 2025
c4a5abc
pr-fix: add logging for transaction id
stijnmoreels Jan 17, 2025
7b5e467
pr-fix: add logging for transaction id
stijnmoreels Jan 17, 2025
50d12a4
pr-fix: add logging for transaction id
stijnmoreels Jan 17, 2025
f43b305
pr-fix: fire and forget event handlers with void
stijnmoreels Jan 17, 2025
ad333a4
pr-fix: consistent generic type name
stijnmoreels Jan 22, 2025
b67c35e
pr-fix: streamline test
stijnmoreels Jan 22, 2025
98f3027
temp log exception
stijnmoreels Jan 22, 2025
79a667e
temp log exception
stijnmoreels Jan 22, 2025
334a489
temp remove until conditions
stijnmoreels Jan 22, 2025
640918a
pr-fix: streamline test assertions
stijnmoreels Jan 22, 2025
14dce0a
pr-fix: streamline test assertions
stijnmoreels Jan 22, 2025
04bfd35
pr-fix: correct with simplified trace parent
stijnmoreels Jan 22, 2025
8293835
pr-fix: specific operation name
stijnmoreels Jan 22, 2025
285b202
pr-fix: log get request telemetry
stijnmoreels Jan 22, 2025
4f9f5e3
pr-fix: log get request telemetry
stijnmoreels Jan 22, 2025
f11772a
pr-fix: order of log registration
stijnmoreels Jan 23, 2025
ea2ffb8
pr-fix: pin piont exact failur
stijnmoreels Jan 23, 2025
f141c50
pr-fix: diff serilog setup
stijnmoreels Jan 23, 2025
5fef61a
pr-fix: remove unnecessary event hubs package downgrade
stijnmoreels Jan 23, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 25 additions & 0 deletions docs/preview/02-Features/06-general-messaging.md
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,31 @@ Both the recovery period after the circuit is open and the interval between mess
}
```

#### 🔔 Get notified on a circuit breaker state transition
Transitions from circuit breaker states happens internally and automatically. The library supports a notification system that lets you register event handlers that gets called upon a state transition.

After the message pump and/or message handlers are registered, you can add one or more handlers linked to specific transitions to the previously registered pump.

```csharp
using Arcus.Messaging.Pumps.Abstractions;

services.AddServiceBusMessagePump(...)
.WithCircuitBreakerStateChangedEventHandler<MyFirstCircuitBreakerEventHandler>()
.WithCircuitBreakerStateChangedEventHandler<MySecondCircuitBreakerEventHandler>();
```

The instances should implement the `ICircuitBreakerEventHandler`, which allows you to access the new state for the pump.

```csharp
public class MyFirstCircuitBreakerEventHandler : ICircuitBreakerEventHandler
{
public Task OnTransitionAsync(MessagePumpCircuitState newState)
{
// ...
}
}
```

### Pause message processing for a fixed period of time
⚡ If you use one of the message-type specific packages like `Arcus.Messaging.Pumps.EventHubs`, you will automatically get this functionality. If you implement your own message pump, please use the `services.AddMessagePump(...)` extension which makes sure that you also registers this functionality.

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ public AzureEventHubsMessageRouter(IServiceProvider serviceProvider)
/// <param name="serviceProvider">The service provider instance to retrieve all the <see cref="IAzureEventHubsMessageHandler{TMessage}"/> instances.</param>
/// <param name="logger">The logger instance to write diagnostic trace messages during the routing of the message.</param>
/// <exception cref="ArgumentNullException">Thrown when the <paramref name="serviceProvider"/> is <c>null</c>.</exception>
public AzureEventHubsMessageRouter(IServiceProvider serviceProvider, ILogger<AzureEventHubsMessageRouter> logger)
public AzureEventHubsMessageRouter(IServiceProvider serviceProvider, ILogger<AzureEventHubsMessageRouter> logger)
: this(serviceProvider, new AzureEventHubsMessageRouterOptions(), logger)
{
}
Expand All @@ -57,7 +57,7 @@ public AzureEventHubsMessageRouter(IServiceProvider serviceProvider, AzureEventH
/// <param name="options">The consumer-configurable options to change the behavior of the router.</param>
/// <param name="logger">The logger instance to write diagnostic trace messages during the routing of the message.</param>
/// <exception cref="ArgumentNullException">Thrown when the <paramref name="serviceProvider"/> is <c>null</c>.</exception>
public AzureEventHubsMessageRouter(IServiceProvider serviceProvider, AzureEventHubsMessageRouterOptions options, ILogger<AzureEventHubsMessageRouter> logger)
public AzureEventHubsMessageRouter(IServiceProvider serviceProvider, AzureEventHubsMessageRouterOptions options, ILogger<AzureEventHubsMessageRouter> logger)
: this(serviceProvider, options, (ILogger) logger)
{
}
Expand All @@ -68,7 +68,7 @@ public AzureEventHubsMessageRouter(IServiceProvider serviceProvider, AzureEventH
/// <param name="serviceProvider">The service provider instance to retrieve all the <see cref="IAzureEventHubsMessageHandler{TMessage}"/> instances.</param>
/// <param name="logger">The logger instance to write diagnostic trace messages during the routing of the message.</param>
/// <exception cref="ArgumentNullException">Thrown when the <paramref name="serviceProvider"/> is <c>null</c>.</exception>
protected AzureEventHubsMessageRouter(IServiceProvider serviceProvider, ILogger logger)
protected AzureEventHubsMessageRouter(IServiceProvider serviceProvider, ILogger logger)
: this(serviceProvider, new AzureEventHubsMessageRouterOptions(), logger)
{
}
Expand All @@ -80,7 +80,7 @@ protected AzureEventHubsMessageRouter(IServiceProvider serviceProvider, ILogger
/// <param name="options">The consumer-configurable options to change the behavior of the router.</param>
/// <param name="logger">The logger instance to write diagnostic trace messages during the routing of the message.</param>
/// <exception cref="ArgumentNullException">Thrown when the <paramref name="serviceProvider"/> is <c>null</c>.</exception>
protected AzureEventHubsMessageRouter(IServiceProvider serviceProvider, AzureEventHubsMessageRouterOptions options, ILogger logger)
protected AzureEventHubsMessageRouter(IServiceProvider serviceProvider, AzureEventHubsMessageRouterOptions options, ILogger logger)
: base(serviceProvider, options, logger)
{
EventHubsOptions = options ?? new AzureEventHubsMessageRouterOptions();
Expand Down Expand Up @@ -158,8 +158,8 @@ public override async Task RouteMessageAsync<TMessageContext>(
}

Logger.LogEventHubsRequest(
eventHubsNamespace,
consumerGroup,
eventHubsNamespace,
consumerGroup,
eventHubsName,
Options.Telemetry.OperationName,
isSuccessful,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ public static IEnumerable<MessageHandler> SubtractFrom(IServiceProvider serviceP
Guard.NotNull(serviceProvider, nameof(serviceProvider), "Requires a collection of services to subtract the message handlers from");
Guard.NotNull(logger, nameof(logger), "Requires a logger instance to write trace messages during the lifetime of the message handlers");

MessageHandler[] registrations =
MessageHandler[] registrations =
serviceProvider.GetServices<MessageHandler>()
.ToArray();

Expand All @@ -98,17 +98,17 @@ public static IEnumerable<MessageHandler> SubtractFrom(IServiceProvider serviceP
/// <exception cref="ArgumentException">Thrown when the <paramref name="jobId"/> is blank.</exception>
internal static MessageHandler Create<TMessage, TMessageContext>(
IMessageHandler<TMessage, TMessageContext> messageHandler,
ILogger<IMessageHandler<TMessage, TMessageContext>> logger,
ILogger logger,
string jobId,
Func<TMessage, bool> messageBodyFilter = null,
Func<TMessageContext, bool> messageContextFilter = null,
IMessageBodySerializer messageBodySerializer = null)
IMessageBodySerializer messageBodySerializer = null)
where TMessageContext : MessageContext
{
Guard.NotNull(messageHandler, nameof(messageHandler), "Requires a message handler implementation to register the handler within the application services");

ProcessMessageAsync processMessageAsync = DetermineMessageImplementation(messageHandler);
logger = logger ?? NullLogger<IMessageHandler<TMessage, TMessageContext>>.Instance;
logger ??= NullLogger.Instance;
Type messageHandlerType = messageHandler.GetType();

Func<object, bool> messageFilter = DetermineMessageBodyFilter(messageBodyFilter, messageHandlerType, logger);
Expand All @@ -125,7 +125,7 @@ internal static MessageHandler Create<TMessage, TMessageContext>(
logger: logger);
}

private static ProcessMessageAsync DetermineMessageImplementation<TMessage, TMessageContext>(IMessageHandler<TMessage, TMessageContext> messageHandler)
private static ProcessMessageAsync DetermineMessageImplementation<TMessage, TMessageContext>(IMessageHandler<TMessage, TMessageContext> messageHandler)
where TMessageContext : MessageContext
{
return async (rawMessage, generalMessageContext, correlationInfo, cancellationToken) =>
Expand All @@ -144,8 +144,8 @@ private static ProcessMessageAsync DetermineMessageImplementation<TMessage, TMes

private static Func<object, bool> DetermineMessageBodyFilter<TMessage>(Func<TMessage, bool> messageBodyFilter, Type messageHandlerType, ILogger logger)
{
return rawMessage =>
{
return rawMessage =>
{
if (messageBodyFilter is null)
{
return true;
Expand All @@ -172,8 +172,8 @@ private static Func<MessageContext, bool> DetermineMessageContextFilter<TMessage
ILogger logger)
where TMessageContext : MessageContext
{
return rawContext =>
{
return rawContext =>
{
if (rawContext is not null && jobId is not null && rawContext.JobId != jobId)
{
return false;
Expand Down Expand Up @@ -219,7 +219,7 @@ public Type GetMessageHandlerType()
/// </summary>
/// <typeparam name="TMessageContext">The type of the message context.</typeparam>
/// <param name="messageContext">The context in which the incoming message is processed.</param>
public bool CanProcessMessageBasedOnContext<TMessageContext>(TMessageContext messageContext)
public bool CanProcessMessageBasedOnContext<TMessageContext>(TMessageContext messageContext)
where TMessageContext : MessageContext
{
Guard.NotNull(messageContext, nameof(messageContext), "Requires an message context instance to determine if the message handler can process the message");
Expand Down Expand Up @@ -341,9 +341,9 @@ public async Task<bool> ProcessMessageAsync<TMessageContext>(
const string methodName = nameof(IMessageHandler<object, MessageContext>.ProcessMessageAsync);
try
{
Task<bool> processMessageAsync =
Task<bool> processMessageAsync =
_messageHandlerImplementation(message, messageContext, correlationInfo, cancellationToken);

if (processMessageAsync is null)
{
throw new InvalidOperationException(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ public MessageHandlerCollection(IServiceCollection services)
/// This ID can be used to get a reference of the previously registered message pump while registering message handlers and other functionality related to the message pump.
/// </summary>
public string JobId { get; set; }

/// <summary>
/// Gets the current available collection of services to register the message handling logic into.
/// </summary>
Expand All @@ -51,7 +51,7 @@ internal void AddMessageHandler<TMessage, TMessageContext>(
where TMessageContext : MessageContext
{
Guard.NotNull(implementationFactory, nameof(implementationFactory), "Requires a function to create a message handler implementation to register the handler within the application services");

Services.AddTransient(
serviceProvider => MessageHandler.Create(
implementationFactory(serviceProvider),
Expand All @@ -71,7 +71,7 @@ internal void AddMessageHandler<TMessage, TMessageContext>(
/// <param name="implementationFactory">The function to create the user-defined fallback message handler instance.</param>
/// <exception cref="ArgumentNullException">Thrown when the <paramref name="implementationFactory"/> is <c>null</c>.</exception>
public void AddFallbackMessageHandler<TMessageHandler, TMessage, TMessageContext>(
Func<IServiceProvider, TMessageHandler> implementationFactory)
Func<IServiceProvider, TMessageHandler> implementationFactory)
where TMessageHandler : IFallbackMessageHandler<TMessage, TMessageContext>
where TMessage : class
where TMessageContext : MessageContext
Expand All @@ -80,7 +80,7 @@ public void AddFallbackMessageHandler<TMessageHandler, TMessage, TMessageContext

Services.AddSingleton(
serviceProvider => FallbackMessageHandler<TMessage, TMessageContext>.Create(
implementationFactory(serviceProvider),
implementationFactory(serviceProvider),
JobId,
serviceProvider.GetService<ILogger<IFallbackMessageHandler<TMessage, TMessageContext>>>()));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,55 @@ public static IServiceCollection AddMessagePump<TMessagePump>(
services.TryAddSingleton<IMessagePumpLifetime, DefaultMessagePumpLifetime>();
services.TryAddSingleton<IMessagePumpCircuitBreaker>(
provider => new DefaultMessagePumpCircuitBreaker(provider, provider.GetService<ILogger<DefaultMessagePumpCircuitBreaker>>()));

return services.AddHostedService(implementationFactory);
}

/// <summary>
/// Adds an <see cref="ICircuitBreakerEventHandler"/> implementation for a specific message pump to the application services.
/// </summary>
/// <typeparam name="TEventHandler">The custom type of the event handler.</typeparam>
/// <param name="services">The application services to register the event handler.</param>
/// <param name="jobId">The unique ID to distinguish the message pump to register this event handler for.</param>
/// <exception cref="ArgumentNullException">Thrown when the <paramref name="services"/> is <c>null</c>.</exception>
/// <exception cref="ArgumentException">Thrown when the <paramref name="jobId"/> is blank.</exception>
public static IServiceCollection AddCircuitBreakerEventHandler<TEventHandler>(this IServiceCollection services, string jobId)
where TEventHandler : ICircuitBreakerEventHandler
{
return AddCircuitBreakerEventHandler(services, jobId, provider => ActivatorUtilities.CreateInstance<TEventHandler>(provider));
}

/// <summary>
/// Adds an <see cref="ICircuitBreakerEventHandler"/> implementation for a specific message pump to the application services.
/// </summary>
/// <typeparam name="TEventHandler">The custom type of the event handler.</typeparam>
/// <param name="services">The application services to register the event handler.</param>
/// <param name="jobId">The unique ID to distinguish the message pump to register this event handler for.</param>
/// <param name="implementationFactory">The factory function to create the custom <see cref="ICircuitBreakerEventHandler"/> implementation.</param>
/// <exception cref="ArgumentNullException">Thrown when the <paramref name="services"/> or <paramref name="implementationFactory"/> is <c>null</c>.</exception>
/// <exception cref="ArgumentException">Thrown when the <paramref name="jobId"/> is blank.</exception>
public static IServiceCollection AddCircuitBreakerEventHandler<TEventHandler>(
this IServiceCollection services,
string jobId,
Func<IServiceProvider, TEventHandler> implementationFactory)
where TEventHandler : ICircuitBreakerEventHandler
{
if (services is null)
{
throw new ArgumentNullException(nameof(services));
}

if (implementationFactory is null)
{
throw new ArgumentNullException(nameof(implementationFactory));
}

if (string.IsNullOrWhiteSpace(jobId))
{
throw new ArgumentException("Requires a non-blank job ID to distinguish the message pump for which this circuit breaker event handler is registered", nameof(jobId));
}

return services.AddTransient(provider => new CircuitBreakerEventHandler(jobId, implementationFactory(provider)));
}
}
}
38 changes: 36 additions & 2 deletions src/Arcus.Messaging.Pumps.Abstractions/MessagePump.cs
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
using System;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Arcus.Messaging.Pumps.Abstractions.Resiliency;
using GuardNet;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;

Expand Down Expand Up @@ -134,7 +136,12 @@ protected async Task WaitMessageRecoveryPeriodAsync(CancellationToken cancellati
Logger.LogDebug("Circuit breaker caused message pump '{JobId}' to wait message recovery period of '{Recovery}' during '{State}' state", JobId, CircuitState.Options.MessageRecoveryPeriod.ToString("g"), CircuitState);
await Task.Delay(CircuitState.Options.MessageRecoveryPeriod, cancellationToken);

CircuitState = CircuitState.TransitionTo(CircuitBreakerState.HalfOpen);
if (!CircuitState.IsHalfOpen)
{
CircuitState = CircuitState.TransitionTo(CircuitBreakerState.HalfOpen);

NotifyCircuitBreakerStateChangedSubscribers();
}
}

/// <summary>
Expand All @@ -146,7 +153,12 @@ protected async Task WaitMessageIntervalDuringRecoveryAsync(CancellationToken ca
Logger.LogDebug("Circuit breaker caused message pump '{JobId}' to wait message interval during recovery of '{Interval}' during the '{State}' state", JobId, CircuitState.Options.MessageIntervalDuringRecovery.ToString("g"), CircuitState);
await Task.Delay(CircuitState.Options.MessageIntervalDuringRecovery, cancellationToken);

CircuitState = CircuitState.TransitionTo(CircuitBreakerState.HalfOpen);
if (!CircuitState.IsHalfOpen)
{
CircuitState = CircuitState.TransitionTo(CircuitBreakerState.HalfOpen);

NotifyCircuitBreakerStateChangedSubscribers();
}
}

/// <summary>
Expand All @@ -158,6 +170,8 @@ internal void NotifyPauseReceiveMessages(MessagePumpCircuitBreakerOptions option
Logger.LogDebug("Circuit breaker caused message pump '{JobId}' to transition from a '{CurrentState}' an 'Open' state", JobId, CircuitState);

CircuitState = CircuitState.TransitionTo(CircuitBreakerState.Open, options);

NotifyCircuitBreakerStateChangedSubscribers();
}

/// <summary>
Expand All @@ -168,6 +182,26 @@ protected void NotifyResumeRetrievingMessages()
Logger.LogDebug("Circuit breaker caused message pump '{JobId}' to transition back from '{CurrentState}' to a 'Closed' state, retrieving messages is resumed", JobId, CircuitState);

CircuitState = MessagePumpCircuitState.Closed;

NotifyCircuitBreakerStateChangedSubscribers();
}

private void NotifyCircuitBreakerStateChangedSubscribers()
{
ICircuitBreakerEventHandler[] eventHandlers = GetEventHandlersForPump();

foreach (var handler in eventHandlers)
{
Task.Run(() => handler.OnTransition(CircuitState));
}
}

private ICircuitBreakerEventHandler[] GetEventHandlersForPump()
{
return ServiceProvider.GetServices<CircuitBreakerEventHandler>()
.Where(registration => registration.JobId == JobId)
.Select(handler => handler.Handler)
.ToArray();
}

/// <summary>
Expand Down
Loading
Loading