diff --git a/src/Arcus.Messaging.Pumps.ServiceBus/AzureServiceBusMessagePump.cs b/src/Arcus.Messaging.Pumps.ServiceBus/AzureServiceBusMessagePump.cs index c586f468..0b4ccb19 100644 --- a/src/Arcus.Messaging.Pumps.ServiceBus/AzureServiceBusMessagePump.cs +++ b/src/Arcus.Messaging.Pumps.ServiceBus/AzureServiceBusMessagePump.cs @@ -415,32 +415,21 @@ private async Task DeleteTopicSubscriptionAsync(CancellationToken cancellationTo try { - bool subscriptionExists = - await serviceBusClient.SubscriptionExistsAsync(serviceBusConnectionString.EntityPath, SubscriptionName, cancellationToken); - + bool subscriptionExists = await serviceBusClient.SubscriptionExistsAsync(serviceBusConnectionString.EntityPath, SubscriptionName, cancellationToken); if (subscriptionExists) { - Logger.LogTrace( - "Deleting subscription '{SubscriptionName}' on topic '{Path}'...", - SubscriptionName, serviceBusConnectionString.EntityPath); - + Logger.LogTrace("Deleting subscription '{SubscriptionName}' on topic '{Path}'...", SubscriptionName, serviceBusConnectionString.EntityPath); await serviceBusClient.DeleteSubscriptionAsync(serviceBusConnectionString.EntityPath, SubscriptionName, cancellationToken); - - Logger.LogTrace( - "Subscription '{SubscriptionName}' deleted on topic '{Path}'", - SubscriptionName, serviceBusConnectionString.EntityPath); + Logger.LogTrace("Subscription '{SubscriptionName}' deleted on topic '{Path}'", SubscriptionName, serviceBusConnectionString.EntityPath); } else { - Logger.LogTrace( - "Cannot delete topic subscription with name '{SubscriptionName}' because no subscription exists on Service Bus resource", - SubscriptionName); } + Logger.LogTrace("Cannot delete topic subscription with name '{SubscriptionName}' because no subscription exists on Service Bus resource", SubscriptionName); + } } catch (Exception exception) { - Logger.LogWarning(exception, - "Failed to delete topic subscription with name '{SubscriptionName}' on Service Bus resource", - SubscriptionName); + Logger.LogWarning(exception, "Failed to delete topic subscription with name '{SubscriptionName}' on Service Bus resource", SubscriptionName); } finally { @@ -479,34 +468,24 @@ private async Task HandleMessageAsync(Message message, CancellationToken cancell Logger.LogInformation("No operation ID was found on the message"); } - try + MessageCorrelationInfo correlationInfo = message.GetCorrelationInfo(); + using (IServiceScope serviceScope = ServiceProvider.CreateScope()) { - MessageCorrelationInfo correlationInfo = message.GetCorrelationInfo(); - using (IServiceScope serviceScope = ServiceProvider.CreateScope()) + var correlationInfoAccessor = serviceScope.ServiceProvider.GetService>(); + if (correlationInfoAccessor is null) { - var correlationInfoAccessor = serviceScope.ServiceProvider.GetService>(); - if (correlationInfoAccessor is null) - { - Logger.LogTrace("No message correlation configured"); - await ProcessMessageAsync(message, cancellationToken, correlationInfo); - } - else + Logger.LogTrace("No message correlation configured"); + await ProcessMessageWithFallbackAsync(message, cancellationToken, correlationInfo); + } + else + { + correlationInfoAccessor.SetCorrelationInfo(correlationInfo); + using (LogContext.Push(new MessageCorrelationInfoEnricher(correlationInfoAccessor))) { - correlationInfoAccessor.SetCorrelationInfo(correlationInfo); - using (LogContext.Push(new MessageCorrelationInfoEnricher(correlationInfoAccessor))) - { - await ProcessMessageAsync(message, cancellationToken, correlationInfo); - } + await ProcessMessageWithFallbackAsync(message, cancellationToken, correlationInfo); } } } - catch (Exception ex) - { - Logger.LogCritical(ex, "Unable to process message with ID '{MessageId}'", message.MessageId); - await HandleReceiveExceptionAsync(ex); - - throw; - } } /// @@ -535,41 +514,58 @@ protected override Task PreProcessMessageAsync(MessageHandler m return Task.CompletedTask; } - private async Task ProcessMessageAsync( - Message message, - CancellationToken cancellationToken, - MessageCorrelationInfo correlationInfo) + private async Task ProcessMessageWithFallbackAsync(Message message, CancellationToken cancellationToken, MessageCorrelationInfo correlationInfo) { - Logger.LogTrace("Received message '{MessageId}'", message.MessageId); + try + { + Logger.LogTrace("Received message '{MessageId}'", message.MessageId); - var messageContext = new AzureServiceBusMessageContext(message.MessageId, message.SystemProperties, message.UserProperties); - Encoding encoding = messageContext.GetMessageEncodingProperty(Logger); - string messageBody = encoding.GetString(message.Body); + var messageContext = new AzureServiceBusMessageContext(message.MessageId, message.SystemProperties, message.UserProperties); + Encoding encoding = messageContext.GetMessageEncodingProperty(Logger); + string messageBody = encoding.GetString(message.Body); - if (_fallbackMessageHandler is null) - { - await ProcessMessageAsync(messageBody, messageContext, correlationInfo, cancellationToken); - } - else - { - if (_fallbackMessageHandler is AzureServiceBusMessageHandlerTemplate specificMessageHandler) + if (_fallbackMessageHandler is null) { - specificMessageHandler.SetMessageReceiver(_messageReceiver); + await ProcessMessageAsync(messageBody, messageContext, correlationInfo, cancellationToken); } - - MessageHandlerResult result = await ProcessMessageAndCaptureAsync(messageBody, messageContext, correlationInfo, cancellationToken); - if (result.Exception != null) + else { - throw result.Exception; + await FallbackProcessMessageAsync(message, messageBody, messageContext, correlationInfo, cancellationToken); } - if (!result.IsProcessed) - { - await _fallbackMessageHandler.ProcessMessageAsync(message, messageContext, correlationInfo, cancellationToken); - } + Logger.LogTrace("Message '{MessageId}' processed", message.MessageId); } + catch (Exception exception) + { + Logger.LogCritical(exception, "Unable to process message with ID '{MessageId}'", message.MessageId); + await HandleReceiveExceptionAsync(exception); - Logger.LogTrace("Message '{MessageId}' processed", message.MessageId); + throw; + } + } + + private async Task FallbackProcessMessageAsync( + Message message, + string messageBody, + AzureServiceBusMessageContext messageContext, + MessageCorrelationInfo correlationInfo, + CancellationToken cancellationToken) + { + if (_fallbackMessageHandler is AzureServiceBusMessageHandlerTemplate specificMessageHandler) + { + specificMessageHandler.SetMessageReceiver(_messageReceiver); + } + + MessageHandlerResult result = await ProcessMessageAndCaptureAsync(messageBody, messageContext, correlationInfo, cancellationToken); + if (result.Exception != null) + { + throw result.Exception; + } + + if (!result.IsProcessed) + { + await _fallbackMessageHandler.ProcessMessageAsync(message, messageContext, correlationInfo, cancellationToken); + } } private static async Task UntilCancelledAsync(CancellationToken cancellationToken) diff --git a/src/Arcus.Messaging.Tests.Integration/Arcus.Messaging.Tests.Integration.csproj b/src/Arcus.Messaging.Tests.Integration/Arcus.Messaging.Tests.Integration.csproj index 142a4c1a..fd9216a7 100644 --- a/src/Arcus.Messaging.Tests.Integration/Arcus.Messaging.Tests.Integration.csproj +++ b/src/Arcus.Messaging.Tests.Integration/Arcus.Messaging.Tests.Integration.csproj @@ -18,8 +18,11 @@ + + + @@ -29,6 +32,9 @@ + + + diff --git a/src/Arcus.Messaging.Tests.Integration/Fixture/ApplicationInsightsConfig.cs b/src/Arcus.Messaging.Tests.Integration/Fixture/ApplicationInsightsConfig.cs new file mode 100644 index 00000000..89b40e0d --- /dev/null +++ b/src/Arcus.Messaging.Tests.Integration/Fixture/ApplicationInsightsConfig.cs @@ -0,0 +1,43 @@ +using GuardNet; + +namespace Arcus.Messaging.Tests.Integration.Fixture +{ + /// + /// Represents an application configuration section related to information regarding Azure Application Insights. + /// + public class ApplicationInsightsConfig + { + /// + /// Initializes a new instance of the class. + /// + /// The instrumentation key of the Azure Application Insights resource. + /// The application ID that has API access to the Azure Application Insights resource. + /// The application API key that has API access to the Azure Application Insights resource. + /// Thrown when the or or is blank. + public ApplicationInsightsConfig(string instrumentationKey, string applicationId, string apiKey) + { + Guard.NotNullOrWhitespace(instrumentationKey, nameof(instrumentationKey), "Requires a non-blank Application Insights instrumentation key"); + Guard.NotNullOrWhitespace(apiKey, nameof(apiKey), "Requires a non-blank Application Insights application application ID"); + Guard.NotNullOrWhitespace(apiKey, nameof(apiKey), "Requires a non-blank Application Insights application API key"); + + InstrumentationKey = instrumentationKey; + ApplicationId = applicationId; + ApiKey = apiKey; + } + + /// + /// Gets the instrumentation key to connect to the Application Insights resource. + /// + public string InstrumentationKey { get; } + + /// + /// Gets the application ID which has API access to the Application Insights resource. + /// + public string ApplicationId { get; } + + /// + /// Gets the application API key which has API access to the Application Insights resource. + /// + public string ApiKey { get; } + } +} diff --git a/src/Arcus.Messaging.Tests.Integration/Fixture/ServiceBusQueueTrackCorrelationOnExceptionProgram.cs b/src/Arcus.Messaging.Tests.Integration/Fixture/ServiceBusQueueTrackCorrelationOnExceptionProgram.cs new file mode 100644 index 00000000..10b0c58f --- /dev/null +++ b/src/Arcus.Messaging.Tests.Integration/Fixture/ServiceBusQueueTrackCorrelationOnExceptionProgram.cs @@ -0,0 +1,74 @@ +using System; +using Arcus.EventGrid.Publishing; +using Arcus.Messaging.Tests.Core.Messages.v1; +using Arcus.Messaging.Tests.Workers.MessageHandlers; +using Microsoft.Extensions.Configuration; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Diagnostics.HealthChecks; +using Microsoft.Extensions.Hosting; +using Microsoft.Extensions.Logging; +using Serilog; +using Serilog.Configuration; +using Serilog.Events; + +// ReSharper disable once CheckNamespace +namespace Arcus.Messaging.Tests.Workers.ServiceBus +{ + public class ServiceBusQueueTrackCorrelationOnExceptionProgram + { + public static void main(string[] args) + { + Log.Logger = new LoggerConfiguration() + .MinimumLevel.Debug() + .WriteTo.Console() + .CreateLogger(); + + try + { + CreateHostBuilder(args) + .Build() + .Run(); + } + catch (Exception exception) + { + Log.Fatal(exception, "Host terminated unexpectedly"); + } + finally + { + Log.CloseAndFlush(); + } + } + + public static IHostBuilder CreateHostBuilder(string[] args) => + Host.CreateDefaultBuilder(args) + .ConfigureAppConfiguration(configuration => + { + configuration.AddCommandLine(args); + configuration.AddEnvironmentVariables(); + }) + .UseSerilog(UpdateLoggerConfiguration) + .ConfigureServices((hostContext, services) => + { + services.AddServiceBusQueueMessagePump(configuration => configuration["ARCUS_SERVICEBUS_CONNECTIONSTRING"]) + .WithServiceBusMessageHandler(); + + services.AddTcpHealthProbes("ARCUS_HEALTH_PORT", builder => builder.AddCheck("sample", () => HealthCheckResult.Healthy())); + }); + + private static void UpdateLoggerConfiguration( + HostBuilderContext hostContext, + LoggerConfiguration currentLoggerConfiguration) + { + var instrumentationKey = hostContext.Configuration.GetValue("APPLICATIONINSIGHTS_INSTRUMENTATIONKEY"); + + currentLoggerConfiguration + .MinimumLevel.Debug() + .MinimumLevel.Override("Microsoft", LogEventLevel.Information) + .Enrich.FromLogContext() + .Enrich.WithVersion() + .Enrich.WithComponentName("Service Bus Queue Worker") + .WriteTo.Console() + .WriteTo.AzureApplicationInsights(instrumentationKey); + } + } +} \ No newline at end of file diff --git a/src/Arcus.Messaging.Tests.Integration/Fixture/TestConfig.cs b/src/Arcus.Messaging.Tests.Integration/Fixture/TestConfig.cs index 3f3ab666..129cdceb 100644 --- a/src/Arcus.Messaging.Tests.Integration/Fixture/TestConfig.cs +++ b/src/Arcus.Messaging.Tests.Integration/Fixture/TestConfig.cs @@ -78,6 +78,19 @@ public string GetServiceBusConnectionString(ServiceBusEntity entity) } } + /// + /// Gets the Application Insights configuration from the application configuration. + /// + /// Thrown when one of the Application Insights configuration values is blank. + public ApplicationInsightsConfig GetApplicationInsightsConfig() + { + var instrumentationKey = _config.GetValue("Arcus:ApplicationInsights:InstrumentationKey"); + var applicationId = _config.GetValue("Arcus:ApplicationInsights:ApplicationId"); + var apiKey = _config.GetValue("Arcus:ApplicationInsights:ApiKey"); + + return new ApplicationInsightsConfig(instrumentationKey, applicationId, apiKey); + } + /// /// Gets the project directory where the fixtures are located. /// diff --git a/src/Arcus.Messaging.Tests.Integration/MessagePump/ServiceBusMessagePumpTests.cs b/src/Arcus.Messaging.Tests.Integration/MessagePump/ServiceBusMessagePumpTests.cs index e2c544f0..04fe28e4 100644 --- a/src/Arcus.Messaging.Tests.Integration/MessagePump/ServiceBusMessagePumpTests.cs +++ b/src/Arcus.Messaging.Tests.Integration/MessagePump/ServiceBusMessagePumpTests.cs @@ -1,19 +1,25 @@ using System; using System.Threading.Tasks; +using Arcus.Messaging.Abstractions; using Arcus.Messaging.Pumps.ServiceBus; using Arcus.Messaging.Tests.Core.Generators; using Arcus.Messaging.Tests.Core.Messages.v1; using Arcus.Messaging.Tests.Integration.Fixture; using Arcus.Messaging.Tests.Integration.ServiceBus; using Arcus.Messaging.Tests.Workers.ServiceBus; +using Arcus.Observability.Telemetry.Core; using Arcus.Security.Providers.AzureKeyVault.Authentication; using Arcus.Testing.Logging; +using Microsoft.Azure.ApplicationInsights.Query; +using Microsoft.Azure.ApplicationInsights.Query.Models; using Microsoft.Azure.KeyVault; using Microsoft.Azure.Management.ServiceBus.Models; using Microsoft.Azure.ServiceBus; using Microsoft.Extensions.Logging; +using Polly; using Xunit; using Xunit.Abstractions; +using RetryPolicy = Polly.Retry.RetryPolicy; namespace Arcus.Messaging.Tests.Integration.MessagePump { @@ -185,6 +191,48 @@ public async Task ServiceBusMessagePumpWithServiceBusAbandon_PublishServiceBusMe } } + [Fact] + public async Task ServiceBusMessagePump_FailureDuringMessageHandling_TracksCorrelationInApplicationInsights() + { + // Arrange + string operationId = $"operation-{Guid.NewGuid()}", transactionId = $"transaction-{Guid.NewGuid()}"; + + var config = TestConfig.Create(); + ApplicationInsightsConfig applicationInsightsConfig = config.GetApplicationInsightsConfig(); + string connectionString = config.GetServiceBusConnectionString(ServiceBusEntity.Queue); + var commandArguments = new[] + { + CommandArgument.CreateSecret("APPLICATIONINSIGHTS_INSTRUMENTATIONKEY", applicationInsightsConfig.InstrumentationKey), + CommandArgument.CreateSecret("ARCUS_SERVICEBUS_CONNECTIONSTRING", connectionString) + }; + + using var project = await ServiceBusWorkerProject.StartNewWithAsync(config, _logger, commandArguments); + await using var service = await TestMessagePumpService.StartNewAsync(config, _logger); + Message orderMessage = OrderGenerator.Generate().AsServiceBusMessage(operationId, transactionId); + + // Act + await service.SendMessageToServiceBusAsync(connectionString, orderMessage); + + // Assert + using (ApplicationInsightsDataClient client = CreateApplicationInsightsClient(applicationInsightsConfig.ApiKey)) + { + await RetryAssertUntilTelemetryShouldBeAvailableAsync(async () => + { + const string onlyLastHourFilter = "timestamp gt now() sub duration'PT1H'"; + EventsResults results = + await client.Events.GetExceptionEventsAsync(applicationInsightsConfig.ApplicationId, filter: onlyLastHourFilter); + + Assert.Contains(results.Value, result => + { + result.CustomDimensions.TryGetValue(ContextProperties.Correlation.TransactionId, out string actualTransactionId); + result.CustomDimensions.TryGetValue(ContextProperties.Correlation.OperationId, out string actualOperationId); + + return transactionId == actualTransactionId && operationId == actualOperationId && operationId == result.Operation.Id; + }); + }, timeout: TimeSpan.FromMinutes(5)); + } + } + [Fact] public async Task ServiceBusMessagePump_RotateServiceBusConnectionKeys_MessagePumpRestartsThenMessageSuccessfullyProcessed() { @@ -233,5 +281,28 @@ await keyVaultClient.SetSecretAsync( secretName: keyRotationConfig.KeyVaultSecret.SecretName, value: rotatedConnectionString); } + + private static ApplicationInsightsDataClient CreateApplicationInsightsClient(string instrumentationKey) + { + var clientCredentials = new ApiKeyClientCredentials(instrumentationKey); + var client = new ApplicationInsightsDataClient(clientCredentials); + + return client; + } + + private async Task RetryAssertUntilTelemetryShouldBeAvailableAsync(Func assertion, TimeSpan timeout) + { + RetryPolicy retryPolicy = + Policy.Handle(exception => + { + _logger.LogError(exception, "Failed to contact Azure Application Insights. Reason: {Message}", exception.Message); + return true; + }) + .WaitAndRetryForeverAsync(index => TimeSpan.FromSeconds(1)); + + await Policy.TimeoutAsync(timeout) + .WrapAsync(retryPolicy) + .ExecuteAsync(assertion); + } } } diff --git a/src/Arcus.Messaging.Tests.Integration/appsettings.json b/src/Arcus.Messaging.Tests.Integration/appsettings.json index b09f0f10..99686b9a 100644 --- a/src/Arcus.Messaging.Tests.Integration/appsettings.json +++ b/src/Arcus.Messaging.Tests.Integration/appsettings.json @@ -6,7 +6,6 @@ "Infra": { "ServiceBus": { "TopicName": "#{Arcus.TestInfra.ServiceBus.Topic.Name}#", - "ConnectionString": "#{Arcus.TestInfra.ServiceBus.Topic.ConnectionString}#" }, "EventGrid": { @@ -22,7 +21,12 @@ "SelfContained": { "ConnectionStringWithQueue": "#{Arcus.ServiceBus.ConnectionStringWithQueue}#", "ConnectionStringWithTopic": "#{Arcus.ServiceBus.ConnectionStringWithTopic}#" - } + } + }, + "ApplicationInsights": { + "InstrumentationKey": "#{Arcus.ApplicationInsights.InstrumentationKey}#", + "ApplicationId": "#{Arcus.ApplicationInsights.ApplicationId}#", + "ApiKey": "#{Arcus.ApplicationInsights.ApiKey}#" }, "KeyRotation": { "ServicePrincipal": { diff --git a/src/Arcus.Messaging.Tests.Workers.ServiceBus/Arcus.Messaging.Tests.Workers.ServiceBus.csproj b/src/Arcus.Messaging.Tests.Workers.ServiceBus/Arcus.Messaging.Tests.Workers.ServiceBus.csproj index 98638e42..2a4d700f 100644 --- a/src/Arcus.Messaging.Tests.Workers.ServiceBus/Arcus.Messaging.Tests.Workers.ServiceBus.csproj +++ b/src/Arcus.Messaging.Tests.Workers.ServiceBus/Arcus.Messaging.Tests.Workers.ServiceBus.csproj @@ -14,12 +14,17 @@ + + + + + diff --git a/src/Arcus.Messaging.Tests.Workers/MessageHandlers/OrdersSabotageAzureServiceBusMessageHandler.cs b/src/Arcus.Messaging.Tests.Workers/MessageHandlers/OrdersSabotageAzureServiceBusMessageHandler.cs new file mode 100644 index 00000000..0eb86ed9 --- /dev/null +++ b/src/Arcus.Messaging.Tests.Workers/MessageHandlers/OrdersSabotageAzureServiceBusMessageHandler.cs @@ -0,0 +1,32 @@ +using System; +using System.Threading; +using System.Threading.Tasks; +using Arcus.Messaging.Abstractions; +using Arcus.Messaging.Pumps.ServiceBus; +using Arcus.Messaging.Tests.Core.Messages.v1; + +namespace Arcus.Messaging.Tests.Workers.MessageHandlers +{ + public class OrdersSabotageAzureServiceBusMessageHandler : IAzureServiceBusMessageHandler + { + /// + /// Process a new message that was received + /// + /// Message that was received + /// Context providing more information concerning the processing + /// + /// Information concerning correlation of telemetry and processes by using a variety of unique + /// identifiers + /// + /// Cancellation token + public Task ProcessMessageAsync( + Order message, + AzureServiceBusMessageContext messageContext, + MessageCorrelationInfo correlationInfo, + CancellationToken cancellationToken) + { + throw new InvalidOperationException( + "Sabotage the message processing with an unhandled exception"); + } + } +}