Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add circuit breaker event handlers #461

Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
cb15d71
feat: add circuit breaker event handlers
stijnmoreels Dec 27, 2024
db49498
pr-fix: move circuit breaker event handler registration to pump project
stijnmoreels Dec 27, 2024
aa4820e
pr-fix: xml document circuit breaker event handler registration
stijnmoreels Dec 27, 2024
2107fee
pr-fix: correct usings
stijnmoreels Dec 27, 2024
5fc8367
pr-sug: use more test assertion friendly message
stijnmoreels Dec 27, 2024
a5a445b
pr-sug: use more test assertion friendly message
stijnmoreels Dec 27, 2024
6a24ad8
pr-sug: use is-half-open property to determine state transition
stijnmoreels Jan 14, 2025
3ebfe3c
pr-sug: rename event handler registration 'WithCircuitBreakerStateCha…
stijnmoreels Jan 17, 2025
72ffe81
pr-fix: order of telemetry test assertions
stijnmoreels Jan 17, 2025
e406104
pr-fix: correct timeout
stijnmoreels Jan 17, 2025
1b09575
pr-fix: reorder test assertions
stijnmoreels Jan 17, 2025
9b084b2
pr-fix: reorder test assertions
stijnmoreels Jan 17, 2025
c4a5abc
pr-fix: add logging for transaction id
stijnmoreels Jan 17, 2025
7b5e467
pr-fix: add logging for transaction id
stijnmoreels Jan 17, 2025
50d12a4
pr-fix: add logging for transaction id
stijnmoreels Jan 17, 2025
f43b305
pr-fix: fire and forget event handlers with void
stijnmoreels Jan 17, 2025
ad333a4
pr-fix: consistent generic type name
stijnmoreels Jan 22, 2025
b67c35e
pr-fix: streamline test
stijnmoreels Jan 22, 2025
98f3027
temp log exception
stijnmoreels Jan 22, 2025
79a667e
temp log exception
stijnmoreels Jan 22, 2025
334a489
temp remove until conditions
stijnmoreels Jan 22, 2025
640918a
pr-fix: streamline test assertions
stijnmoreels Jan 22, 2025
14dce0a
pr-fix: streamline test assertions
stijnmoreels Jan 22, 2025
04bfd35
pr-fix: correct with simplified trace parent
stijnmoreels Jan 22, 2025
8293835
pr-fix: specific operation name
stijnmoreels Jan 22, 2025
285b202
pr-fix: log get request telemetry
stijnmoreels Jan 22, 2025
4f9f5e3
pr-fix: log get request telemetry
stijnmoreels Jan 22, 2025
f11772a
pr-fix: order of log registration
stijnmoreels Jan 23, 2025
ea2ffb8
pr-fix: pin piont exact failur
stijnmoreels Jan 23, 2025
f141c50
pr-fix: diff serilog setup
stijnmoreels Jan 23, 2025
5fef61a
pr-fix: remove unnecessary event hubs package downgrade
stijnmoreels Jan 23, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 25 additions & 0 deletions docs/preview/02-Features/06-general-messaging.md
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,31 @@ Both the recovery period after the circuit is open and the interval between mess
}
```

#### 🔔 Get notified on a circuit breaker state transition
Transitions from circuit breaker states happens internally and automatically. The library supports a notification system that lets you register event handlers that gets called upon a state transition.

After the message pump and/or message handlers are registered, you can add one or more handlers linked to specific transitions to the previously registered pump.

```csharp
using Arcus.Messaging.Pumps.Abstractions;

services.AddServiceBusMessagePump(...)
.WithCircuitBreakerEventHandler<MyFirstCircuitBreakerEventHandler>()
.WithCircuitBreakerEventHandler<MySecondCircuitBreakerEventHandler>();
```

The instances should implement the `ICircuitBreakerEventHandler`, which allows you to access the new state for the pump.

```csharp
public class MyFirstCircuitBreakerEventHandler : ICircuitBreakerEventHandler
{
public Task OnTransitionAsync(MessagePumpCircuitState newState)
{
// ...
}
}
```

### Pause message processing for a fixed period of time
⚡ If you use one of the message-type specific packages like `Arcus.Messaging.Pumps.EventHubs`, you will automatically get this functionality. If you implement your own message pump, please use the `services.AddMessagePump(...)` extension which makes sure that you also registers this functionality.

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,5 +35,52 @@ public static IServiceCollection AddMessagePump<TMessagePump>(

return services.AddHostedService(implementationFactory);
}

/// <summary>
/// Adds an <see cref="ICircuitBreakerEventHandler"/> implementation for a specific message pump to the application services.
/// </summary>
/// <typeparam name="THandler">The custom type of the event handler.</typeparam>
/// <param name="services">The application services to register the event handler.</param>
/// <param name="jobId">The unique ID to distinguish the message pump to register this event handler for.</param>
/// <exception cref="ArgumentNullException">Thrown when the <paramref name="services"/> is <c>null</c>.</exception>
/// <exception cref="ArgumentException">Thrown when the <paramref name="jobId"/> is blank.</exception>
public static IServiceCollection AddCircuitBreakerEventHandler<THandler>(this IServiceCollection services, string jobId)
where THandler : ICircuitBreakerEventHandler
{
return AddCircuitBreakerEventHandler(services, jobId, provider => ActivatorUtilities.CreateInstance<THandler>(provider));
}

/// <summary>
/// Adds an <see cref="ICircuitBreakerEventHandler"/> implementation for a specific message pump to the application services.
/// </summary>
/// <typeparam name="TEventHandler">The custom type of the event handler.</typeparam>
/// <param name="services">The application services to register the event handler.</param>
/// <param name="jobId">The unique ID to distinguish the message pump to register this event handler for.</param>
/// <param name="implementationFactory">The factory function to create the custom <see cref="ICircuitBreakerEventHandler"/> implementation.</param>
/// <exception cref="ArgumentNullException">Thrown when the <paramref name="services"/> or <paramref name="implementationFactory"/> is <c>null</c>.</exception>
/// <exception cref="ArgumentException">Thrown when the <paramref name="jobId"/> is blank.</exception>
public static IServiceCollection AddCircuitBreakerEventHandler<TEventHandler>(
this IServiceCollection services,
string jobId,
Func<IServiceProvider, TEventHandler> implementationFactory)
where TEventHandler : ICircuitBreakerEventHandler
{
if (services is null)
{
throw new ArgumentNullException(nameof(services));
}

if (implementationFactory is null)
{
throw new ArgumentNullException(nameof(implementationFactory));
}

if (string.IsNullOrWhiteSpace(jobId))
{
throw new ArgumentException("Requires a non-blank job ID to distinguish the message pump for which this circuit breaker event handler is registered", nameof(jobId));
}

return services.AddTransient(provider => new CircuitBreakerEventHandler(jobId, implementationFactory(provider)));
}
}
}
41 changes: 39 additions & 2 deletions src/Arcus.Messaging.Pumps.Abstractions/MessagePump.cs
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
using System;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Arcus.Messaging.Pumps.Abstractions.Resiliency;
using GuardNet;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;

Expand Down Expand Up @@ -134,7 +136,15 @@ protected async Task WaitMessageRecoveryPeriodAsync(CancellationToken cancellati
Logger.LogDebug("Circuit breaker caused message pump '{JobId}' to wait message recovery period of '{Recovery}' during '{State}' state", JobId, CircuitState.Options.MessageRecoveryPeriod.ToString("g"), CircuitState);
await Task.Delay(CircuitState.Options.MessageRecoveryPeriod, cancellationToken);

MessagePumpCircuitState oldState = CircuitState;
bool shouldTriggerTransitionEvent = !oldState.IsHalfOpen;

CircuitState = CircuitState.TransitionTo(CircuitBreakerState.HalfOpen);

if (shouldTriggerTransitionEvent)
{
await NotifyCircuitBreakerEventHandlersAsync();
}
}

/// <summary>
Expand All @@ -146,28 +156,55 @@ protected async Task WaitMessageIntervalDuringRecoveryAsync(CancellationToken ca
Logger.LogDebug("Circuit breaker caused message pump '{JobId}' to wait message interval during recovery of '{Interval}' during the '{State}' state", JobId, CircuitState.Options.MessageIntervalDuringRecovery.ToString("g"), CircuitState);
await Task.Delay(CircuitState.Options.MessageIntervalDuringRecovery, cancellationToken);

MessagePumpCircuitState oldState = CircuitState;
bool shouldTriggerTransitionEvent = !oldState.IsHalfOpen;

CircuitState = CircuitState.TransitionTo(CircuitBreakerState.HalfOpen);

if (shouldTriggerTransitionEvent)
{
await NotifyCircuitBreakerEventHandlersAsync();
}
}

/// <summary>
/// Notifies the message pump about the new state which pauses message retrieval.
/// </summary>
/// <param name="options">The additional accompanied options that goes with the new state.</param>
internal void NotifyPauseReceiveMessages(MessagePumpCircuitBreakerOptions options)
internal async Task NotifyPauseReceiveMessagesAsync(MessagePumpCircuitBreakerOptions options)
{
Logger.LogDebug("Circuit breaker caused message pump '{JobId}' to transition from a '{CurrentState}' an 'Open' state", JobId, CircuitState);

CircuitState = CircuitState.TransitionTo(CircuitBreakerState.Open, options);

await NotifyCircuitBreakerEventHandlersAsync();
}

/// <summary>
/// Notifies the message pump about the new state which resumes message retrieval.
/// </summary>
protected void NotifyResumeRetrievingMessages()
protected async Task NotifyResumeRetrievingMessagesAsync()
{
Logger.LogDebug("Circuit breaker caused message pump '{JobId}' to transition back from '{CurrentState}' to a 'Closed' state, retrieving messages is resumed", JobId, CircuitState);

CircuitState = MessagePumpCircuitState.Closed;

await NotifyCircuitBreakerEventHandlersAsync();
}

private async Task NotifyCircuitBreakerEventHandlersAsync()
{
ICircuitBreakerEventHandler[] eventHandlers = GetEventHandlersForPump();

await Task.WhenAll(eventHandlers.Select(h => h.OnTransitionAsync(CircuitState)));
}

private ICircuitBreakerEventHandler[] GetEventHandlersForPump()
{
return ServiceProvider.GetServices<CircuitBreakerEventHandler>()
.Where(registration => registration.JobId == JobId)
.Select(handler => handler.Handler)
.ToArray();
}

/// <summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ public DefaultMessagePumpCircuitBreaker(IServiceProvider serviceProvider, ILogge
/// <param name="jobId">The unique identifier to distinguish the message pump in the application services.</param>
/// <param name="configureOptions">The optional user-configurable options to manipulate the workings of the message pump interaction.</param>
/// <exception cref="ArgumentException">Thrown when the <paramref name="jobId"/> is blank.</exception>
public virtual Task PauseMessageProcessingAsync(string jobId, Action<MessagePumpCircuitBreakerOptions> configureOptions)
public virtual async Task PauseMessageProcessingAsync(string jobId, Action<MessagePumpCircuitBreakerOptions> configureOptions)
{
Guard.NotNullOrWhitespace(jobId, nameof(jobId));

Expand All @@ -47,19 +47,18 @@ public virtual Task PauseMessageProcessingAsync(string jobId, Action<MessagePump
if (!messagePump.IsStarted)
{
_logger.LogWarning("Cannot pause message pump '{JobId}' because the pump has not been started", jobId);
return Task.CompletedTask;
return;
}

if (!messagePump.CircuitState.IsClosed)
{
return Task.CompletedTask;
return;
}

var options = new MessagePumpCircuitBreakerOptions();
configureOptions?.Invoke(options);

messagePump.NotifyPauseReceiveMessages(options);
return Task.CompletedTask;
await messagePump.NotifyPauseReceiveMessagesAsync(options);
}

/// <summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,54 @@ public interface IMessagePumpCircuitBreaker
Task PauseMessageProcessingAsync(string jobId, Action<MessagePumpCircuitBreakerOptions> configureOptions);
}

/// <summary>
/// Represents an instance to notify changes in circuit breaker states for given message pumps.
/// </summary>
public interface ICircuitBreakerEventHandler
{
/// <summary>
/// Notifies the application on a change in the message pump's circuit breaker state.
/// </summary>
/// <param name="newState">The new circuit breaker state in which the message pump is currently running on.</param>
Task OnTransitionAsync(MessagePumpCircuitState newState);
}

/// <summary>
/// Represents a registration of an <see cref="ICircuitBreakerEventHandler"/> instance in the application services,
/// specifically linked to a message pump.
/// </summary>
internal sealed class CircuitBreakerEventHandler
{
/// <summary>
/// Initializes a new instance of the <see cref="CircuitBreakerEventHandler" /> class.
/// </summary>
public CircuitBreakerEventHandler(string jobId, ICircuitBreakerEventHandler handler)
{
if (string.IsNullOrWhiteSpace(jobId))
{
throw new ArgumentException("Requires a non-blank job ID for the circuit breaker event handler registration", nameof(jobId));
}

if (handler is null)
{
throw new ArgumentNullException(nameof(handler));
}

JobId = jobId;
Handler = handler;
}

/// <summary>
/// Gets the unique ID to distinguish the linked message pump.
/// </summary>
public string JobId { get; }

/// <summary>
/// Gets the event handler implementation to trigger on transition changes in the linked message pump.
/// </summary>
public ICircuitBreakerEventHandler Handler { get; }
}

/// <summary>
/// Represents the available states in the <see cref="MessagePumpCircuitState"/> in which the message pump can transition into.
/// </summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,7 @@ public override async Task StartProcessingMessagesAsync(CancellationToken cancel

} while (!singleProcessingResult.IsSuccessful);

NotifyResumeRetrievingMessages();
await NotifyResumeRetrievingMessagesAsync();
}
}
catch (Exception exception) when (exception is TaskCanceledException or OperationCanceledException or ObjectDisposedException)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
using System;
using Arcus.Messaging.Abstractions.ServiceBus.MessageHandling;
using Arcus.Messaging.Pumps.Abstractions.Resiliency;

// ReSharper disable once CheckNamespace
namespace Microsoft.Extensions.DependencyInjection
{
/// <summary>
/// Extensions on the <see cref="IServiceCollection"/> to add an <see cref="IAzureServiceBusMessageHandler{TMessage}"/>'s implementations.
/// </summary>
// ReSharper disable once InconsistentNaming
public static class ServiceBusMessageHandlerCollectionExtensions
{
/// <summary>
/// Adds an <see cref="ICircuitBreakerEventHandler"/> implementation for a specific message pump to the application services.
/// </summary>
/// <typeparam name="TEventHandler">The custom type of the event handler.</typeparam>
/// <param name="collection">The application services to register the event handler.</param>
/// <exception cref="ArgumentNullException">Thrown when the <paramref name="collection"/> is <c>null</c>.</exception>
public static ServiceBusMessageHandlerCollection WithCircuitBreakerEventHandler<TEventHandler>(
this ServiceBusMessageHandlerCollection collection)
where TEventHandler : ICircuitBreakerEventHandler
{
return WithCircuitBreakerEventHandler(collection, provider => ActivatorUtilities.CreateInstance<TEventHandler>(provider));
}

/// <summary>
/// Adds an <see cref="ICircuitBreakerEventHandler"/> implementation for a specific message pump to the application services.
/// </summary>
/// <typeparam name="TEventHandler">The custom type of the event handler.</typeparam>
/// <param name="collection">The application services to register the event handler.</param>
/// <param name="implementationFactory">The factory function to create the custom <see cref="ICircuitBreakerEventHandler"/> implementation.</param>
/// <exception cref="ArgumentNullException">Thrown when the <paramref name="collection"/> or <paramref name="implementationFactory"/> is <c>null</c>.</exception>
public static ServiceBusMessageHandlerCollection WithCircuitBreakerEventHandler<TEventHandler>(
this ServiceBusMessageHandlerCollection collection,
Func<IServiceProvider, TEventHandler> implementationFactory)
where TEventHandler : ICircuitBreakerEventHandler
{
if (collection is null)
{
throw new ArgumentNullException(nameof(collection));
}

if (implementationFactory is null)
{
throw new ArgumentNullException(nameof(implementationFactory));
}

collection.Services.AddCircuitBreakerEventHandler(collection.JobId, implementationFactory);
return collection;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
using System.Collections.Generic;
using System.Collections.ObjectModel;
using System.Linq;
using System.Threading.Tasks;
using Arcus.Messaging.Pumps.Abstractions.Resiliency;
using Xunit;

namespace Arcus.Messaging.Tests.Integration.MessagePump.Fixture
{
/// <summary>
/// Represents an <see cref="ICircuitBreakerEventHandler"/> implementation that verifies if the circuit breaker transitions states are correct.
/// </summary>
internal class MockCircuitBreakerEventHandler : ICircuitBreakerEventHandler
{
private readonly Collection<MessagePumpCircuitState> _states = new();

/// <summary>
/// Notifies the application on a change in the message pump's circuit breaker state.
/// </summary>
/// <param name="newState">The new circuit breaker state in which the message pump is currently running on.</param>
public Task OnTransitionAsync(MessagePumpCircuitState newState)
{
_states.Add(newState);

return Task.CompletedTask;
}

public void ShouldTransitionCorrectly()
{
Assert.NotEmpty(_states);

MessagePumpCircuitState firstTransition = _states[0];
Assert.True(firstTransition.IsOpen, $"when the message pump starts up, the first transition should always be from a closed to open state, but got: {firstTransition}");

IEnumerable<(MessagePumpCircuitState oldState, MessagePumpCircuitState newState)> transitions =
_states.SkipLast(1).Zip(_states.Skip(1));

Assert.All(transitions, t => VerifyCorrectTransition(t.oldState, t.newState));
}

private static void VerifyCorrectTransition(
MessagePumpCircuitState oldState,
MessagePumpCircuitState newState)
{
if (oldState.IsClosed)
{
Assert.True(newState.IsHalfOpen, $"when the message pump comes from a closed state, the next state should always be half-open, but got: {newState}");
}
else if (oldState.IsOpen)
{
Assert.True(newState.IsHalfOpen, $"when the message pump comes from an open state, the next state should always be half-open, but got: {newState}");
}
else if (oldState.IsHalfOpen)
{
Assert.True(newState.IsClosed, $"when the message pump comes from a half-open state, the next state should always be closed, but got: {newState}");
}
}
}
}
Loading
Loading