Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(circuit-breaker): add job ID + old/new state change to circuit-breaker transition #469

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 9 additions & 2 deletions docs/preview/02-Features/06-general-messaging.md
Original file line number Diff line number Diff line change
Expand Up @@ -96,9 +96,16 @@ The instances should implement the `ICircuitBreakerEventHandler`, which allows y
```csharp
public class MyFirstCircuitBreakerEventHandler : ICircuitBreakerEventHandler
{
public Task OnTransitionAsync(MessagePumpCircuitState newState)
public Task OnTransitionAsync(MessagePumpCircuitStateChang change)
{
// ...
// The job ID of the message pump that was transitioned.
string jobId = change.JobId;

// The circuit breaker state transitions.
MessagePumpCircuitState oldState = change.OldState;
MessagePumpCircuitState newState = change.NewState;

// Process the state change event...
}
}
```
Expand Down
32 changes: 22 additions & 10 deletions src/Arcus.Messaging.Pumps.Abstractions/MessagePump.cs
Original file line number Diff line number Diff line change
Expand Up @@ -145,9 +145,12 @@ protected async Task WaitMessageRecoveryPeriodAsync(CancellationToken cancellati

if (!CircuitState.IsHalfOpen)
{
CircuitState = CircuitState.TransitionTo(CircuitBreakerState.HalfOpen);
MessagePumpCircuitState
oldState = CircuitState,
newState = CircuitState.TransitionTo(CircuitBreakerState.HalfOpen);

NotifyCircuitBreakerStateChangedSubscribers();
CircuitState = newState;
NotifyCircuitBreakerStateChangedSubscribers(oldState, newState);
}
}

Expand All @@ -162,9 +165,12 @@ protected async Task WaitMessageIntervalDuringRecoveryAsync(CancellationToken ca

if (!CircuitState.IsHalfOpen)
{
CircuitState = CircuitState.TransitionTo(CircuitBreakerState.HalfOpen);
MessagePumpCircuitState
oldState = CircuitState,
newState = CircuitState.TransitionTo(CircuitBreakerState.HalfOpen);

NotifyCircuitBreakerStateChangedSubscribers();
CircuitState = newState;
NotifyCircuitBreakerStateChangedSubscribers(oldState, newState);
}
}

Expand All @@ -176,9 +182,12 @@ internal void NotifyPauseReceiveMessages(MessagePumpCircuitBreakerOptions option
{
Logger.LogDebug("Circuit breaker caused message pump '{JobId}' to transition from a '{CurrentState}' an 'Open' state", JobId, CircuitState);

CircuitState = CircuitState.TransitionTo(CircuitBreakerState.Open, options);
MessagePumpCircuitState
oldState = CircuitState,
newState = CircuitState.TransitionTo(CircuitBreakerState.Open, options);

NotifyCircuitBreakerStateChangedSubscribers();
CircuitState = newState;
NotifyCircuitBreakerStateChangedSubscribers(oldState, newState);
}

/// <summary>
Expand All @@ -188,18 +197,21 @@ protected void NotifyResumeRetrievingMessages()
{
Logger.LogDebug("Circuit breaker caused message pump '{JobId}' to transition back from '{CurrentState}' to a 'Closed' state, retrieving messages is resumed", JobId, CircuitState);

CircuitState = MessagePumpCircuitState.Closed;
MessagePumpCircuitState
oldState = CircuitState,
newState = MessagePumpCircuitState.Closed;

NotifyCircuitBreakerStateChangedSubscribers();
CircuitState = newState;
NotifyCircuitBreakerStateChangedSubscribers(oldState, newState);
}

private void NotifyCircuitBreakerStateChangedSubscribers()
private void NotifyCircuitBreakerStateChangedSubscribers(MessagePumpCircuitState oldState, MessagePumpCircuitState newState)
{
ICircuitBreakerEventHandler[] eventHandlers = GetEventHandlersForPump();

foreach (var handler in eventHandlers)
{
Task.Run(() => handler.OnTransition(CircuitState));
Task.Run(() => handler.OnTransition(new MessagePumpCircuitStateChangedEventArgs(JobId, oldState, newState)));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,47 @@ public interface ICircuitBreakerEventHandler
/// <summary>
/// Notifies the application on a change in the message pump's circuit breaker state.
/// </summary>
/// <param name="newState">The new circuit breaker state in which the message pump is currently running on.</param>
void OnTransition(MessagePumpCircuitState newState);
/// <param name="args">The change in the circuit breaker state for a message pump.</param>
void OnTransition(MessagePumpCircuitStateChangedEventArgs args);
}

/// <summary>
/// Represents a change event of the <see cref="MessagePumpCircuitState"/> in a <see cref="MessagePump"/>.
/// </summary>
public class MessagePumpCircuitStateChangedEventArgs
{
/// <summary>
/// Initializes a new instance of the <see cref="MessagePumpCircuitStateChangedEventArgs" /> class.
/// </summary>
internal MessagePumpCircuitStateChangedEventArgs(
string jobId,
MessagePumpCircuitState oldState,
MessagePumpCircuitState newState)
{
if (string.IsNullOrWhiteSpace(jobId))
{
throw new ArgumentException("Requires a non-blank job ID for the circuit breaker event state change registration", nameof(jobId));
}

JobId = jobId;
OldState = oldState;
NewState = newState;
}

/// <summary>
/// Gets the unique ID to distinguish the linked message pump that had it circuit breaker state changed.
/// </summary>
public string JobId { get; }

/// <summary>
/// Gets the original circuit breaker state the linked message pump was in.
/// </summary>
public MessagePumpCircuitState OldState { get; }

/// <summary>
/// Gets the current circuit breaker state the linked message pump is in.
/// </summary>
public MessagePumpCircuitState NewState { get; }
}

/// <summary>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
using System.Collections.Generic;
using System;
using System.Collections.ObjectModel;
using System.Linq;
using System.Threading.Tasks;
using Arcus.Messaging.Pumps.Abstractions.Resiliency;
using Arcus.Testing;
using Xunit;
using Xunit.Sdk;

namespace Arcus.Messaging.Tests.Integration.MessagePump.Fixture
{
Expand All @@ -11,28 +13,32 @@ namespace Arcus.Messaging.Tests.Integration.MessagePump.Fixture
/// </summary>
internal class MockCircuitBreakerEventHandler : ICircuitBreakerEventHandler
{
private readonly Collection<MessagePumpCircuitState> _states = new();
private readonly Collection<MessagePumpCircuitStateChangedEventArgs> _states = new();

/// <summary>
/// Notifies the application on a change in the message pump's circuit breaker state.
/// </summary>
/// <param name="newState">The new circuit breaker state in which the message pump is currently running on.</param>
public void OnTransition(MessagePumpCircuitState newState)
/// <param name="args">The change in the circuit breaker state for a message pump.</param>
public void OnTransition(MessagePumpCircuitStateChangedEventArgs args)
{
_states.Add(newState);
_states.Add(args);
}

public void ShouldTransitionCorrectly()
/// <summary>
/// Verifies that all fired circuit breaker state transitions are happening correctly.
/// </summary>
public async Task ShouldTransitionedCorrectlyAsync()
{
Assert.NotEmpty(_states);

MessagePumpCircuitState firstTransition = _states[0];
Assert.True(firstTransition.IsOpen, $"when the message pump starts up, the first transition should always be from a closed to open state, but got: {firstTransition}");
await Poll.Target<XunitException>(() =>
{
Assert.NotEmpty(_states);
Assert.Equal(3, _states.Count);

IEnumerable<(MessagePumpCircuitState oldState, MessagePumpCircuitState newState)> transitions =
_states.SkipLast(1).Zip(_states.Skip(1));
}).Every(TimeSpan.FromSeconds(1))
.Timeout(TimeSpan.FromSeconds(10))
.FailWith("could not in time find all the fired circuit breaker change events, possibly the message pump did not fired them");

Assert.All(transitions, t => VerifyCorrectTransition(t.oldState, t.newState));
Assert.All(_states, t => VerifyCorrectTransition(t.OldState, t.NewState));
}

private static void VerifyCorrectTransition(
Expand All @@ -41,7 +47,7 @@ private static void VerifyCorrectTransition(
{
if (oldState.IsClosed)
{
Assert.True(newState.IsHalfOpen, $"when the message pump comes from a closed state, the next state should always be half-open, but got: {newState}");
Assert.True(newState.IsOpen, $"when the message pump comes from a closed state, the next state should always be open, but got: {newState}");
}
else if (oldState.IsOpen)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,8 @@
await producer.ProduceAsync(messageAfterBreak);
await messageSink.ShouldReceiveOrdersAfterBreakAsync(messageAfterBreak.MessageId);

mockEventHandler1.ShouldTransitionCorrectly();
mockEventHandler2.ShouldTransitionCorrectly();
await mockEventHandler1.ShouldTransitionedCorrectlyAsync();
await mockEventHandler2.ShouldTransitionedCorrectlyAsync();
}

[Fact]
Expand Down Expand Up @@ -99,8 +99,8 @@
await producer.ProduceAsync(messageAfterBreak);
await messageSink.ShouldReceiveOrdersAfterBreakAsync(messageAfterBreak.MessageId);

mockEventHandler1.ShouldTransitionCorrectly();
mockEventHandler2.ShouldTransitionCorrectly();
await mockEventHandler1.ShouldTransitionedCorrectlyAsync();
await mockEventHandler2.ShouldTransitionedCorrectlyAsync();
}

private async Task<TemporaryTopicSubscription> CreateTopicSubscriptionForMessageAsync(params ServiceBusMessage[] messages)
Expand Down Expand Up @@ -230,7 +230,7 @@
public class TemporaryTopicSubscription : IAsyncDisposable
{
private readonly ServiceBusAdministrationClient _client;
private readonly string _serviceBusNamespace;

Check warning on line 233 in src/Arcus.Messaging.Tests.Integration/MessagePump/ServiceBusMessagePump.ResiliencyTests.cs

View workflow job for this annotation

GitHub Actions / Analyze (csharp)

Field 'TemporaryTopicSubscription._serviceBusNamespace' is never assigned to, and will always have its default value null
private readonly CreateSubscriptionOptions _options;
private readonly bool _createdByUs;
private readonly ILogger _logger;
Expand Down
Loading