From 7d294284126e67e6a28519fc23dc5fdef97c89b4 Mon Sep 17 00:00:00 2001 From: stijnmoreels <9039753+stijnmoreels@users.noreply.github.com> Date: Thu, 13 Feb 2025 06:46:54 +0100 Subject: [PATCH 1/3] chore(remove): event-hubs pump project --- .../02-message-handling/03-event-hubs.md | 345 ---------- .../02-Features/06-general-messaging.md | 3 +- .../Arcus.Messaging.Pumps.EventHubs.csproj | 41 -- .../AzureEventHubsMessagePump.cs | 231 ------- .../AzureEventHubsMessagePumpConfig.cs | 186 ----- .../AzureEventHubsMessagePumpOptions.cs | 48 -- .../IServiceCollectionExtensions.cs | 355 ---------- .../Arcus.Messaging.Tests.Integration.csproj | 6 +- .../Fixture/EventHubsEntityFixture.cs | 26 + .../EventHubs/TestEventHubsMessageProducer.cs | 47 -- .../EventHubsMessagePump.ConnectivityTests.cs | 98 --- .../EventHubsMessagePump.RouterTests.cs | 128 ---- .../EventHubsMessagePump.TelemetryTests.cs | 181 ----- .../MessagePump/EventHubsMessagePumpTests.cs | 254 ------- .../Fixture/EventDataExtensions.cs | 19 - .../Arcus.Messaging.Tests.Unit.csproj | 1 - .../AzureEventHubsMessagePumpOptionsTests.cs | 24 - .../IServiceCollectionExtensionTests.cs | 638 ------------------ ...saging.Tests.Workers.EventHubs.Core.csproj | 1 - .../OrderEventHubsMessageHandler.cs | 65 -- src/Arcus.Messaging.sln | 7 - 21 files changed, 32 insertions(+), 2672 deletions(-) delete mode 100644 docs/preview/02-Features/02-message-handling/03-event-hubs.md delete mode 100644 src/Arcus.Messaging.Pumps.EventHubs/Arcus.Messaging.Pumps.EventHubs.csproj delete mode 100644 src/Arcus.Messaging.Pumps.EventHubs/AzureEventHubsMessagePump.cs delete mode 100644 src/Arcus.Messaging.Pumps.EventHubs/Configuration/AzureEventHubsMessagePumpConfig.cs delete mode 100644 src/Arcus.Messaging.Pumps.EventHubs/Configuration/AzureEventHubsMessagePumpOptions.cs delete mode 100644 src/Arcus.Messaging.Pumps.EventHubs/Extensions/IServiceCollectionExtensions.cs create mode 100644 src/Arcus.Messaging.Tests.Integration/Fixture/EventHubsEntityFixture.cs delete mode 100644 src/Arcus.Messaging.Tests.Integration/MessagePump/EventHubs/TestEventHubsMessageProducer.cs delete mode 100644 src/Arcus.Messaging.Tests.Integration/MessagePump/EventHubsMessagePump.ConnectivityTests.cs delete mode 100644 src/Arcus.Messaging.Tests.Integration/MessagePump/EventHubsMessagePump.RouterTests.cs delete mode 100644 src/Arcus.Messaging.Tests.Integration/MessagePump/EventHubsMessagePump.TelemetryTests.cs delete mode 100644 src/Arcus.Messaging.Tests.Integration/MessagePump/EventHubsMessagePumpTests.cs delete mode 100644 src/Arcus.Messaging.Tests.Integration/MessagePump/Fixture/EventDataExtensions.cs delete mode 100644 src/Arcus.Messaging.Tests.Unit/EventHubs/AzureEventHubsMessagePumpOptionsTests.cs delete mode 100644 src/Arcus.Messaging.Tests.Unit/EventHubs/IServiceCollectionExtensionTests.cs delete mode 100644 src/Arcus.Messaging.Tests.Workers.EventHubs.Core/MessageHandlers/OrderEventHubsMessageHandler.cs diff --git a/docs/preview/02-Features/02-message-handling/03-event-hubs.md b/docs/preview/02-Features/02-message-handling/03-event-hubs.md deleted file mode 100644 index a5b6cbc2..00000000 --- a/docs/preview/02-Features/02-message-handling/03-event-hubs.md +++ /dev/null @@ -1,345 +0,0 @@ ---- -title: "Azure Event Hubs message handling" -layout: default ---- - -# Azure Event Hubs message handling -The `Arcus.Messaging.Pumps.EventHubs` library provides ways to perform all the plumbing that is required for processing event messages on Azure EventHubs. - -As a user, the only thing you have to do is **focus on processing messages, not how to get them**. Following terms are used: -- **Message handler**: implementation that processes the received message from an Azure EventHubs. Message handlers are created by implementing the `IAzureEventHubsMessageHandler`. This message handler will be called upon when a message is available in the Azure EventHubs. [this section](#message-handler-example) for a message handler example setup -- **Message router**: implementation that delegates the received Azure EventHubs event message to the correct message handler. -- **Message pump**: implementation that interacts and receives the Azure EventHubs event message. The pump can be configured for different scenarios, see [this section](#pump-configuration) for more information. - -![Message handling schema](/media/worker-eventhubs-message-handling.png) - -## Installation -This features requires to install our NuGet package: - -```shell -PM > Install-Package Arcus.Messaging.Pumps.EventHubs -``` - -## Message handler example -Here is an example of a message handler that expects messages of type `SensorReading`: - -> ⚡ You can use the same message handlers in an a .NET Worker message pump and an Azure Functions scenario. - -```csharp -using Arcus.Messaging.Abstractions; -using Arcus.Messaging.Pumps.EventHubs; -using Microsoft.Extensions.Logging; - -public class SensorReadingMessageHandler : IAzureEventHubsMessageHandler -{ - private readonly ILogger _logger; - - public SensorReadingMessageHandler(ILogger logger) - { - _logger = logger; - } - - public async Task ProcessMessageAsync( - SensorReading message, - AzureEventHubsMessageContext messageContext, - MessageCorrelationInfo correlationInfo, - CancellationToken cancellationToken) - { - _logger.LogInformation("Processing sensor reading {SensorId} for ", message.Id); - - // Process the message. - - _logger.LogInformation("Sensor reading {SensorId} processed", message.Id); - } -} -``` - -## Message handler registration -Once the message handler is created, you can very easily register it: - -```csharp -using Microsoft.Extensions.DependencyInjection; - -public class Program -{ - public void ConfigureServices(IServiceCollection services) - { - // Add Azure EventHubs message pump and use OrdersMessageHandler to process the messages. - services.AddEventHubsMessagePumpUsingManagedIdentity("", "", "") - .WithEventHubsMessageHandler(); - - // Note, that only a single call to the `.WithEventHubsMessageHandler` has to be made when the handler should be used across message pumps. - } -} -``` - -The Azure EventHubs uses the `EventProcessorClient` internally. To learn more about this way of consuming messages from Azure EventHubs, see [Microsoft's official documentation](https://docs.microsoft.com/en-us/dotnet/api/overview/azure/messaging.eventhubs.processor-readme). - -In this example, we are using the Azure EventHubs message pump to process event messages and use the connection strings stored in the Arcus secret store: -- Azure EventHubs name: The name of the Event Hub that the processor is connected to, specific to the EventHubs namespace that contains it. -- Azure EventHubs connection string secret name: The name of the secret to retrieve the Azure EventHubs connection string using your registered Arcus secret store implementation. -- Azure EventHubs Blob storage container name: The name of the Azure Blob storage container in the storage account to reference where the event checkpoints will be stored. The events will be streamed to this storage so that the client only has to worry about event processing, not event capturing. -- Azure EventHubs Blob storage account connection string secret name: The name of the secret to retrieve the Azure EventHubs connection string using your registered Arcus secret store implementation. - -> ⚠ The order in which the message handlers are registered matters when a message is processed. If the first one can't handle the message, the second will be checked, and so forth. - -### Filter messages based on message context -When registering a new message handler, one can opt-in to add a filter on the message context which filters out messages that are not needed to be processed. - -This can be useful when you are sending different message types on the same queue. Another use-case is being able to handle different versions of the same message type which have different contracts because you are migrating your application. - -Following example shows how a message handler should only process a certain message when a property's in the context is present. - -We'll use a simple message handler implementation: - -```csharp -using Arcus.Messaging.Abstractions; -using Arcus.Messaging.Pumps.Abstractions.MessagingHandling; - -public class SensorReadingMessageHandler : IAzureEventHubsMessageHandler -{ - public async Task ProcessMessageAsync(SensorReading message, AzureEventHubsMessageContext context, ...) - { - // Do some processing... - } -} -``` - -We would like that this handler only processed the message when the context contains `Location` equals `Room`. - -```csharp -using Microsoft.Extensions.DependencyInjection; - -public class Program -{ - public void ConfigureServices(IServiceCollection services) - { - services.AddEventHubsMessagePumpUsingManagedIdentity(...) - .WithEventHubsMessageHandler(context => context.Properties["Location"].ToString() == "Room"); - } -} -``` - -> Note that the order in which the message handlers are registered is important in the message processing. -> In the example, when a message handler above this one is registered that could also handle the message (same message type) then that handler may be chosen instead of the one with the specific filter. - -### Bring your own deserialization -You can also choose to extend the built-in message deserialization with a custom deserializer to meet your needs. -This allows you to easily deserialize into different message formats or reuse existing (de)serialization capabilities that you already have without altering the message router. - -You start by implementing an `IMessageBodySerializer`. The following example shows how an expected type can be transformed to something else. -The result type (in this case `SensorReadingBatch`) will then be used to check if there is an `IAzureEventHubsMessageHandler` registered for that message type. - -```csharp -using Arcus.Messaging.Pumps.Abstractions.MessageHandling; - -public class SensorReadingBatchMessageBodySerializer : IMessageBodySerializer -{ - public async Task DeserializeMessageAsync(string messageBody) - { - var serializer = new XmlSerializer(typeof(SensorReading[])); - using (var contents = new MemoryStream(Encoding.UTF8.GetBytes(messageBody))) - { - var messages = (SensorReading[]) serializer.Deserialize(contents); - return MessageResult.Success(new SensorReadingBatch(messages)); - } - } -} -``` - -The registration of these message body serializers can be done just as easily as an `IAzureEventHubsMessageHandler`: - -```csharp -using Microsoft.Extensions.DependencyInjection; - -public class Program -{ - public void ConfigureServices(IServiceCollection services) - { - // Register the message body serializer in the dependency container where the dependent services will be injected. - services.AddEventHubsMessagePumpUsingManagedIdentity(...) - .WithEventHubsMessageHandler(..., messageBodySerializer: new OrderBatchMessageBodySerializer()); - - // Register the message body serializer in the dependency container where the dependent services are manually injected. - services.AddEventHubsMessagePumpUsingManagedIdentity(...) - .WithEventHubsMessageHandler(..., messageBodySerializerImplementationFactory: serviceProvider => - { - var logger = serviceProvider.GetService>(); - return new SensorReadingBatchMessageHandler(logger); - }); - } -} -``` - -> Note that the order in which the message handlers are registered is important in the message processing. -> In the example, when a message handler above this one is registered that could also handle the message (same message type) then that handler may be chosen instead of the one with the specific filter. - -### Filter messages based on message body -When registering a new message handler, one can opt-in to add a filter on the incoming message body which filters out messages that are not needed to be processed by this message handler. -This can be useful when you want to route messages based on the message content itself instead of the messaging context. - -Following example shows how a message handler should only process a certain message when the status is 'Active'; meaning only `SensorReading` with active sensors will be processed. - -```csharp -// Message to be sent: -public enum SensorStatus { Active, Idle } - -public class SensorReading -{ - public string SensorId { get; set; } - public SensorStatus Status { get; set; } -} - -using Arcus.Messaging.Abstractions; -using Arcus.Messaging.Pumps.Abstractions.MessageHandling; - -// Message handler -public class SensorReadingMessageHandler : IAzureEventHubsMessageHandler -{ - public async Task ProcessMessageAsync(SensorReading message, AzureEventHubsMessageContext context, ...) - { - // Do some processing... - } -} - -using Microsoft.Extensions.DependencyInjection; - -// Message handler registration -public class Program -{ - public void ConfigureServices(IServiceCollection services) - { - services.AddEventHubsMessagePumpUsingManagedIdentity(...) - .WithEventHubsMessageHandler((SensorReading sensor) => sensor.Status == SensorStatus.Active); - } -} -``` - -### Fallback message handling -When receiving a message on the message pump and none of the registered `IAzureEventHubsMessageHandler`'s can correctly process the message, the message pump normally throws and logs an exception. -It could also happen in a scenario that's to be expected that some received messages will not be processed correctly (or you don't want them to). - -In such a scenario, you can choose to register a `IFallbackMessageHandler` in the dependency container. -This extra message handler will then process the remaining messages that can't be processed by the normal message handlers. - -Following example shows how such a message handler can be implemented: - -```csharp -using Arcus.Messaging.Pumps.EventHubs; -using Microsoft.Extensions.Logging; - -public class WarnsUserFallbackMessageHandler : IFallbackMessageHandler -{ - private readonly ILogger _logger; - - public WarnsUserFallbackMessageHandler(ILogger logger) - { - _logger = logger; - } - - public async Task ProcessMessageAsync(string message, MessageContext context, ...) - { - _logger.LogWarning("These type of messages are expected not to be processed"); - } -} -``` - -And to register such an implementation: - -```csharp -using Microsoft.Extensions.DependencyInjection; - -public class Program -{ - public void ConfigureServices(IServiceCollection services) - { - services.AddEventHubsMessagePump(...) - .WithFallbackMessageHandler(); - } -} -``` - -## Pump Configuration -The Azure EventHubs message pump can be configured further to met your needs. - -```csharp -using Microsoft.Extensions.DependencyInjection; - -public class Program -{ - public void ConfigureServices(IServiceCollection services) - { - // Uses managed identity to authenticate with Azure EventHubs. - // 🎖️ Recommended. - services.AddEventHubsMessagePumpUsingManagedIdentity( - eventHubsName: "", - fullyQualifiedNamespace: "", - blobContainerUri: ""); - - // 🚩 Arcus secret store will be used to lookup the connection strings (non-managed identity variants), - // for more information about the Arcus secret store see: https://security.arcus-azure.net/features/secret-store - services.AddSecretStore(stores => ...); - - // Options available across all variants: - services.AddEventHubsMessagePump(..., options => - { - // The name of the consumer group this processor is associated with. Events are read in the context of this group. - // Default: "$Default" - options.ConsumerGroup = ""; - - // The name of the Azure EventHubs message property that has the transaction ID. - // (default: Transaction-Id). - options.Routing.Correlation.TransactionIdPropertyName = "X-Transaction-ID"; - - // The format of the message correlation used when receiving Azure EventHubs messages. - // (default: W3C). - options.Routing.Correlation.Format = MessageCorrelationFormat.Hierarchical; - - // The name of the operation used when tracking the request telemetry. - // (default: Process) - options.Routing.Correlation.OperationName = "Sensor"; - - // The name of the Azure EventHubs message property that has the upstream service ID. - // ⚠ Only used when the correlation format is configured as Hierarchical. - // (default: Operation-Parent-Id). - options.Routing.Correlation.OperationParentIdPropertyName = "X-Operation-Parent-ID"; - - // The property name to enrich the log event with the correlation information cycle ID. - // ⚠ Only used when the correlation format is configured as Hierarchical. - // (default: CycleId) - options.Routing.CorrelationEnricher.CycleIdPropertyName = "X-CycleId"; - - // Indicate whether or not the default built-in JSON deserialization should ignore additional members - // when deserializing the incoming message (default: AdditionalMemberHandling.Error). - options.Routing.Deserialization.AdditionalMembers = AdditionalMembersHandling.Ignore; - }); - } -} -``` - -## Message Correlation -The message correlation of the received messages is set automatically. All the message handlers will have access to the current `MessageCorrelationInfo` correlation model for the specific currently processed message. - -To retrieve the correlation information in other application code, you can use a dedicated marker interface called `IMessageCorrelationInfoAccessor`. -Note that this is a scoped dependency and so will be the same instance across a scoped operation. - -```csharp -using Arcus.Messaging.Abstractions; - -public class DependencyService -{ - private readonly IMessageCorrelationInfoAccessor _accessor; - - public DependencyService(IMessageCorrelationInfoAccessor accessor) - { - _accessor = accessor; - } - - public void Method() - { - MessageCorrelationInfo correlation = _accessor.GetCorrelationInfo(); - - _accessor.SetCorrelation(correlation); - } -} -``` diff --git a/docs/preview/02-Features/06-general-messaging.md b/docs/preview/02-Features/06-general-messaging.md index 014a3e9b..2e3eac33 100644 --- a/docs/preview/02-Features/06-general-messaging.md +++ b/docs/preview/02-Features/06-general-messaging.md @@ -155,5 +155,4 @@ public class RateLimitService ⚡ Besides the `PauseProcessingMessagesAsync` method, there also exists `Stop...`/`Start...` variants so that you can control the time dynamically when the pump is allowed to run again. For more information on message pumps: -- [Azure Service Bus message pump](./02-message-handling/01-service-bus.md) -- [Azure EventHubs message pump](./02-message-handling/03-event-hubs.md) +- [Azure Service Bus message pump](./02-message-handling/01-service-bus.md) \ No newline at end of file diff --git a/src/Arcus.Messaging.Pumps.EventHubs/Arcus.Messaging.Pumps.EventHubs.csproj b/src/Arcus.Messaging.Pumps.EventHubs/Arcus.Messaging.Pumps.EventHubs.csproj deleted file mode 100644 index 02497717..00000000 --- a/src/Arcus.Messaging.Pumps.EventHubs/Arcus.Messaging.Pumps.EventHubs.csproj +++ /dev/null @@ -1,41 +0,0 @@ - - - - net8.0;net6.0;netstandard2.1 - latest - Arcus - Arcus - Arcus.Messaging - Provides capability to run an Azure Event Hubs message pump - Copyright (c) Arcus - https://messaging.arcus-azure.net/ - https://github.com/arcus-azure/arcus.messaging - LICENSE - icon.png - README.md - Git - Azure;Messaging;EventHubs - true - true - true - - - - - - - - - - - - - - - - - - - - - \ No newline at end of file diff --git a/src/Arcus.Messaging.Pumps.EventHubs/AzureEventHubsMessagePump.cs b/src/Arcus.Messaging.Pumps.EventHubs/AzureEventHubsMessagePump.cs deleted file mode 100644 index 37d7b732..00000000 --- a/src/Arcus.Messaging.Pumps.EventHubs/AzureEventHubsMessagePump.cs +++ /dev/null @@ -1,231 +0,0 @@ -using System; -using System.Threading; -using System.Threading.Tasks; -using Arcus.Messaging.Abstractions; -using Arcus.Messaging.Abstractions.EventHubs; -using Arcus.Messaging.Abstractions.EventHubs.MessageHandling; -using Arcus.Messaging.Abstractions.MessageHandling; -using Arcus.Messaging.Pumps.Abstractions; -using Arcus.Messaging.Pumps.EventHubs.Configuration; -using Azure.Messaging.EventHubs; -using Azure.Messaging.EventHubs.Processor; -using Microsoft.ApplicationInsights; -using Microsoft.Extensions.Configuration; -using Microsoft.Extensions.DependencyInjection; -using Microsoft.Extensions.Logging; - -namespace Arcus.Messaging.Pumps.EventHubs -{ - /// - /// Represents a message pump for processing messages on an Azure EventHubs resource. - /// - /// - public class AzureEventHubsMessagePump : MessagePump, IRestartableMessagePump - { - private readonly AzureEventHubsMessagePumpConfig _eventHubsConfig; - private readonly IAzureEventHubsMessageRouter _messageRouter; - private readonly IDisposable _loggingScope; - - private EventProcessorClient _eventProcessor; - private bool _isHostShuttingDown; - - /// - /// Initializes a new instance of the class. - /// - /// The configuration instance to setup the interaction with Azure EventHubs. - /// The application configuration instance to retrieve additional information for the message pump. - /// The application's service provider to retrieve registered services during the lifetime of the message pump, like registered 's and its dependencies. - /// The registered message router to route incoming Azure EventHubs event messages through user-registered 's. - /// The logger instance to write diagnostic messages during the lifetime of the message pump. - /// - /// Thrown when the , , - /// , , or the is null. - /// - internal AzureEventHubsMessagePump( - AzureEventHubsMessagePumpConfig eventHubsConfiguration, - IConfiguration applicationConfiguration, - IServiceProvider serviceProvider, - IAzureEventHubsMessageRouter messageRouter, - ILogger logger) - : base(applicationConfiguration, serviceProvider, logger) - { - _eventHubsConfig = eventHubsConfiguration ?? throw new ArgumentNullException(nameof(eventHubsConfiguration)); - _messageRouter = messageRouter ?? throw new ArgumentNullException(nameof(messageRouter)); - - JobId = _eventHubsConfig.Options.JobId; - _loggingScope = logger.BeginScope("Job: {JobId}", JobId); - } - - private string EventHubName => _eventProcessor?.EventHubName; - private string ConsumerGroup => _eventProcessor?.ConsumerGroup; - private string Namespace => _eventProcessor?.FullyQualifiedNamespace; - - /// - /// This method is called when the starts. The implementation should return a task that represents - /// the lifetime of the long running operation(s) being performed. - /// - /// Triggered when is called. - /// A that represents the long running operations. - protected override async Task ExecuteAsync(CancellationToken stoppingToken) - { - try - { - await StartProcessingMessagesAsync(stoppingToken); - await UntilCancelledAsync(stoppingToken); - } - catch (Exception exception) when (exception is TaskCanceledException || exception is OperationCanceledException) - { - Logger.LogDebug("Azure EventHubs message pump '{JobId}' '{ConsumerGroup}/{EventHubsName}' in '{Namespace}' is cancelled", JobId, ConsumerGroup, EventHubName, Namespace); - } - catch (Exception exception) - { - Logger.LogCritical(exception, "Unexpected failure occurred during processing of messages in the Azure EventHubs message pump '{JobId}' on '{ConsumerGroup}/{EventHubsName}' in '{Namespace}': {Message}", JobId, ConsumerGroup, EventHubName, Namespace, exception.Message); - } - finally - { - await StopProcessingMessagesAsync(CancellationToken.None); - } - } - - /// - /// Programmatically restart the message pump. - /// - /// The token to cancel the restart process. - public async Task RestartAsync(CancellationToken cancellationToken) - { - Logger.LogTrace("Restarting Azure EventHubs message pump '{JobId}' on '{ConsumerGroup}/{EventHubsName}' in '{Namespace}' ...", JobId, ConsumerGroup, EventHubName, Namespace); - await StopProcessingMessagesAsync(cancellationToken); - await StartProcessingMessagesAsync(cancellationToken); - Logger.LogInformation("Azure EventHubs message pump '{JobId}' on '{ConsumerGroup}/{EventHubsName}' in '{Namespace}' restarted!", JobId, ConsumerGroup, EventHubName, Namespace); - } - - /// - public override async Task StartProcessingMessagesAsync(CancellationToken stoppingToken) - { - if (IsStarted) - { - return; - } - - await base.StartProcessingMessagesAsync(stoppingToken); - - _eventProcessor = await _eventHubsConfig.CreateEventProcessorClientAsync(); - _eventProcessor.ProcessEventAsync += ProcessMessageAsync; - _eventProcessor.ProcessErrorAsync += ProcessErrorAsync; - - Logger.LogTrace("Starting Azure EventHubs message pump '{JobId}' on '{ConsumerGroup}/{EventHubsName}' in '{Namespace}'", JobId, ConsumerGroup, EventHubName, Namespace); - await _eventProcessor.StartProcessingAsync(stoppingToken); - Logger.LogInformation("Azure EventHubs message pump '{JobId}' on '{ConsumerGroup}/{EventHubsName}' in '{Namespace}' started: {Time}", JobId, ConsumerGroup, EventHubName, Namespace, DateTimeOffset.UtcNow); - } - - private async Task ProcessMessageAsync(ProcessEventArgs args) - { - EventData message = args.Data; - if (message is null) - { - Logger.LogWarning("Received message on Azure EventHubs message pump '{JobId}' was null, skipping", JobId); - return; - } - - if (_isHostShuttingDown) - { - Logger.LogWarning("Abandoning message with ID '{MessageId}' as the Azure EventHubs message pump is shutting down", args.Data.MessageId); - return; - } - - if (string.IsNullOrEmpty(message.CorrelationId)) - { - Logger.LogTrace("No operation ID was found on the message '{MessageId}' during processing in the Azure EventHubs message pump '{JobId}'", message.MessageId, JobId); - } - - AzureEventHubsMessageContext context = args.Data.GetMessageContext(_eventProcessor, JobId); - using (MessageCorrelationResult result = DetermineMessageCorrelation(args.Data)) - { - await _messageRouter.RouteMessageAsync(args.Data, context, result.CorrelationInfo, args.CancellationToken); - await args.UpdateCheckpointAsync(args.CancellationToken); - } - } - - private MessageCorrelationResult DetermineMessageCorrelation(EventData message) - { - if (_eventHubsConfig.Options.Routing.Correlation.Format is MessageCorrelationFormat.W3C) - { - (string transactionId, string operationParentId) = message.Properties.GetTraceParent(); - var client = ServiceProvider.GetRequiredService(); - return MessageCorrelationResult.Create(client, transactionId, operationParentId); - } - - MessageCorrelationInfo correlation = message.GetCorrelationInfo( - transactionIdPropertyName: _eventHubsConfig.Options.Routing.Correlation.TransactionIdPropertyName, - operationParentIdPropertyName: _eventHubsConfig.Options.Routing.Correlation.OperationParentIdPropertyName); - - return MessageCorrelationResult.Create(correlation); - } - - private Task ProcessErrorAsync(ProcessErrorEventArgs args) - { - if (args.Exception is null) - { - Logger.LogWarning("Thrown exception on Azure EventHubs message pump '{JobId}' was null, skipping", JobId); - } - else if (args.Exception is TaskCanceledException) - { - Logger.LogDebug("Azure EventHubs message pump '{JobId}' is cancelled", JobId); - } - else - { - Logger.LogCritical(args.Exception, "Unable to process message in Azure EventHubs message pump '{JobId}' from {ConsumerGroup}/{EventHubName} with client {ClientId}", JobId, ConsumerGroup, EventHubName, _eventProcessor.Identifier); - } - - return Task.CompletedTask; - } - - /// - public override async Task StopProcessingMessagesAsync(CancellationToken cancellationToken) - { - if (!IsStarted) - { - return; - } - - await base.StopProcessingMessagesAsync(cancellationToken); - - try - { - Logger.LogTrace("Stopping Azure EventHubs message pump '{JobId}' on '{ConsumerGroup}/{EventHubsName}' in '{Namespace}'", JobId, ConsumerGroup, EventHubName, Namespace); - - if (_eventProcessor != null) - { - await _eventProcessor.StopProcessingAsync(cancellationToken); - _eventProcessor.ProcessEventAsync -= ProcessMessageAsync; - _eventProcessor.ProcessErrorAsync -= ProcessErrorAsync; - } - - Logger.LogInformation("Azure EventHubs message pump '{JobId}' on '{ConsumerGroup}/{EventHubsName}' in '{Namespace}' stopped: {Time}", JobId, ConsumerGroup, EventHubName, Namespace, DateTimeOffset.UtcNow); - } - catch (Exception exception) - { - Logger.LogWarning(exception, "Cannot correctly close the azure EventHubs message pump '{JobId}' on '{ConsumerGroup}/{EventHubsName}' in '{Namespace}': {Message}", JobId, ConsumerGroup, EventHubName, Namespace, exception.Message); - } - } - - private static async Task UntilCancelledAsync(CancellationToken cancellationToken) - { - if (cancellationToken.IsCancellationRequested == false) - { - await Task.Delay(Timeout.Infinite, cancellationToken); - } - } - - /// - /// Triggered when the message pump is performing a graceful shutdown. - /// - /// Indicates that the shutdown process should no longer be graceful. - public override async Task StopAsync(CancellationToken cancellationToken) - { - await base.StopAsync(cancellationToken); - _loggingScope?.Dispose(); - _isHostShuttingDown = true; - } - } -} diff --git a/src/Arcus.Messaging.Pumps.EventHubs/Configuration/AzureEventHubsMessagePumpConfig.cs b/src/Arcus.Messaging.Pumps.EventHubs/Configuration/AzureEventHubsMessagePumpConfig.cs deleted file mode 100644 index 6a40e12a..00000000 --- a/src/Arcus.Messaging.Pumps.EventHubs/Configuration/AzureEventHubsMessagePumpConfig.cs +++ /dev/null @@ -1,186 +0,0 @@ -using System; -using System.Threading.Tasks; -using Arcus.Security.Core; -using Azure.Core; -using Azure.Messaging.EventHubs; -using Azure.Storage.Blobs; - -namespace Arcus.Messaging.Pumps.EventHubs.Configuration -{ - /// - /// Represents the Azure EventHubs configuration that is used to setup the interaction with Azure EventHubs for the . - /// - internal class AzureEventHubsMessagePumpConfig - { - private readonly Func> _createClient; - - private AzureEventHubsMessagePumpConfig( - Func> createClient, - AzureEventHubsMessagePumpOptions options) - { - _createClient = createClient; - - Options = options; - } - - /// - /// Gets the additional options to influence the behavior of the message pump. - /// - public AzureEventHubsMessagePumpOptions Options { get; } - - /// - /// Creates an instance by using a connection string to interact with Azure EventHubs. - /// - /// The name of the Event Hub that the processor is connected to, specific to the EventHubs namespace that contains it. - /// - /// The name of the secret to retrieve the Azure EventHubs connection string using your registered Arcus secret store () implementation. - /// - /// - /// The name of the Azure Blob storage container in the storage account to reference where the event checkpoints will be stored and the load balanced. - /// - /// - /// The name of the secret to retrieve the Azure EventHubs connection string using your registered Arcus secret store () implementation. - /// - /// - /// The application's secret provider provider to retrieve the connection strings from both and . - /// - /// The additional options to influence the behavior of the message pump. - /// - /// Thrown when the , the , the , - /// or the is blank. - /// - /// Thrown when the or the is null. - internal static AzureEventHubsMessagePumpConfig CreateByConnectionString( - string eventHubsName, - string eventHubsConnectionStringSecretName, - string blobContainerName, - string storageAccountConnectionStringSecretName, - ISecretProvider secretProvider, - AzureEventHubsMessagePumpOptions options) - { - if (string.IsNullOrWhiteSpace(eventHubsName)) - { - throw new ArgumentException("Requires a non-blank Azure Event hubs name to add a message pump", nameof(eventHubsName)); - } - - if (string.IsNullOrWhiteSpace(eventHubsConnectionStringSecretName)) - { - throw new ArgumentException("Requires a non-blank secret name that points to an Azure Event Hubs connection string", nameof(eventHubsConnectionStringSecretName)); - } - - if (string.IsNullOrWhiteSpace(blobContainerName)) - { - throw new ArgumentException("Requires a non-blank name for the Azure Blob container name, linked to the Azure Event Hubs", nameof(blobContainerName)); - } - - if (string.IsNullOrWhiteSpace(storageAccountConnectionStringSecretName)) - { - throw new ArgumentException("Requires a non-blank secret name that points to an Azure Blob storage connection string", nameof(storageAccountConnectionStringSecretName)); - } - - if (secretProvider is null) - { - throw new ArgumentNullException(nameof(secretProvider)); - } - - if (options is null) - { - throw new ArgumentNullException(nameof(options)); - } - - return new AzureEventHubsMessagePumpConfig(async () => - { - string storageAccountConnectionString = await GetConnectionStringFromSecretAsync(secretProvider, storageAccountConnectionStringSecretName, "Azure Blob storage account"); - var storageClient = new BlobContainerClient(storageAccountConnectionString, blobContainerName); - - string eventHubsConnectionString = await GetConnectionStringFromSecretAsync(secretProvider, eventHubsConnectionStringSecretName, "Azure EventHubs"); - var eventProcessor = new EventProcessorClient(storageClient, options.ConsumerGroup, eventHubsConnectionString, eventHubsName); - - return eventProcessor; - }, options); - } - - private static async Task GetConnectionStringFromSecretAsync( - ISecretProvider secretProvider, - string connectionStringSecretName, - string connectionStringType) - { - Task getConnectionStringTask = secretProvider.GetRawSecretAsync(connectionStringSecretName); - if (getConnectionStringTask is null) - { - throw new InvalidOperationException( - $"Cannot retrieve {connectionStringType} connection string via calling the '{secretProvider.GetType().Name}' because the operation resulted in 'null'"); - } - - return await getConnectionStringTask; - } - - /// - /// Creates a instance by using a token credential to interact with Azure EventHubs. - /// - /// The name of the Event Hub that the processor is connected to, specific to the EventHubs namespace that contains it. - /// The fully qualified Event Hubs namespace to connect to. This is likely to be similar to {yournamespace}.servicebus.windows.net. - /// - /// The referencing the blob container that includes the - /// name of the account and the name of the container. - /// This is likely to be similar to "https://{account_name}.blob.core.windows.net/{container_name}". - /// - /// The Azure identity credential to sign requests. - /// The additional options to influence the behavior of the message pump. - /// Thrown when the or the is blank. - /// Thrown when the or the is null. - /// Thrown when the is not an absolute URI. - internal static AzureEventHubsMessagePumpConfig CreateByTokenCredential( - string eventHubsName, - string fullyQualifiedNamespace, - Uri blobContainerUri, - TokenCredential credential, - AzureEventHubsMessagePumpOptions options) - { - - if (string.IsNullOrWhiteSpace(eventHubsName)) - { - throw new ArgumentException("Requires a non-blank Azure Event hubs name to add a message pump config", nameof(eventHubsName)); - } - - if (string.IsNullOrWhiteSpace(fullyQualifiedNamespace)) - { - throw new ArgumentException("Requires a non-blank Azure Event hubs fully-qualified namespace to add a message pump config", nameof(eventHubsName)); - } - - if (!blobContainerUri.IsAbsoluteUri) - { - throw new UriFormatException("Requires a valid absolute URI endpoint for the Azure Blob container to store event checkpoints and load balance the consumed event messages send to the message pump"); - } - - if (credential is null) - { - throw new ArgumentNullException(nameof(credential)); - } - - if (options is null) - { - throw new ArgumentNullException(nameof(options)); - } - - return new AzureEventHubsMessagePumpConfig(() => - { - var storageClient = new BlobContainerClient(blobContainerUri, credential); - var eventProcessor = new EventProcessorClient(storageClient, options.ConsumerGroup, fullyQualifiedNamespace, eventHubsName, credential); - - return Task.FromResult(eventProcessor); - }, options); - } - - /// - /// Creates an based on the provided information in this configuration instance. - /// - /// - /// Thrown when no Arcus secret store is configured in the application or when the secret store was not configured correctly. - /// - internal async Task CreateEventProcessorClientAsync() - { - return await _createClient(); - } - } -} diff --git a/src/Arcus.Messaging.Pumps.EventHubs/Configuration/AzureEventHubsMessagePumpOptions.cs b/src/Arcus.Messaging.Pumps.EventHubs/Configuration/AzureEventHubsMessagePumpOptions.cs deleted file mode 100644 index 003c90e3..00000000 --- a/src/Arcus.Messaging.Pumps.EventHubs/Configuration/AzureEventHubsMessagePumpOptions.cs +++ /dev/null @@ -1,48 +0,0 @@ -using System; -using Arcus.Messaging.Abstractions.EventHubs.MessageHandling; -using GuardNet; - -namespace Arcus.Messaging.Pumps.EventHubs.Configuration -{ - /// - /// Represents the additional set of options to configure the behavior of the . - /// - public class AzureEventHubsMessagePumpOptions - { - private string _consumerGroup = "$Default", - _jobId = Guid.NewGuid().ToString(); - - /// - /// Gets or sets the name of the consumer group this processor is associated with. Events are read in the context of this group. (Default: "$Default"). - /// - /// Thrown when the is blank. - public string ConsumerGroup - { - get => _consumerGroup; - set - { - Guard.NotNullOrWhitespace(value, nameof(value), "Requires a non-blank Azure EventHubs consumer group to consume event messages from"); - _consumerGroup = value; - } - } - - /// - /// Gets or sets the unique identifier for this background job to distinguish this job instance in a multi-instance deployment. - /// - /// Thrown when the is blank. - public string JobId - { - get => _jobId; - set - { - Guard.NotNullOrWhitespace(value, nameof(value), "Requires a non-blank job identifier for the Azure EventHubs message pump"); - _jobId = value; - } - } - - /// - /// Gets the consumer-configurable options to change the behavior of the message router. - /// - public AzureEventHubsMessageRouterOptions Routing { get; } = new AzureEventHubsMessageRouterOptions(); - } -} diff --git a/src/Arcus.Messaging.Pumps.EventHubs/Extensions/IServiceCollectionExtensions.cs b/src/Arcus.Messaging.Pumps.EventHubs/Extensions/IServiceCollectionExtensions.cs deleted file mode 100644 index 04c54fab..00000000 --- a/src/Arcus.Messaging.Pumps.EventHubs/Extensions/IServiceCollectionExtensions.cs +++ /dev/null @@ -1,355 +0,0 @@ -using System; -using Arcus.Messaging.Abstractions.EventHubs.MessageHandling; -using Arcus.Messaging.Pumps.EventHubs; -using Arcus.Messaging.Pumps.EventHubs.Configuration; -using Arcus.Security.Core; -using Arcus.Security.Core.Caching; -using Azure.Identity; -using Microsoft.Extensions.Configuration; -using Microsoft.Extensions.Hosting; -using Microsoft.Extensions.Logging; - -// ReSharper disable once CheckNamespace -namespace Microsoft.Extensions.DependencyInjection -{ - /// - /// Extensions on the to add a - /// and its 's implementations. - /// - // ReSharper disable once InconsistentNaming - public static class IServiceCollectionExtensions - { - /// - /// Adds a message pump to consume events from Azure EventHubs. - /// - /// - /// Make sure that the application has the Arcus secret store configured correctly. - /// For more on the Arcus secret store: . - /// - /// The collection of services to add the message pump to. - /// The name of the Event Hub that the processor is connected to, specific to the EventHubs namespace that contains it. - /// - /// The name of the secret to retrieve the Azure EventHubs connection string using your registered Arcus secret store () implementation. - /// - /// - /// The name of the Azure Blob storage container in the storage account to reference where the event checkpoints will be stored and the load balanced. - /// - /// - /// The name of the secret to retrieve the Azure EventHubs connection string using your registered Arcus secret store () implementation. - /// - /// A collection where the s can be configured. - /// Thrown when the is null. - /// - /// Thrown when the , the , the , - /// or the is blank. - /// - public static EventHubsMessageHandlerCollection AddEventHubsMessagePump( - this IServiceCollection services, - string eventHubsName, - string eventHubsConnectionStringSecretName, - string blobContainerName, - string storageAccountConnectionStringSecretName) - { - return AddEventHubsMessagePump( - services, - eventHubsName: eventHubsName, - eventHubsConnectionStringSecretName: eventHubsConnectionStringSecretName, - blobContainerName: blobContainerName, - storageAccountConnectionStringSecretName: storageAccountConnectionStringSecretName, - configureOptions: null); - } - - /// - /// Adds a message pump to consume events from Azure EventHubs. - /// - /// - /// Make sure that the application has the Arcus secret store configured correctly. - /// For more on the Arcus secret store: . - /// - /// The collection of services to add the message pump to. - /// The name of the Event Hub that the processor is connected to, specific to the EventHubs namespace that contains it. - /// - /// The name of the secret to retrieve the Azure EventHubs connection string using your registered Arcus secret store () implementation. - /// - /// - /// The name of the Azure Blob storage container in the storage account to reference where the event checkpoints will be stored and the load balanced. - /// - /// - /// The name of the secret to retrieve the Azure EventHubs connection string using your registered Arcus secret store () implementation. - /// - /// The function to configure additional options to influence the behavior of the message pump. - /// A collection where the s can be configured. - /// Thrown when the is null. - /// - /// Thrown when the , the , the , - /// or the is blank. - /// - public static EventHubsMessageHandlerCollection AddEventHubsMessagePump( - this IServiceCollection services, - string eventHubsName, - string eventHubsConnectionStringSecretName, - string blobContainerName, - string storageAccountConnectionStringSecretName, - Action configureOptions) - { - if (string.IsNullOrWhiteSpace(eventHubsName)) - { - throw new ArgumentException("Requires a non-blank Azure Event hubs name to add a message pump", nameof(eventHubsName)); - } - - if (string.IsNullOrWhiteSpace(eventHubsConnectionStringSecretName)) - { - throw new ArgumentException("Requires a non-blank secret name that points to an Azure Event Hubs connection string", nameof(eventHubsConnectionStringSecretName)); - } - - if (string.IsNullOrWhiteSpace(blobContainerName)) - { - throw new ArgumentException("Requires a non-blank name for the Azure Blob container name, linked to the Azure Event Hubs", nameof(blobContainerName)); - } - - if (string.IsNullOrWhiteSpace(storageAccountConnectionStringSecretName)) - { - throw new ArgumentException("Requires a non-blank secret name that points to an Azure Blob storage connection string", nameof(storageAccountConnectionStringSecretName)); - } - - return AddMessagePump(services, (serviceProvider, options) => - { - ISecretProvider secretProvider = DetermineSecretProvider(serviceProvider); - - return AzureEventHubsMessagePumpConfig.CreateByConnectionString( - eventHubsName, eventHubsConnectionStringSecretName, - blobContainerName, storageAccountConnectionStringSecretName, - secretProvider, options); - - }, configureOptions); - } - - /// - /// Adds a message pump to consume events from Azure EventHubs. - /// - /// The collection of services to add the message pump to. - /// The name of the Event Hub that the processor is connected to, specific to the EventHubs namespace that contains it. - /// - /// The fully qualified Event Hubs namespace to connect to. This is likely to be similar to {yournamespace}.servicebus.windows.net. - /// - /// - /// The referencing the blob container that includes the - /// name of the account and the name of the container. - /// This is likely to be similar to "https://{account_name}.blob.core.windows.net/{container_name}". - /// - /// A collection where the s can be configured. - /// Thrown when the or the is blank. - /// Thrown when the is null. - /// Thrown when the is not an absolute URI. - public static EventHubsMessageHandlerCollection AddEventHubsMessagePumpUsingManagedIdentity( - this IServiceCollection services, - string eventHubsName, - string fullyQualifiedNamespace, - string blobContainerUri) - { - return AddEventHubsMessagePumpUsingManagedIdentity( - services, - eventHubsName: eventHubsName, - fullyQualifiedNamespace: fullyQualifiedNamespace, - blobContainerUri: blobContainerUri, - configureOptions: null); - } - - /// - /// Adds a message pump to consume events from Azure EventHubs. - /// - /// The collection of services to add the message pump to. - /// The name of the Event Hub that the processor is connected to, specific to the EventHubs namespace that contains it. - /// - /// The fully qualified Event Hubs namespace to connect to. This is likely to be similar to {yournamespace}.servicebus.windows.net. - /// - /// - /// The referencing the blob container that includes the - /// name of the account and the name of the container. - /// This is likely to be similar to "https://{account_name}.blob.core.windows.net/{container_name}". - /// - /// The function to configure additional options to influence the behavior of the message pump. - /// A collection where the s can be configured. - /// Thrown when the or the is blank. - /// Thrown when the is null. - /// Thrown when the is not an absolute URI. - public static EventHubsMessageHandlerCollection AddEventHubsMessagePumpUsingManagedIdentity( - this IServiceCollection services, - string eventHubsName, - string fullyQualifiedNamespace, - string blobContainerUri, - Action configureOptions) - { - return AddEventHubsMessagePumpUsingManagedIdentity( - services, - eventHubsName: eventHubsName, - fullyQualifiedNamespace: fullyQualifiedNamespace, - blobContainerUri: blobContainerUri, - clientId: null, - configureOptions: configureOptions); - } - - /// - /// Adds a message pump to consume events from Azure EventHubs. - /// - /// The collection of services to add the message pump to. - /// The name of the Event Hub that the processor is connected to, specific to the EventHubs namespace that contains it. - /// - /// The fully qualified Event Hubs namespace to connect to. This is likely to be similar to {yournamespace}.servicebus.windows.net. - /// - /// - /// The referencing the blob container that includes the - /// name of the account and the name of the container. - /// This is likely to be similar to "https://{account_name}.blob.core.windows.net/{container_name}". - /// - /// - /// The client ID to authenticate for a user assigned managed identity. More information on user assigned managed identities cam be found here: - /// . - /// - /// A collection where the s can be configured. - /// Thrown when the or the is blank. - /// Thrown when the is null. - /// Thrown when the is not an absolute URI. - public static EventHubsMessageHandlerCollection AddEventHubsMessagePumpUsingManagedIdentity( - this IServiceCollection services, - string eventHubsName, - string fullyQualifiedNamespace, - string blobContainerUri, - string clientId) - { - return AddEventHubsMessagePumpUsingManagedIdentity( - services, - eventHubsName: eventHubsName, - fullyQualifiedNamespace: fullyQualifiedNamespace, - blobContainerUri: blobContainerUri, - clientId: clientId, - configureOptions: null); - } - - /// - /// Adds a message pump to consume events from Azure EventHubs. - /// - /// The collection of services to add the message pump to. - /// The name of the Event Hub that the processor is connected to, specific to the EventHubs namespace that contains it. - /// - /// The fully qualified Event Hubs namespace to connect to. This is likely to be similar to {yournamespace}.servicebus.windows.net. - /// - /// - /// The referencing the blob container that includes the - /// name of the account and the name of the container. - /// This is likely to be similar to "https://{account_name}.blob.core.windows.net/{container_name}". - /// - /// - /// The client ID to authenticate for a user assigned managed identity. More information on user assigned managed identities cam be found here: - /// . - /// - /// The function to configure additional options to influence the behavior of the message pump. - /// A collection where the s can be configured. - /// Thrown when the or the is blank. - /// Thrown when the is null. - /// Thrown when the is not an absolute URI. - public static EventHubsMessageHandlerCollection AddEventHubsMessagePumpUsingManagedIdentity( - this IServiceCollection services, - string eventHubsName, - string fullyQualifiedNamespace, - string blobContainerUri, - string clientId, - Action configureOptions) - { - if (string.IsNullOrWhiteSpace(eventHubsName)) - { - throw new ArgumentException("Requires a non-blank Azure Event hubs name to add a message pump", nameof(eventHubsName)); - } - - if (string.IsNullOrWhiteSpace(fullyQualifiedNamespace)) - { - throw new ArgumentException("Requires a non-blank Azure Event hubs fully-qualified namespace to add a message pump", nameof(eventHubsName)); - } - - if (string.IsNullOrWhiteSpace(blobContainerUri)) - { - throw new ArgumentException("Requires a non-blank Azure Blob storage container endpoint to store event checkpoints and load balance the consumed event messages send to the message pump", nameof(blobContainerUri)); - } - - if (!Uri.IsWellFormedUriString(blobContainerUri, UriKind.Absolute)) - { - throw new UriFormatException("Requires a valid absolute URI endpoint for the Azure Blob container to store event checkpoints and load balance the consumed event messages send to the message pump"); - } - - return AddMessagePump(services, (_, options) => - { - return AzureEventHubsMessagePumpConfig.CreateByTokenCredential( - eventHubsName, - fullyQualifiedNamespace, - new Uri(blobContainerUri), - new DefaultAzureCredential(new DefaultAzureCredentialOptions { ManagedIdentityClientId = clientId }), - options); - - }, configureOptions); - } - - private static EventHubsMessageHandlerCollection AddMessagePump( - this IServiceCollection services, - Func createConfig, - Action configureOptions) - { - AzureEventHubsMessagePumpOptions options = CreateOptions(configureOptions); - EventHubsMessageHandlerCollection collection = services.AddMessageRouter(options); - - services.AddMessagePump(serviceProvider => - { - AzureEventHubsMessagePumpConfig config = createConfig(serviceProvider, options); - return CreateMessagePump(serviceProvider, config); - }); - - return collection; - } - - private static AzureEventHubsMessagePumpOptions CreateOptions(Action configureOptions) - { - var options = new AzureEventHubsMessagePumpOptions(); - configureOptions?.Invoke(options); - - return options; - } - - private static AzureEventHubsMessagePump CreateMessagePump(IServiceProvider serviceProvider, AzureEventHubsMessagePumpConfig eventHubsConfig) - { - var appConfiguration = serviceProvider.GetRequiredService(); - var router = serviceProvider.GetService(); - var logger = serviceProvider.GetRequiredService>(); - - return new AzureEventHubsMessagePump(eventHubsConfig, appConfiguration, serviceProvider, router, logger); - } - - private static EventHubsMessageHandlerCollection AddMessageRouter(this IServiceCollection services, AzureEventHubsMessagePumpOptions options) - { - EventHubsMessageHandlerCollection collection = services.AddEventHubsMessageRouting(provider => - { - var logger = provider.GetService>(); - return new AzureEventHubsMessageRouter(provider, options.Routing, logger); - }); - collection.JobId = options.JobId; - - return collection; - } - - private static ISecretProvider DetermineSecretProvider(IServiceProvider serviceProvider) - { - var secretProvider = - serviceProvider.GetService() - ?? serviceProvider.GetService(); - - if (secretProvider is null) - { - throw new InvalidOperationException( - "Could not retrieve the Azure EventHubs or Azure storage account connection string from the Arcus secret store because no secret store was configured in the application," - + $"please configure the Arcus secret store with '{nameof(IHostBuilderExtensions.ConfigureSecretStore)}' on the application '{nameof(IHost)}' " - + $"or during the service collection registration 'AddSecretStore' on the application '{nameof(IServiceCollection)}'." - + "For more information on the Arcus secret store, see: https://security.arcus-azure.net/features/secret-store"); - } - - return secretProvider; - } - } -} diff --git a/src/Arcus.Messaging.Tests.Integration/Arcus.Messaging.Tests.Integration.csproj b/src/Arcus.Messaging.Tests.Integration/Arcus.Messaging.Tests.Integration.csproj index c3588479..eab1d0f1 100644 --- a/src/Arcus.Messaging.Tests.Integration/Arcus.Messaging.Tests.Integration.csproj +++ b/src/Arcus.Messaging.Tests.Integration/Arcus.Messaging.Tests.Integration.csproj @@ -32,7 +32,7 @@ - + @@ -49,4 +49,8 @@ + + + + diff --git a/src/Arcus.Messaging.Tests.Integration/Fixture/EventHubsEntityFixture.cs b/src/Arcus.Messaging.Tests.Integration/Fixture/EventHubsEntityFixture.cs new file mode 100644 index 00000000..f5faa84d --- /dev/null +++ b/src/Arcus.Messaging.Tests.Integration/Fixture/EventHubsEntityFixture.cs @@ -0,0 +1,26 @@ +using System; +using System.Threading.Tasks; +using Arcus.Testing; +using Microsoft.Extensions.Logging.Abstractions; +using Xunit; + +namespace Arcus.Messaging.Tests.Integration.Fixture +{ + public class EventHubsEntityFixture : IAsyncLifetime + { + private TemporaryEventHubEntity _hub; + + public string HubName { get; } = $"hub-{Guid.NewGuid()}"; + + public async Task InitializeAsync() + { + var config = TestConfig.Create(); + _hub = await TemporaryEventHubEntity.CreateAsync(HubName, config.GetEventHubs(), NullLogger.Instance); + } + + public async Task DisposeAsync() + { + await _hub.DisposeAsync(); + } + } +} diff --git a/src/Arcus.Messaging.Tests.Integration/MessagePump/EventHubs/TestEventHubsMessageProducer.cs b/src/Arcus.Messaging.Tests.Integration/MessagePump/EventHubs/TestEventHubsMessageProducer.cs deleted file mode 100644 index 6074f73a..00000000 --- a/src/Arcus.Messaging.Tests.Integration/MessagePump/EventHubs/TestEventHubsMessageProducer.cs +++ /dev/null @@ -1,47 +0,0 @@ -using System; -using System.Runtime.Serialization; -using System.Threading.Tasks; -using Arcus.Messaging.Tests.Integration.Fixture; -using Azure.Messaging.EventHubs; -using Azure.Messaging.EventHubs.Producer; -using GuardNet; - -namespace Arcus.Messaging.Tests.Integration.MessagePump.EventHubs -{ - /// - /// Represents an Azure EventHubs message producer that places event messages on a configured Azure EventHubs. - /// - public class TestEventHubsMessageProducer - { - private readonly string _name; - private readonly EventHubsConfig _config; - - /// - /// Initializes a new instance of the class. - /// - public TestEventHubsMessageProducer(string name, EventHubsConfig config) - { - _name = name; - _config = config; - } - - /// - /// Places an event message on the configured Azure EventHubs. - /// - /// The event message to place on the Azure EventHubs. - /// Thrown when the is null. - /// - /// Thrown when the has a member in its collection that is an - /// unsupported type for serialization. See the remarks for details. - /// - public async Task ProduceAsync(EventData eventData) - { - Guard.NotNull(eventData, nameof(eventData), "Requires an event data instance to place on the configured Azure EventHubs"); - - await using (EventHubProducerClient client = _config.GetProducerClient(_name)) - { - await client.SendAsync(new[] { eventData }); - } - } - } -} diff --git a/src/Arcus.Messaging.Tests.Integration/MessagePump/EventHubsMessagePump.ConnectivityTests.cs b/src/Arcus.Messaging.Tests.Integration/MessagePump/EventHubsMessagePump.ConnectivityTests.cs deleted file mode 100644 index 43ff7475..00000000 --- a/src/Arcus.Messaging.Tests.Integration/MessagePump/EventHubsMessagePump.ConnectivityTests.cs +++ /dev/null @@ -1,98 +0,0 @@ -using System; -using System.Collections.Generic; -using System.Linq; -using System.Threading; -using System.Threading.Tasks; -using Arcus.Messaging.Pumps.Abstractions; -using Arcus.Messaging.Pumps.EventHubs; -using Arcus.Messaging.Tests.Core.Correlation; -using Arcus.Messaging.Tests.Core.Events.v1; -using Arcus.Messaging.Tests.Core.Messages.v1; -using Arcus.Messaging.Tests.Integration.Fixture; -using Arcus.Messaging.Tests.Integration.MessagePump.EventHubs; -using Arcus.Messaging.Tests.Integration.MessagePump.ServiceBus; -using Arcus.Messaging.Tests.Workers.EventHubs.Core.MessageHandlers; -using Azure.Messaging.EventHubs; -using Microsoft.Extensions.DependencyInjection; -using Microsoft.Extensions.Hosting; -using Xunit; - -namespace Arcus.Messaging.Tests.Integration.MessagePump -{ - public partial class EventHubsMessagePumpTests - { - [Fact] - public async Task EventHubsMessagePumpUsingSecrets_PublishesMessage_MessageSuccessfullyProcessed() - { - string eventHubsConnectionStringSecretName = "Arcus_EventHubs_ConnectionString", - storageAccountConnectionStringSecretName = "Arcus_StorageAccount_ConnectionString"; - - await TestEventHubsMessageHandlingAsync(options => - { - options.AddSecretStore(stores => stores.AddInMemory(new Dictionary - { - [eventHubsConnectionStringSecretName] = _eventHubsConfig.EventHubsConnectionString, - [storageAccountConnectionStringSecretName] = _eventHubsConfig.Storage.ConnectionString - })); - - options.AddEventHubsMessagePump(EventHubsName, eventHubsConnectionStringSecretName, ContainerName, storageAccountConnectionStringSecretName) - .WithEventHubsMessageHandler(); - }); - } - - [Fact] - public async Task RestartedEventHubsMessagePump_PublishMessage_MessageSuccessfullyProcessed() - { - // Arrange - var options = new WorkerOptions(); - AddEventHubsMessagePump(options) - .WithEventHubsMessageHandler(); - - EventData expected = CreateSensorEventDataForW3C(traceParent: TraceParent.Generate()); - TestEventHubsMessageProducer producer = CreateEventHubsMessageProducer(); - - await using var worker = await Worker.StartNewAsync(options); - - IEnumerable messagePumps = - worker.Services.GetServices() - .OfType(); - - AzureEventHubsMessagePump messagePump = Assert.Single(messagePumps); - Assert.NotNull(messagePump); - - await messagePump.RestartAsync(CancellationToken.None); - - // Act - await producer.ProduceAsync(expected); - - // Assert - SensorReadEventData actual = await DiskMessageEventConsumer.ConsumeSensorReadAsync(expected.MessageId); - AssertReceivedSensorEventDataForW3C(expected, actual); - } - - [Fact] - public async Task EventHubsMessagePump_PausesViaLifetime_RestartsAgain() - { - // Arrange - string jobId = Guid.NewGuid().ToString(); - var options = new WorkerOptions(); - AddEventHubsMessagePump(options, opt => opt.JobId = jobId) - .WithEventHubsMessageHandler(); - - EventData expected = CreateSensorEventDataForW3C(traceParent: TraceParent.Generate()); - TestEventHubsMessageProducer producer = CreateEventHubsMessageProducer(); - - await using var worker = await Worker.StartNewAsync(options); - - var lifetime = worker.Services.GetRequiredService(); - await lifetime.PauseProcessingMessagesAsync(jobId, TimeSpan.FromSeconds(5), CancellationToken.None); - - // Act - await producer.ProduceAsync(expected); - - // Assert - SensorReadEventData actual = await DiskMessageEventConsumer.ConsumeSensorReadAsync(expected.MessageId); - AssertReceivedSensorEventDataForW3C(expected, actual); - } - } -} diff --git a/src/Arcus.Messaging.Tests.Integration/MessagePump/EventHubsMessagePump.RouterTests.cs b/src/Arcus.Messaging.Tests.Integration/MessagePump/EventHubsMessagePump.RouterTests.cs deleted file mode 100644 index 7833e72b..00000000 --- a/src/Arcus.Messaging.Tests.Integration/MessagePump/EventHubsMessagePump.RouterTests.cs +++ /dev/null @@ -1,128 +0,0 @@ -using System; -using System.Threading.Tasks; -using Arcus.Messaging.Abstractions; -using Arcus.Messaging.Abstractions.EventHubs; -using Arcus.Messaging.Abstractions.EventHubs.MessageHandling; -using Arcus.Messaging.Tests.Core.Messages.v1; -using Arcus.Messaging.Tests.Workers.EventHubs.Core.MessageBodySerializers; -using Arcus.Messaging.Tests.Workers.EventHubs.Core.MessageHandlers; -using Arcus.Messaging.Tests.Workers.MessageBodyHandlers; -using Arcus.Messaging.Tests.Workers.MessageHandlers; -using Microsoft.Extensions.DependencyInjection; -using Microsoft.Extensions.Logging; -using Xunit; - -namespace Arcus.Messaging.Tests.Integration.MessagePump -{ - public partial class EventHubsMessagePumpTests - { - [Fact] - public async Task EventHubsMessagePumpWithMessageContextFilter_PublishesMessage_MessageSuccessfullyProcessed() - { - await TestEventHubsMessageHandlingAsync(options => - { - AddEventHubsMessagePump(options) - .WithEventHubsMessageHandler, SensorReading>(messageContextFilter: _ => false) - .WithEventHubsMessageHandler(); - }); - } - - [Fact] - public async Task EventHubsMessagePumpWithMessageFilter_PublishesMessage_MessageSuccessfullyProcessed() - { - await TestEventHubsMessageHandlingAsync(options => - { - AddEventHubsMessagePump(options) - .WithEventHubsMessageHandler, SensorReading>(messageBodyFilter: _ => false) - .WithEventHubsMessageHandler(); - }); - } - - [Fact] - public async Task EventHubsMessagePumpWithDifferentMessageType_PublishesMessage_MessageSuccessfullyProcessed() - { - await TestEventHubsMessageHandlingAsync(options => - { - AddEventHubsMessagePump(options) - .WithEventHubsMessageHandler, Shipment>() - .WithEventHubsMessageHandler(); - }); - } - - [Fact] - public async Task EventHubsMessagePumpWithMessageBodySerializer_PublishesMessage_MessageSuccessfullyProcessed() - { - await TestEventHubsMessageHandlingAsync(options => - { - AddEventHubsMessagePump(options) - .WithEventHubsMessageHandler( - messageBodySerializerImplementationFactory: provider => - { - var logger = provider.GetRequiredService>(); - return new SensorReadingBatchBodySerializer(logger); - }); - }); - } - - [Fact] - public async Task EventHubsMessagePumpWithFallback_PublishesMessage_MessageSuccessfullyProcessed() - { - await TestEventHubsMessageHandlingAsync(options => - { - AddEventHubsMessagePump(options) - .WithEventHubsMessageHandler, Shipment>() - .WithFallbackMessageHandler() - .WithFallbackMessageHandler(); - }); - } - - [Fact] - public async Task EventHubsMessagePumpWithAllFiltersAndOptions_PublishesMessage_MessageSuccessfullyProcessed() - { - await TestEventHubsMessageHandlingAsync(options => - { - AddEventHubsMessagePump(options) - .WithEventHubsMessageHandler, Shipment>() - .WithEventHubsMessageHandler, SensorReading>(messageBodyFilter: _ => false) - .WithEventHubsMessageHandler, SensorReading>(messageContextFilter: _ => false) - .WithEventHubsMessageHandler( - messageContextFilter: context => context.ConsumerGroup == "$Default" - && context.EventHubsName == EventHubsName - && context.EventHubsNamespace == FullyQualifiedEventHubsNamespace, - messageBodySerializerImplementationFactory: provider => - { - var logger = provider.GetService>(); - return new OrderBatchMessageBodySerializer(logger); - }, - messageBodyFilter: order => Guid.TryParse(order.SensorId, out Guid _), - messageHandlerImplementationFactory: provider => - { - return new WriteSensorToDiskEventHubsMessageHandler( - provider.GetRequiredService(), - provider.GetRequiredService>()); - }); - }); - } - - [Fact] - public async Task EventHubsMessagePumpWithoutSameJobId_PublishesMessage_MessageFailsToBeProcessed() - { - await Assert.ThrowsAsync(() => - { - return TestEventHubsMessageHandlingAsync(options => - { - EventHubsMessageHandlerCollection collection = AddEventHubsMessagePump(options); - Assert.False(string.IsNullOrWhiteSpace(collection.JobId)); - - var otherCollection = new EventHubsMessageHandlerCollection(new ServiceCollection()) - { - JobId = Guid.NewGuid().ToString() - }; - - otherCollection.WithEventHubsMessageHandler, SensorReading>(messageContextFilter: _ => false) - .WithEventHubsMessageHandler(); - }); - }); - } - } -} diff --git a/src/Arcus.Messaging.Tests.Integration/MessagePump/EventHubsMessagePump.TelemetryTests.cs b/src/Arcus.Messaging.Tests.Integration/MessagePump/EventHubsMessagePump.TelemetryTests.cs deleted file mode 100644 index 71188d9d..00000000 --- a/src/Arcus.Messaging.Tests.Integration/MessagePump/EventHubsMessagePump.TelemetryTests.cs +++ /dev/null @@ -1,181 +0,0 @@ -using System; -using System.Threading.Tasks; -using Arcus.Messaging.Abstractions.MessageHandling; -using Arcus.Messaging.Tests.Core.Correlation; -using Arcus.Messaging.Tests.Core.Events.v1; -using Arcus.Messaging.Tests.Core.Messages.v1; -using Arcus.Messaging.Tests.Integration.Fixture; -using Arcus.Messaging.Tests.Integration.Fixture.Logging; -using Arcus.Messaging.Tests.Integration.MessagePump.EventHubs; -using Arcus.Messaging.Tests.Integration.MessagePump.ServiceBus; -using Arcus.Messaging.Tests.Workers.EventHubs.Core.MessageHandlers; -using Arcus.Testing; -using Azure.Messaging.EventHubs; -using Microsoft.ApplicationInsights.DataContracts; -using Microsoft.ApplicationInsights.Extensibility; -using Microsoft.Extensions.DependencyInjection; -using Serilog; -using Xunit; -using static Arcus.Messaging.Tests.Integration.MessagePump.Fixture.AssertX; - -namespace Arcus.Messaging.Tests.Integration.MessagePump -{ - public partial class EventHubsMessagePumpTests - { - [Fact] - public async Task EventHubsMessagePump_PublishMessageForHierarchical_MessageSuccessfullyProcessed() - { - await TestEventHubsMessageHandlingAsync(options => - { - AddEventHubsMessagePump(options, opt => opt.Routing.Correlation.Format = MessageCorrelationFormat.Hierarchical) - .WithEventHubsMessageHandler(); - }, MessageCorrelationFormat.Hierarchical); - } - - [Fact] - public async Task EventHubsMessagePump_PublishMessageForW3C_MessageSuccessfullyProcessed() - { - await TestEventHubsMessageHandlingAsync(options => - { - AddEventHubsMessagePump(options, opt => opt.Routing.Correlation.Format = MessageCorrelationFormat.W3C) - .WithEventHubsMessageHandler(); - }); - } - - [Fact] - public async Task EventHubsMessagePump_WithCustomTransactionIdProperty_RetrievesCorrelationCorrectlyDuringMessageProcessing() - { - // Arrange - var options = new WorkerOptions(); - var customTransactionIdPropertyName = "MyTransactionId"; - AddEventHubsMessagePump(options, opt => - { - opt.Routing.Correlation.Format = MessageCorrelationFormat.Hierarchical; - opt.Routing.Correlation.TransactionIdPropertyName = customTransactionIdPropertyName; - }) - .WithEventHubsMessageHandler(); - - EventData message = CreateSensorEventDataForHierarchical(transactionIdPropertyName: customTransactionIdPropertyName); - - // Act - await TestEventHubsMessageHandlingAsync(options, message, async () => - { - // Assert - SensorReadEventData actual = await DiskMessageEventConsumer.ConsumeSensorReadAsync(message.MessageId); - AssertReceivedSensorEventDataForHierarchical(message, actual, transactionIdPropertyName: customTransactionIdPropertyName); - }); - } - - [Fact] - public async Task EventHubsMessagePump_WithCustomOperationParentIdProperty_RetrievesCorrelationCorrectlyDuringMessageProcessing() - { - // Arrange - var customOperationParentIdPropertyName = "MyOperationParentId"; - var options = new WorkerOptions(); - AddEventHubsMessagePump(options, opt => - { - opt.Routing.Correlation.Format = MessageCorrelationFormat.Hierarchical; - opt.Routing.Correlation.OperationParentIdPropertyName = customOperationParentIdPropertyName; - }) - .WithEventHubsMessageHandler(); - - EventData message = CreateSensorEventDataForHierarchical(operationParentIdPropertyName: customOperationParentIdPropertyName); - - // Act - await TestEventHubsMessageHandlingAsync(options, message, async () => - { - // Assert - SensorReadEventData actual = await DiskMessageEventConsumer.ConsumeSensorReadAsync(message.MessageId); - AssertReceivedSensorEventDataForHierarchical(message, actual, operationParentIdPropertyName: customOperationParentIdPropertyName); - }); - } - - [Fact] - public async Task EventHubsMessagePump_WithW3CCorrelationFormat_AutomaticallyTracksMicrosoftDependencies() - { - // Arrange - var spySink = new InMemoryApplicationInsightsTelemetryConverter(_logger); - var spyChannel = new InMemoryTelemetryChannel(_logger); - - var options = new WorkerOptions(); - options.ConfigureSerilog(config => config.WriteTo.ApplicationInsights(spySink)); - - var traceParent = TraceParent.Generate(); - EventData eventData = CreateSensorEventDataForW3C(traceParent: traceParent); - - string operationName = $"operation-{Guid.NewGuid()}"; - AddEventHubsMessagePump(options, opt => opt.Routing.Telemetry.OperationName = operationName) - .WithEventHubsMessageHandler(); - - TestEventHubsMessageProducer producer = CreateEventHubsMessageProducer(); - - await using (var worker = await Worker.StartNewAsync(options)) - { - worker.Services.GetRequiredService().TelemetryChannel = spyChannel; - - // Act - await producer.ProduceAsync(eventData); - - // Assert - RequestTelemetry requestViaArcusEventHubs = - await Poll.Target(() => GetRequestFrom(spySink.Telemetries, r => r.Context.Operation.Name == operationName)) - .Timeout(TimeSpan.FromMinutes(2)) - .FailWith("missing request telemetry with operation name in spied sink"); - - DependencyTelemetry dependencyViaArcusKeyVault = - await Poll.Target(() => GetDependencyFrom(spySink.Telemetries, d => d.Type == "Azure key vault")) - .Until(d => d.Context.Operation.ParentId == requestViaArcusEventHubs.Id) - .FailWith("missing Key vault dependency telemetry tracking via Arcus in spied sink"); - - DependencyTelemetry dependencyViaMicrosoftSql = - await Poll.Target(() => GetDependencyFrom(spyChannel.Telemetries, d => d.Type == "SQL")) - .Until(d => d.Context.Operation.ParentId == requestViaArcusEventHubs.Id) - .FailWith("missing SQL dependency telemetry racking via Microsoft on spied channel"); - - Assert.Equal(requestViaArcusEventHubs.Context.Operation.Id, traceParent.TransactionId); - Assert.Equal(dependencyViaArcusKeyVault.Context.Operation.Id, traceParent.TransactionId); - Assert.Equal(dependencyViaMicrosoftSql.Context.Operation.Id, traceParent.TransactionId); - } - } - - [Fact] - public async Task EventHubsMessagePump_WithW3CCorrelationFormatForNewParent_AutomaticallyTracksMicrosoftDependencies() - { - // Arrange - var spySink = new InMemoryApplicationInsightsTelemetryConverter(); - var spyChannel = new InMemoryTelemetryChannel(); - - var options = new WorkerOptions(); - options.ConfigureSerilog(config => config.WriteTo.ApplicationInsights(spySink)); - - string operationName = Guid.NewGuid().ToString(); - AddEventHubsMessagePump(options, opt => opt.Routing.Telemetry.OperationName = operationName) - .WithEventHubsMessageHandler(); - - EventData eventData = CreateSensorEventDataForW3C(traceParent: null); - TestEventHubsMessageProducer producer = CreateEventHubsMessageProducer(); - - await using (var worker = await Worker.StartNewAsync(options)) - { - worker.Services.GetRequiredService().TelemetryChannel = spyChannel; - - // Act - await producer.ProduceAsync(eventData); - - // Assert - RequestTelemetry requestViaArcusEventHubs = - await Poll.Target(() => GetRequestFrom(spySink.Telemetries, r => r.Name == operationName)) - .Timeout(TimeSpan.FromMinutes(2)) - .FailWith("missing request telemetry with operation name in spied sink"); - - await Poll.Target(() => GetDependencyFrom(spySink.Telemetries, d => d.Type == "Azure key vault")) - .Until(d => d.Context.Operation.ParentId == requestViaArcusEventHubs.Id) - .FailWith("missing Key vault dependency telemetry tracking via Arcus in spied sink"); - - await Poll.Target(() => GetDependencyFrom(spyChannel.Telemetries, d => d.Type == "SQL")) - .Until(d => d.Context.Operation.ParentId == requestViaArcusEventHubs.Id) - .FailWith("missing SQL dependency telemetry racking via Microsoft on spied channel"); - } - } - } -} diff --git a/src/Arcus.Messaging.Tests.Integration/MessagePump/EventHubsMessagePumpTests.cs b/src/Arcus.Messaging.Tests.Integration/MessagePump/EventHubsMessagePumpTests.cs deleted file mode 100644 index 9f2f0a30..00000000 --- a/src/Arcus.Messaging.Tests.Integration/MessagePump/EventHubsMessagePumpTests.cs +++ /dev/null @@ -1,254 +0,0 @@ -using System; -using System.Runtime.CompilerServices; -using System.Text; -using System.Threading.Tasks; -using Arcus.Messaging.Abstractions; -using Arcus.Messaging.Abstractions.EventHubs.MessageHandling; -using Arcus.Messaging.Abstractions.MessageHandling; -using Arcus.Messaging.Pumps.EventHubs.Configuration; -using Arcus.Messaging.Tests.Core.Correlation; -using Arcus.Messaging.Tests.Core.Events.v1; -using Arcus.Messaging.Tests.Core.Generators; -using Arcus.Messaging.Tests.Core.Messages.v1; -using Arcus.Messaging.Tests.Integration.Fixture; -using Arcus.Messaging.Tests.Integration.MessagePump.EventHubs; -using Arcus.Messaging.Tests.Integration.MessagePump.Fixture; -using Arcus.Messaging.Tests.Integration.MessagePump.ServiceBus; -using Arcus.Testing; -using Azure.Messaging.EventHubs; -using Microsoft.Extensions.DependencyInjection; -using Microsoft.Extensions.Logging; -using Microsoft.Extensions.Logging.Abstractions; -using Newtonsoft.Json; -using Xunit; -using Xunit.Abstractions; - -namespace Arcus.Messaging.Tests.Integration.MessagePump -{ - [Collection("Integration")] - [Trait("Category", "Integration")] - public partial class EventHubsMessagePumpTests : IClassFixture, IAsyncLifetime - { - private readonly TestConfig _config; - private readonly EventHubsConfig _eventHubsConfig; - private readonly ILogger _logger; - private readonly ITestOutputHelper _outputWriter; - - private TemporaryBlobContainer _blobStorageContainer; - private TemporaryManagedIdentityConnection _connection; - - /// - /// Initializes a new instance of the class. - /// - public EventHubsMessagePumpTests(EventHubsEntityFixture fixture, ITestOutputHelper outputWriter) - { - _outputWriter = outputWriter; - _logger = new XunitTestLogger(outputWriter); - - _config = TestConfig.Create(); - _eventHubsConfig = _config.GetEventHubs(); - - EventHubsName = fixture.HubName; - } - - private string EventHubsName { get; } - private string FullyQualifiedEventHubsNamespace => _eventHubsConfig.HostName; - private string ContainerName => _blobStorageContainer.Name; - - /// - /// Called immediately after the class has been created, before it is used. - /// - public async Task InitializeAsync() - { - _connection = TemporaryManagedIdentityConnection.Create(_config, _logger); - _blobStorageContainer = await TemporaryBlobContainer.CreateIfNotExistsAsync(_eventHubsConfig.Storage.Name, $"test-{Guid.NewGuid()}", _logger); - } - - private EventHubsMessageHandlerCollection AddEventHubsMessagePump(WorkerOptions options, Action configureOptions = null) - { - return options.AddXunitTestLogging(_outputWriter) - .AddEventHubsMessagePumpUsingManagedIdentity( - eventHubsName: EventHubsName, - fullyQualifiedNamespace: FullyQualifiedEventHubsNamespace, - blobContainerUri: _blobStorageContainer.Client.Uri.ToString(), - clientId: _connection.ClientId, - configureOptions); - } - - private async Task TestEventHubsMessageHandlingAsync( - Action configureOptions, - MessageCorrelationFormat format = MessageCorrelationFormat.W3C, - [CallerMemberName] string memberName = null) - { - EventData message = format switch - { - MessageCorrelationFormat.W3C => CreateSensorEventDataForW3C(traceParent: TraceParent.Generate()), - MessageCorrelationFormat.Hierarchical => CreateSensorEventDataForHierarchical(), - }; - - var options = new WorkerOptions(); - configureOptions(options); - - await TestEventHubsMessageHandlingAsync(options, message, async () => - { - SensorReadEventData eventData = await DiskMessageEventConsumer.ConsumeSensorReadAsync(message.MessageId); - switch (format) - { - case MessageCorrelationFormat.W3C: - AssertReceivedSensorEventDataForW3C(message, eventData); - break; - - case MessageCorrelationFormat.Hierarchical: - AssertReceivedSensorEventDataForHierarchical(message, eventData); - break; - - default: - throw new ArgumentOutOfRangeException(nameof(format), format, null); - } - }, memberName); - } - - private async Task TestEventHubsMessageHandlingAsync( - WorkerOptions options, - EventData message, - Func assertionAsync, - [CallerMemberName] string memberName = null) - { - // Arrange - options.AddXunitTestLogging(_outputWriter); - - await using var worker = await Worker.StartNewAsync(options, memberName); - TestEventHubsMessageProducer producer = CreateEventHubsMessageProducer(); - - // Act - await producer.ProduceAsync(message); - - // Assert - await assertionAsync(); - } - - private static EventData CreateSensorEventDataForHierarchical( - string transactionIdPropertyName = PropertyNames.TransactionId, - string operationParentIdPropertyName = PropertyNames.OperationParentId, - Encoding encoding = null) - { - var operationId = $"operation-{Guid.NewGuid()}"; - var transactionId = $"transaction-{Guid.NewGuid()}"; - var operationParentId = $"operation-parent-{Guid.NewGuid()}"; - - SensorReading reading = SensorReadingGenerator.Generate(); - EventData message = - EventDataBuilder.CreateForBody(reading, encoding ?? Encoding.UTF8) - .WithOperationId(operationId) - .WithTransactionId(transactionId, transactionIdPropertyName) - .WithOperationParentId(operationParentId, operationParentIdPropertyName) - .Build(); - - message.MessageId = reading.SensorId; - return message; - } - - private static EventData CreateSensorEventDataForW3C(Encoding encoding = null, TraceParent traceParent = null) - { - encoding ??= Encoding.UTF8; - - SensorReading reading = SensorReadingGenerator.Generate(); - string json = JsonConvert.SerializeObject(reading); - byte[] raw = encoding.GetBytes(json); - - var message = new EventData(raw) - { - MessageId = reading.SensorId, - Properties = - { - { PropertyNames.ContentType, "application/json" }, - { PropertyNames.Encoding, encoding.WebName } - } - }; - - return traceParent is null - ? message - : message.WithDiagnosticId(traceParent); - } - - private static void AssertReceivedSensorEventDataForHierarchical( - EventData message, - SensorReadEventData receivedEventData, - string transactionIdPropertyName = PropertyNames.TransactionId, - string operationParentIdPropertyName = PropertyNames.OperationParentId, - Encoding encoding = null) - { - encoding ??= Encoding.UTF8; - string json = encoding.GetString(message.EventBody.ToArray()); - - var reading = JsonConvert.DeserializeObject(json); - string operationId = message.CorrelationId; - var transactionId = message.Properties[transactionIdPropertyName].ToString(); - var operationParentId = message.Properties[operationParentIdPropertyName].ToString(); - - Assert.NotNull(receivedEventData); - Assert.NotNull(receivedEventData.CorrelationInfo); - Assert.Equal(reading.SensorId, receivedEventData.SensorId); - Assert.Equal(reading.SensorValue, receivedEventData.SensorValue); - Assert.Equal(transactionId, receivedEventData.CorrelationInfo.TransactionId); - Assert.Equal(operationId, receivedEventData.CorrelationInfo.OperationId); - Assert.Equal(operationParentId, receivedEventData.CorrelationInfo.OperationParentId); - } - - private static void AssertReceivedSensorEventDataForW3C( - EventData message, - SensorReadEventData receivedEventData) - { - var encoding = Encoding.GetEncoding(message.Properties[PropertyNames.Encoding].ToString() ?? Encoding.UTF8.WebName); - string json = encoding.GetString(message.EventBody.ToArray()); - - var reading = JsonConvert.DeserializeObject(json); - - (string transactionId, string operationParentId) = message.Properties.GetTraceParent(); - Assert.NotNull(receivedEventData); - Assert.NotNull(receivedEventData.CorrelationInfo); - Assert.Equal(reading.SensorId, receivedEventData.SensorId); - Assert.Equal(reading.SensorValue, receivedEventData.SensorValue); - Assert.Equal(transactionId, receivedEventData.CorrelationInfo.TransactionId); - Assert.NotNull(receivedEventData.CorrelationInfo.OperationId); - Assert.Equal(operationParentId, receivedEventData.CorrelationInfo.OperationParentId); - } - - private TestEventHubsMessageProducer CreateEventHubsMessageProducer() - { - return new TestEventHubsMessageProducer(EventHubsName, _eventHubsConfig); - } - - /// - /// Called when an object is no longer needed. Called just before - /// if the class also implements that. - /// - public async Task DisposeAsync() - { - if (_blobStorageContainer != null) - { - await _blobStorageContainer.DisposeAsync(); - } - - _connection?.Dispose(); - } - } - - public class EventHubsEntityFixture : IAsyncLifetime - { - private TemporaryEventHubEntity _hub; - - public string HubName { get; } = $"hub-{Guid.NewGuid()}"; - - public async Task InitializeAsync() - { - var config = TestConfig.Create(); - _hub = await TemporaryEventHubEntity.CreateAsync(HubName, config.GetEventHubs(), NullLogger.Instance); - } - - public async Task DisposeAsync() - { - await _hub.DisposeAsync(); - } - } -} diff --git a/src/Arcus.Messaging.Tests.Integration/MessagePump/Fixture/EventDataExtensions.cs b/src/Arcus.Messaging.Tests.Integration/MessagePump/Fixture/EventDataExtensions.cs deleted file mode 100644 index 56b5f0a6..00000000 --- a/src/Arcus.Messaging.Tests.Integration/MessagePump/Fixture/EventDataExtensions.cs +++ /dev/null @@ -1,19 +0,0 @@ -using System; -using System.Collections.Generic; -using System.Linq; -using System.Text; -using System.Threading.Tasks; -using Arcus.Messaging.Tests.Core.Correlation; -using Azure.Messaging.EventHubs; - -namespace Arcus.Messaging.Tests.Integration.MessagePump.Fixture -{ - public static class EventDataExtensions - { - public static EventData WithDiagnosticId(this EventData message, TraceParent traceParent) - { - message.Properties["Diagnostic-Id"] = traceParent.DiagnosticId; - return message; - } - } -} diff --git a/src/Arcus.Messaging.Tests.Unit/Arcus.Messaging.Tests.Unit.csproj b/src/Arcus.Messaging.Tests.Unit/Arcus.Messaging.Tests.Unit.csproj index 771c8a7d..85402f58 100644 --- a/src/Arcus.Messaging.Tests.Unit/Arcus.Messaging.Tests.Unit.csproj +++ b/src/Arcus.Messaging.Tests.Unit/Arcus.Messaging.Tests.Unit.csproj @@ -20,7 +20,6 @@ - diff --git a/src/Arcus.Messaging.Tests.Unit/EventHubs/AzureEventHubsMessagePumpOptionsTests.cs b/src/Arcus.Messaging.Tests.Unit/EventHubs/AzureEventHubsMessagePumpOptionsTests.cs deleted file mode 100644 index fc57e7d4..00000000 --- a/src/Arcus.Messaging.Tests.Unit/EventHubs/AzureEventHubsMessagePumpOptionsTests.cs +++ /dev/null @@ -1,24 +0,0 @@ -using System; -#if NET6_0 -using Arcus.Messaging.Pumps.EventHubs.Configuration; -#endif -using Xunit; - -namespace Arcus.Messaging.Tests.Unit.EventHubs -{ -#if NET6_0 - public class AzureEventHubsMessagePumpOptionsTests - { - [Theory] - [ClassData(typeof(Blanks))] - public void SetConsumerGroup_WithoutValue_Fails(string consumerGroup) - { - // Arrange - var options = new AzureEventHubsMessagePumpOptions(); - - // Act / Assert - Assert.ThrowsAny(() => options.ConsumerGroup = consumerGroup); - } - } -#endif -} diff --git a/src/Arcus.Messaging.Tests.Unit/EventHubs/IServiceCollectionExtensionTests.cs b/src/Arcus.Messaging.Tests.Unit/EventHubs/IServiceCollectionExtensionTests.cs deleted file mode 100644 index 3c477eea..00000000 --- a/src/Arcus.Messaging.Tests.Unit/EventHubs/IServiceCollectionExtensionTests.cs +++ /dev/null @@ -1,638 +0,0 @@ -using System; -using Arcus.Messaging.Abstractions.EventHubs.MessageHandling; -using Arcus.Messaging.Pumps.EventHubs; -using Bogus; -using Microsoft.Extensions.Configuration; -using Microsoft.Extensions.DependencyInjection; -using Microsoft.Extensions.Hosting; -using Moq; -using Xunit; - -namespace Arcus.Messaging.Tests.Unit.EventHubs -{ -#if NET6_0 - // ReSharper disable once InconsistentNaming - public class IServiceCollectionExtensionTests - { - private static readonly Faker Bogus = new Faker(); - - [Fact] - public void AddWithoutOptions_WithSecretStore_Succeeds() - { - // Arrange - string eventHubsName = Bogus.Lorem.Word(); - string eventHubsConnectionStringSecretName = Bogus.Lorem.Word(); - string blobStorageContainerName = Bogus.Lorem.Word(); - string storageAccountConnectionStringSecretName = Bogus.Lorem.Word(); - var services = new ServiceCollection(); - services.AddSingleton(Mock.Of()) - .AddLogging() - .AddSecretStore(stores => stores.AddInMemory()); - - // Act - services.AddEventHubsMessagePump(eventHubsName, eventHubsConnectionStringSecretName, blobStorageContainerName, storageAccountConnectionStringSecretName); - - // Assert - IServiceProvider provider = services.BuildServiceProvider(); - var host = provider.GetService(); - Assert.NotNull(host); - } - - [Fact] - public void AddWithoutOptions_WithCustomJobId_Succeeds() - { - // Arrange - string eventHubsName = Bogus.Lorem.Word(); - string eventHubsConnectionStringSecretName = Bogus.Lorem.Word(); - string blobStorageContainerName = Bogus.Lorem.Word(); - string storageAccountConnectionStringSecretName = Bogus.Lorem.Word(); - string jobId = Bogus.Random.Guid().ToString(); - var services = new ServiceCollection(); - services.AddSingleton(Mock.Of()) - .AddLogging() - .AddSecretStore(stores => stores.AddInMemory()); - - // Act - EventHubsMessageHandlerCollection collection = - services.AddEventHubsMessagePump( - eventHubsName, eventHubsConnectionStringSecretName, blobStorageContainerName, storageAccountConnectionStringSecretName, - options => options.JobId = jobId); - - // Assert - IServiceProvider provider = services.BuildServiceProvider(); - var host = provider.GetService(); - Assert.NotNull(host); - Assert.Equal(jobId, collection.JobId); - } - - [Fact] - public void AddWithoutOptions_WithoutSecretStore_Fails() - { - // Arrange - string eventHubsName = Bogus.Lorem.Word(); - string eventHubsConnectionStringSecretName = Bogus.Lorem.Word(); - string blobStorageContainerName = Bogus.Lorem.Word(); - string storageAccountConnectionStringSecretName = Bogus.Lorem.Word(); - var services = new ServiceCollection(); - services.AddSingleton(Mock.Of()) - .AddLogging(); - - // Act - services.AddEventHubsMessagePump(eventHubsName, eventHubsConnectionStringSecretName, blobStorageContainerName, storageAccountConnectionStringSecretName); - - // Assert - IServiceProvider provider = services.BuildServiceProvider(); - Assert.Throws(() => provider.GetService()); - } - - [Theory] - [ClassData(typeof(Blanks))] - public void AddWithoutOptions_WithoutEventHubName_Fails(string eventHubsName) - { - // Arrange - string eventHubsConnectionStringSecretName = Bogus.Lorem.Word(); - string blobStorageContainerName = Bogus.Lorem.Word(); - string storageAccountConnectionStringSecretName = Bogus.Lorem.Word(); - var services = new ServiceCollection(); - - // Act / Assert - Assert.ThrowsAny(() => services.AddEventHubsMessagePump( - eventHubsName, - eventHubsConnectionStringSecretName, - blobStorageContainerName, - storageAccountConnectionStringSecretName)); - } - - [Theory] - [ClassData(typeof(Blanks))] - public void AddWithoutOptions_WithoutEventHubsConnectionStringSecretName_Fails(string eventHubsConnectionStringSecretName) - { - // Arrange - string eventHubsName = Bogus.Lorem.Word(); - string blobStorageContainerName = Bogus.Lorem.Word(); - string storageAccountConnectionStringSecretName = Bogus.Lorem.Word(); - var services = new ServiceCollection(); - - // Act / Assert - Assert.ThrowsAny(() => services.AddEventHubsMessagePump( - eventHubsName, - eventHubsConnectionStringSecretName, - blobStorageContainerName, - storageAccountConnectionStringSecretName)); - } - - [Theory] - [ClassData(typeof(Blanks))] - public void AddWithoutOptions_WithoutBlobStorageContainerName_Fails(string blobStorageContainerName) - { - // Arrange - string eventHubsName = Bogus.Lorem.Word(); - string eventHubsConnectionStringSecretName = Bogus.Lorem.Word(); - string storageAccountConnectionStringSecretName = Bogus.Lorem.Word(); - var services = new ServiceCollection(); - - // Act / Assert - Assert.ThrowsAny(() => services.AddEventHubsMessagePump( - eventHubsName, - eventHubsConnectionStringSecretName, - blobStorageContainerName, - storageAccountConnectionStringSecretName)); - } - - [Theory] - [ClassData(typeof(Blanks))] - public void AddWithoutOptions_WithoutStorageAccountConnectionStringSecretName_Fails(string storageAccountConnectionStringSecretName) - { - // Arrange - string eventHubsName = Bogus.Lorem.Word(); - string eventHubsConnectionStringSecretName = Bogus.Lorem.Word(); - string blobStorageContainerName = Bogus.Lorem.Word(); - var services = new ServiceCollection(); - - // Act / Assert - Assert.ThrowsAny(() => services.AddEventHubsMessagePump( - eventHubsName, - eventHubsConnectionStringSecretName, - blobStorageContainerName, - storageAccountConnectionStringSecretName)); - } - - [Theory] - [ClassData(typeof(Blanks))] - public void AddWithOptions_WithoutEventHubName_Fails(string eventHubsName) - { - // Arrange - string eventHubsConnectionStringSecretName = Bogus.Lorem.Word(); - string blobStorageContainerName = Bogus.Lorem.Word(); - string storageAccountConnectionStringSecretName = Bogus.Lorem.Word(); - var services = new ServiceCollection(); - - // Act / Assert - Assert.ThrowsAny(() => services.AddEventHubsMessagePump( - eventHubsName, - eventHubsConnectionStringSecretName, - blobStorageContainerName, - storageAccountConnectionStringSecretName, - configureOptions: _ => { })); - } - - [Theory] - [ClassData(typeof(Blanks))] - public void AddWithOptions_WithoutEventHubsConnectionStringSecretName_Fails(string eventHubsConnectionStringSecretName) - { - // Arrange - string eventHubsName = Bogus.Lorem.Word(); - string blobStorageContainerName = Bogus.Lorem.Word(); - string storageAccountConnectionStringSecretName = Bogus.Lorem.Word(); - var services = new ServiceCollection(); - - // Act / Assert - Assert.ThrowsAny(() => services.AddEventHubsMessagePump( - eventHubsName, - eventHubsConnectionStringSecretName, - blobStorageContainerName, - storageAccountConnectionStringSecretName, - configureOptions: _ => { })); - } - - [Theory] - [ClassData(typeof(Blanks))] - public void AddWithOptions_WithoutBlobStorageContainerName_Fails(string blobStorageContainerName) - { - // Arrange - string eventHubsName = Bogus.Lorem.Word(); - string eventHubsConnectionStringSecretName = Bogus.Lorem.Word(); - string storageAccountConnectionStringSecretName = Bogus.Lorem.Word(); - var services = new ServiceCollection(); - - // Act / Assert - Assert.ThrowsAny(() => services.AddEventHubsMessagePump( - eventHubsName, - eventHubsConnectionStringSecretName, - blobStorageContainerName, - storageAccountConnectionStringSecretName, - configureOptions: _ => { })); - } - - [Theory] - [ClassData(typeof(Blanks))] - public void AddWithOptions_WithoutStorageAccountConnectionStringSecretName_Fails(string storageAccountConnectionStringSecretName) - { - // Arrange - string eventHubsName = Bogus.Lorem.Word(); - string eventHubsConnectionStringSecretName = Bogus.Lorem.Word(); - string blobStorageContainerName = Bogus.Lorem.Word(); - var services = new ServiceCollection(); - - // Act / Assert - Assert.ThrowsAny(() => services.AddEventHubsMessagePump( - eventHubsName, - eventHubsConnectionStringSecretName, - blobStorageContainerName, - storageAccountConnectionStringSecretName, - configureOptions: _ => { })); - } - - [Fact] - public void AddUsingManagedIdentityWithoutClientIdAndOptions_WithValidArguments_Succeeds() - { - // Arrange - string eventHubsName = Bogus.Lorem.Word(); - string fullyQualifiedNamespace = Bogus.Lorem.Sentence(); - string blobContainerUri = Bogus.Internet.UrlWithPath(); - var services = new ServiceCollection(); - services.AddSingleton(new ConfigurationManager()); - - // Act - services.AddEventHubsMessagePumpUsingManagedIdentity(eventHubsName, fullyQualifiedNamespace, blobContainerUri); - - // Assert - IServiceProvider provider = services.BuildServiceProvider(); - Assert.NotNull( - Assert.IsType( - Assert.Single(provider.GetServices()))); - } - - [Fact] - public void AddUsingManagedIdentityWithClientIdAndWithoutOptions_WithValidArguments_Succeeds() - { - // Arrange - string eventHubsName = Bogus.Lorem.Word(); - string fullyQualifiedNamespace = Bogus.Lorem.Sentence(); - string blobContainerUri = Bogus.Internet.UrlWithPath(); - string clientId = Bogus.Random.Guid().ToString(); - var services = new ServiceCollection(); - services.AddSingleton(new ConfigurationManager()); - - // Act - services.AddEventHubsMessagePumpUsingManagedIdentity(eventHubsName, fullyQualifiedNamespace, blobContainerUri, clientId); - - // Assert - IServiceProvider provider = services.BuildServiceProvider(); - Assert.NotNull( - Assert.IsType( - Assert.Single(provider.GetServices()))); - } - - [Fact] - public void AddUsingManagedIdentityWithoutClientIdAndWithOptions_WithValidArguments_Succeeds() - { - // Arrange - string eventHubsName = Bogus.Lorem.Word(); - string fullyQualifiedNamespace = Bogus.Lorem.Sentence(); - string blobContainerUri = Bogus.Internet.UrlWithPath(); - var services = new ServiceCollection(); - services.AddSingleton(new ConfigurationManager()); - string jobId = null; - - // Act - EventHubsMessageHandlerCollection collection = - services.AddEventHubsMessagePumpUsingManagedIdentity(eventHubsName, fullyQualifiedNamespace, blobContainerUri, opt => jobId = opt.JobId); - - // Assert - IServiceProvider provider = services.BuildServiceProvider(); - var pump = Assert.IsType( - Assert.Single(provider.GetServices())); - - Assert.NotNull(pump); - Assert.NotNull(jobId); - Assert.Equal(collection.JobId, pump.JobId); - Assert.Equal(jobId, pump.JobId); - } - - [Fact] - public void AddUsingManagedIdentityWithClientIdAndOptions_WithValidArguments_Succeeds() - { - // Arrange - string eventHubsName = Bogus.Lorem.Word(); - string fullyQualifiedNamespace = Bogus.Lorem.Sentence(); - string blobContainerUri = Bogus.Internet.UrlWithPath(); - string clientId = Bogus.Random.Guid().ToString(); - var services = new ServiceCollection(); - services.AddSingleton(new ConfigurationManager()); - string jobId = null; - - // Act - EventHubsMessageHandlerCollection collection = - services.AddEventHubsMessagePumpUsingManagedIdentity(eventHubsName, fullyQualifiedNamespace, blobContainerUri, clientId, opt => jobId = opt.JobId); - - // Assert - IServiceProvider provider = services.BuildServiceProvider(); - var pump = Assert.IsType( - Assert.Single(provider.GetServices())); - - Assert.NotNull(pump); - Assert.NotNull(jobId); - Assert.Equal(collection.JobId, pump.JobId); - Assert.Equal(jobId, pump.JobId); - } - - [Fact] - public void AddUsingManagedIdentityWithClientIdAndOptions_WithCustomJobId_Succeeds() - { - // Arrange - string eventHubsName = Bogus.Lorem.Word(); - string fullyQualifiedNamespace = Bogus.Lorem.Sentence(); - string blobContainerUri = Bogus.Internet.UrlWithPath(); - string clientId = Bogus.Random.Guid().ToString(); - var services = new ServiceCollection(); - services.AddSingleton(new ConfigurationManager()); - string jobId = Bogus.Random.Guid().ToString(); - - // Act - EventHubsMessageHandlerCollection collection = - services.AddEventHubsMessagePumpUsingManagedIdentity(eventHubsName, fullyQualifiedNamespace, blobContainerUri, clientId, opt => opt.JobId = jobId); - - // Assert - IServiceProvider provider = services.BuildServiceProvider(); - var pump = Assert.IsType( - Assert.Single(provider.GetServices())); - - Assert.NotNull(pump); - Assert.NotNull(jobId); - Assert.Equal(collection.JobId, pump.JobId); - Assert.Equal(jobId, pump.JobId); - } - - [Theory] - [ClassData(typeof(Blanks))] - public void AddUsingManagedIdentityWithoutClientIdAndOptions_WithoutEventHubsName_Fails(string eventHubsName) - { - // Arrange - string fullyQualifiedNamespace = Bogus.Lorem.Sentence(); - string blobContainerUri = Bogus.Internet.UrlWithPath(); - var services = new ServiceCollection(); - - // Act / Assert - Assert.ThrowsAny(() => services.AddEventHubsMessagePumpUsingManagedIdentity( - eventHubsName, - fullyQualifiedNamespace, - blobContainerUri)); - } - - [Theory] - [ClassData(typeof(Blanks))] - public void AddUsingManagedIdentityWithClientIdAndWithoutOptions_WithoutEventHubsName_Fails(string eventHubsName) - { - // Arrange - string fullyQualifiedNamespace = Bogus.Lorem.Sentence(); - string blobContainerUri = Bogus.Internet.UrlWithPath(); - string clientId = Bogus.Random.Guid().ToString(); - var services = new ServiceCollection(); - - // Act / Assert - Assert.ThrowsAny(() => services.AddEventHubsMessagePumpUsingManagedIdentity( - eventHubsName, - fullyQualifiedNamespace, - blobContainerUri, - clientId)); - } - - [Theory] - [ClassData(typeof(Blanks))] - public void AddUsingManagedIdentityWithoutClientIdAndWithOptions_WithoutEventHubsName_Fails(string eventHubsName) - { - // Arrange - string fullyQualifiedNamespace = Bogus.Lorem.Sentence(); - string blobContainerUri = Bogus.Internet.UrlWithPath(); - var services = new ServiceCollection(); - - // Act / Assert - Assert.ThrowsAny(() => services.AddEventHubsMessagePumpUsingManagedIdentity( - eventHubsName, - fullyQualifiedNamespace, - blobContainerUri, - configureOptions: _ => { })); - } - - [Theory] - [ClassData(typeof(Blanks))] - public void AddUsingManagedIdentityWithClientIdAndOptions_WithoutEventHubsName_Fails(string eventHubsName) - { - // Arrange - string fullyQualifiedNamespace = Bogus.Lorem.Sentence(); - string blobContainerUri = Bogus.Internet.UrlWithPath(); - string clientId = Bogus.Random.Guid().ToString(); - var services = new ServiceCollection(); - - // Act / Assert - Assert.ThrowsAny(() => services.AddEventHubsMessagePumpUsingManagedIdentity( - eventHubsName, - fullyQualifiedNamespace, - blobContainerUri, - clientId, - configureOptions: _ => { })); - } - - [Theory] - [ClassData(typeof(Blanks))] - public void AddUsingManagedIdentityWithoutClientIdAndOptions_WithoutNamespace_Fails(string fullyQualifiedNamespace) - { - // Arrange - string eventHubsName = Bogus.Lorem.Sentence(); - string blobContainerUri = Bogus.Internet.UrlWithPath(); - var services = new ServiceCollection(); - - // Act / Assert - Assert.ThrowsAny(() => services.AddEventHubsMessagePumpUsingManagedIdentity( - eventHubsName, - fullyQualifiedNamespace, - blobContainerUri)); - } - - [Theory] - [ClassData(typeof(Blanks))] - public void AddUsingManagedIdentityWithClientIdAndWithoutOptions_WithoutNamespace_Fails(string fullyQualifiedNamespace) - { - // Arrange - string eventHubsName = Bogus.Lorem.Sentence(); - string blobContainerUri = Bogus.Internet.UrlWithPath(); - string clientId = Bogus.Random.Guid().ToString(); - var services = new ServiceCollection(); - - // Act / Assert - Assert.ThrowsAny(() => services.AddEventHubsMessagePumpUsingManagedIdentity( - eventHubsName, - fullyQualifiedNamespace, - blobContainerUri, - clientId)); - } - - [Theory] - [ClassData(typeof(Blanks))] - public void AddUsingManagedIdentityWithoutClientIdAndWithOptions_WithoutNamespace_Fails(string fullyQualifiedNamespace) - { - // Arrange - string eventHubsName = Bogus.Lorem.Sentence(); - string blobContainerUri = Bogus.Internet.UrlWithPath(); - var services = new ServiceCollection(); - - // Act / Assert - Assert.ThrowsAny(() => services.AddEventHubsMessagePumpUsingManagedIdentity( - eventHubsName, - fullyQualifiedNamespace, - blobContainerUri, - configureOptions: _ => { })); - } - - [Theory] - [ClassData(typeof(Blanks))] - public void AddUsingManagedIdentityWithClientIdAndOptions_WithoutNamespace_Fails(string fullyQualifiedNamespace) - { - // Arrange - string eventHubsName = Bogus.Lorem.Sentence(); - string blobContainerUri = Bogus.Internet.UrlWithPath(); - string clientId = Bogus.Random.Guid().ToString(); - var services = new ServiceCollection(); - - // Act / Assert - Assert.ThrowsAny(() => services.AddEventHubsMessagePumpUsingManagedIdentity( - eventHubsName, - fullyQualifiedNamespace, - blobContainerUri, - clientId, - configureOptions: _ => { })); - } - - [Theory] - [ClassData(typeof(Blanks))] - public void AddUsingManagedIdentityWithoutClientIdAndOptions_WithoutBlobEndpoint_Fails(string blobContainerUri) - { - // Arrange - string eventHubsName = Bogus.Lorem.Sentence(); - string fullyQualifiedNamespace = Bogus.Internet.UrlWithPath(); - var services = new ServiceCollection(); - - // Act / Assert - Assert.ThrowsAny(() => services.AddEventHubsMessagePumpUsingManagedIdentity( - eventHubsName, - fullyQualifiedNamespace, - blobContainerUri)); - } - - [Theory] - [ClassData(typeof(Blanks))] - public void AddUsingManagedIdentityWithClientIdAndWithoutOptions_WithoutBlobEndpoint_Fails(string blobContainerUri) - { - // Arrange - string eventHubsName = Bogus.Lorem.Sentence(); - string fullyQualifiedNamespace = Bogus.Internet.UrlWithPath(); - string clientId = Bogus.Random.Guid().ToString(); - var services = new ServiceCollection(); - - // Act / Assert - Assert.ThrowsAny(() => services.AddEventHubsMessagePumpUsingManagedIdentity( - eventHubsName, - fullyQualifiedNamespace, - blobContainerUri, - clientId)); - } - - [Theory] - [ClassData(typeof(Blanks))] - public void AddUsingManagedIdentityWithoutClientIdAndWithOptions_WithoutBlobEndpoint_Fails(string blobContainerUri) - { - // Arrange - string eventHubsName = Bogus.Lorem.Sentence(); - string fullyQualifiedNamespace = Bogus.Internet.UrlWithPath(); - var services = new ServiceCollection(); - - // Act / Assert - Assert.ThrowsAny(() => services.AddEventHubsMessagePumpUsingManagedIdentity( - eventHubsName, - fullyQualifiedNamespace, - blobContainerUri, - configureOptions: _ => { })); - } - - [Theory] - [ClassData(typeof(Blanks))] - public void AddUsingManagedIdentityWithClientIdAndOptions_WithoutBlobEndpoint_Fails(string blobContainerUri) - { - // Arrange - string eventHubsName = Bogus.Lorem.Sentence(); - string fullyQualifiedNamespace = Bogus.Internet.UrlWithPath(); - string clientId = Bogus.Random.Guid().ToString(); - var services = new ServiceCollection(); - - // Act / Assert - Assert.ThrowsAny(() => services.AddEventHubsMessagePumpUsingManagedIdentity( - eventHubsName, - fullyQualifiedNamespace, - blobContainerUri, - clientId, - configureOptions: _ => { })); - } - - [Fact] - public void AddUsingManagedIdentityWithoutClientIdAndOptions_WithRelativeBlobEndpoint_Fails() - { - // Arrange - string eventHubsName = Bogus.Lorem.Sentence(); - string fullyQualifiedNamespace = Bogus.Internet.UrlWithPath(); - string blobContainerUri = Bogus.Internet.UrlRootedPath(); - var services = new ServiceCollection(); - - // Act / Assert - Assert.ThrowsAny(() => services.AddEventHubsMessagePumpUsingManagedIdentity( - eventHubsName, - fullyQualifiedNamespace, - blobContainerUri)); - } - - [Fact] - public void AddUsingManagedIdentityWithClientIdAndWithoutOptions_WithRelativeBlobEndpoint_Fails() - { - // Arrange - string eventHubsName = Bogus.Lorem.Sentence(); - string fullyQualifiedNamespace = Bogus.Internet.UrlWithPath(); - string blobContainerUri = Bogus.Internet.UrlRootedPath(); - string clientId = Bogus.Random.Guid().ToString(); - var services = new ServiceCollection(); - - // Act / Assert - Assert.ThrowsAny(() => services.AddEventHubsMessagePumpUsingManagedIdentity( - eventHubsName, - fullyQualifiedNamespace, - blobContainerUri, - clientId)); - } - - [Fact] - public void AddUsingManagedIdentityWithoutClientIdAndWithOptions_WithRelativeBlobEndpoint_Fails() - { - // Arrange - string eventHubsName = Bogus.Lorem.Sentence(); - string fullyQualifiedNamespace = Bogus.Internet.UrlWithPath(); - string blobContainerUri = Bogus.Internet.UrlRootedPath(); - var services = new ServiceCollection(); - - // Act / Assert - Assert.ThrowsAny(() => services.AddEventHubsMessagePumpUsingManagedIdentity( - eventHubsName, - fullyQualifiedNamespace, - blobContainerUri, - configureOptions: _ => { })); - } - - [Fact] - public void AddUsingManagedIdentityWithClientIdAndOptions_WithRelativeBlobEndpoint_Fails() - { - // Arrange - string eventHubsName = Bogus.Lorem.Sentence(); - string fullyQualifiedNamespace = Bogus.Internet.UrlWithPath(); - string blobContainerUri = Bogus.Internet.UrlRootedPath(); - string clientId = Bogus.Random.Guid().ToString(); - var services = new ServiceCollection(); - - // Act / Assert - Assert.ThrowsAny(() => services.AddEventHubsMessagePumpUsingManagedIdentity( - eventHubsName, - fullyQualifiedNamespace, - blobContainerUri, - clientId, - configureOptions: _ => { })); - } - } -#endif -} diff --git a/src/Arcus.Messaging.Tests.Workers.EventHubs.Core/Arcus.Messaging.Tests.Workers.EventHubs.Core.csproj b/src/Arcus.Messaging.Tests.Workers.EventHubs.Core/Arcus.Messaging.Tests.Workers.EventHubs.Core.csproj index 367527e1..10743018 100644 --- a/src/Arcus.Messaging.Tests.Workers.EventHubs.Core/Arcus.Messaging.Tests.Workers.EventHubs.Core.csproj +++ b/src/Arcus.Messaging.Tests.Workers.EventHubs.Core/Arcus.Messaging.Tests.Workers.EventHubs.Core.csproj @@ -18,7 +18,6 @@ - diff --git a/src/Arcus.Messaging.Tests.Workers.EventHubs.Core/MessageHandlers/OrderEventHubsMessageHandler.cs b/src/Arcus.Messaging.Tests.Workers.EventHubs.Core/MessageHandlers/OrderEventHubsMessageHandler.cs deleted file mode 100644 index 64c0fb97..00000000 --- a/src/Arcus.Messaging.Tests.Workers.EventHubs.Core/MessageHandlers/OrderEventHubsMessageHandler.cs +++ /dev/null @@ -1,65 +0,0 @@ -using System; -using System.Threading; -using System.Threading.Tasks; -using Arcus.EventGrid.Publishing.Interfaces; -using Arcus.Messaging.Abstractions; -using Arcus.Messaging.Abstractions.EventHubs; -using Arcus.Messaging.Abstractions.EventHubs.MessageHandling; -using Arcus.Messaging.Tests.Core.Messages.v1; -using Azure.Messaging.EventGrid; -using Microsoft.Extensions.Azure; -using Microsoft.Extensions.Logging; -using Xunit; - -namespace Arcus.Messaging.Tests.Workers.MessageHandlers -{ - public class OrderEventHubsMessageHandler : IAzureEventHubsMessageHandler - { - private readonly EventGridPublisherClient _eventGridPublisher; - private readonly IMessageCorrelationInfoAccessor _correlationAccessor; - private readonly ILogger _logger; - - /// - /// Initializes a new instance of the class. - /// - public OrderEventHubsMessageHandler( - IAzureClientFactory clientFactory, - IMessageCorrelationInfoAccessor correlationAccessor, - ILogger logger) - { - _eventGridPublisher = clientFactory.CreateClient("Default"); - _correlationAccessor = correlationAccessor; - _logger = logger; - } - - /// - /// Process a new message that was received. - /// - /// The message that was received. - /// The context providing more information concerning the processing. - /// The information concerning correlation of telemetry and processes by using a variety of unique identifiers. - /// The token to cancel the processing. - /// - /// Thrown when the , , or the is null. - /// - public async Task ProcessMessageAsync( - Order message, - AzureEventHubsMessageContext messageContext, - MessageCorrelationInfo correlationInfo, - CancellationToken cancellationToken) - { - EnsureSameCorrelation(correlationInfo); - await _eventGridPublisher.PublishOrderAsync(message, correlationInfo, _logger); - } - - private void EnsureSameCorrelation(MessageCorrelationInfo correlationInfo) - { - MessageCorrelationInfo registeredCorrelation = _correlationAccessor.GetCorrelationInfo(); - Assert.NotNull(registeredCorrelation); - Assert.Equal(registeredCorrelation.OperationId, correlationInfo.OperationId); - Assert.Equal(registeredCorrelation.TransactionId, correlationInfo.TransactionId); - Assert.Equal(registeredCorrelation.OperationParentId, correlationInfo.OperationParentId); - Assert.Equal(registeredCorrelation.CycleId, correlationInfo.CycleId); - } - } -} diff --git a/src/Arcus.Messaging.sln b/src/Arcus.Messaging.sln index c79cb62d..cad78812 100644 --- a/src/Arcus.Messaging.sln +++ b/src/Arcus.Messaging.sln @@ -29,8 +29,6 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Arcus.Messaging.Abstraction EndProject Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Arcus.Messaging.AzureFunctions.ServiceBus", "Arcus.Messaging.AzureFunctions.ServiceBus\Arcus.Messaging.AzureFunctions.ServiceBus.csproj", "{7386C41A-6F27-42B7-BBC1-B6CB41200A68}" EndProject -Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Arcus.Messaging.Pumps.EventHubs", "Arcus.Messaging.Pumps.EventHubs\Arcus.Messaging.Pumps.EventHubs.csproj", "{F5A14425-FEF5-425D-875C-2601C988E6FA}" -EndProject Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Arcus.Messaging.Abstractions.EventHubs", "Arcus.Messaging.Abstractions.EventHubs\Arcus.Messaging.Abstractions.EventHubs.csproj", "{040DE2D5-0400-4B4A-A333-60C45CBB0204}" EndProject Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Arcus.Messaging.EventHubs.Core", "Arcus.Messaging.EventHubs.Core\Arcus.Messaging.EventHubs.Core.csproj", "{05EA4B3A-C619-44BB-88C4-F067FCCF83AA}" @@ -99,10 +97,6 @@ Global {7386C41A-6F27-42B7-BBC1-B6CB41200A68}.Debug|Any CPU.Build.0 = Debug|Any CPU {7386C41A-6F27-42B7-BBC1-B6CB41200A68}.Release|Any CPU.ActiveCfg = Release|Any CPU {7386C41A-6F27-42B7-BBC1-B6CB41200A68}.Release|Any CPU.Build.0 = Release|Any CPU - {F5A14425-FEF5-425D-875C-2601C988E6FA}.Debug|Any CPU.ActiveCfg = Debug|Any CPU - {F5A14425-FEF5-425D-875C-2601C988E6FA}.Debug|Any CPU.Build.0 = Debug|Any CPU - {F5A14425-FEF5-425D-875C-2601C988E6FA}.Release|Any CPU.ActiveCfg = Release|Any CPU - {F5A14425-FEF5-425D-875C-2601C988E6FA}.Release|Any CPU.Build.0 = Release|Any CPU {040DE2D5-0400-4B4A-A333-60C45CBB0204}.Debug|Any CPU.ActiveCfg = Debug|Any CPU {040DE2D5-0400-4B4A-A333-60C45CBB0204}.Debug|Any CPU.Build.0 = Debug|Any CPU {040DE2D5-0400-4B4A-A333-60C45CBB0204}.Release|Any CPU.ActiveCfg = Release|Any CPU @@ -137,7 +131,6 @@ Global {55DE6D12-4C54-4570-BDFA-00B0FFDE5AB6} = {A1369CCD-42D1-43F6-98BC-D8EDA62C2B13} {864C12DF-DE3D-421F-8687-EC3918FFB8BE} = {2CD090E7-7306-49A0-9680-6ED78CFECAE1} {7386C41A-6F27-42B7-BBC1-B6CB41200A68} = {2CD090E7-7306-49A0-9680-6ED78CFECAE1} - {F5A14425-FEF5-425D-875C-2601C988E6FA} = {31250C96-0F22-482F-B3D7-127898A28CF3} {040DE2D5-0400-4B4A-A333-60C45CBB0204} = {31250C96-0F22-482F-B3D7-127898A28CF3} {05EA4B3A-C619-44BB-88C4-F067FCCF83AA} = {31250C96-0F22-482F-B3D7-127898A28CF3} {F1B24FD4-099E-489D-B45D-A1A992CA5C3A} = {A1369CCD-42D1-43F6-98BC-D8EDA62C2B13} From cf326435d1dcad4b59ee8a66e9cda9707e5d71f5 Mon Sep 17 00:00:00 2001 From: stijnmoreels <9039753+stijnmoreels@users.noreply.github.com> Date: Thu, 13 Feb 2025 06:48:02 +0100 Subject: [PATCH 2/3] chore(remove): empty event-hubs folder in integration project --- .../Arcus.Messaging.Tests.Integration.csproj | 4 ---- 1 file changed, 4 deletions(-) diff --git a/src/Arcus.Messaging.Tests.Integration/Arcus.Messaging.Tests.Integration.csproj b/src/Arcus.Messaging.Tests.Integration/Arcus.Messaging.Tests.Integration.csproj index eab1d0f1..b0421224 100644 --- a/src/Arcus.Messaging.Tests.Integration/Arcus.Messaging.Tests.Integration.csproj +++ b/src/Arcus.Messaging.Tests.Integration/Arcus.Messaging.Tests.Integration.csproj @@ -49,8 +49,4 @@ - - - - From 3c84f90a57d4d818a5020ec41200b4e1fc4d5b0c Mon Sep 17 00:00:00 2001 From: stijnmoreels <9039753+stijnmoreels@users.noreply.github.com> Date: Thu, 13 Feb 2025 06:53:35 +0100 Subject: [PATCH 3/3] docs(remove): mentioning of event-hubs message pump in extensions --- docs/preview/02-Features/05-event-hubs-extensions.md | 2 -- 1 file changed, 2 deletions(-) diff --git a/docs/preview/02-Features/05-event-hubs-extensions.md b/docs/preview/02-Features/05-event-hubs-extensions.md index 6cd4fa52..cdfd9ede 100644 --- a/docs/preview/02-Features/05-event-hubs-extensions.md +++ b/docs/preview/02-Features/05-event-hubs-extensions.md @@ -55,8 +55,6 @@ The Arcus message pump/router automatically makes sure that received Azure Event If you also want the sender (dependency tracking) to be linked to the request, we provide a set of easy extensions on the `EventHubProducerClient` to make this happen. For more information on dependency tracking, see the [Arcus Observability feature documentation on telemetry tracking](https://observability.arcus-azure.net/features/writing-different-telemetry-types/). -> 🚩 By default, the EventHubs message pump is using W3C correlation, not Hierarchical, which already allows automatic dependency tracking upon sending without additional configuration. If you want to use Hierarchical, please configure the correlation format in the [message pump configuration](./02-message-handling/03-event-hubs.md). - Internally, we enrich the `EventData` with the message correlation and track the entire operation as an Azure EventHubs dependency. The result of this operation will result in a parent-child relationship between the dependency-request.