diff --git a/docs/versioned_docs/version-v2.1.0/01-index.md b/docs/versioned_docs/version-v2.1.0/01-index.md new file mode 100644 index 00000000..eb8c7ebb --- /dev/null +++ b/docs/versioned_docs/version-v2.1.0/01-index.md @@ -0,0 +1,24 @@ +--- +title: "Arcus - Messaging" +layout: default +slug: / +sidebar_label: Welcome +--- + +# Introduction +Arcus Messaging is a library that helps with the integration of messaging systems like queues or event subscriptions and allows you to focus on implementing business logic when processing messages instead of spending time with message peaking, connections, deserialization, and other infrastructure code that takes up time. + +## Guides +* Migrate from v0.x to v1.0 ([docs](./03-Guides/migration-guide-v1.0.md)) + +# Installation +The packages are available on NuGet, for example: + +```shell +PM > Install-Package Arcus.Messaging.Pumps.ServiceBus +``` + +# License +This is licensed under The MIT License (MIT). Which means that you can use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the web application. But you always need to state that Codit is the original author of this web application. + +*[Full license here](https://github.com/arcus-azure/arcus.messaging/blob/master/LICENSE)* diff --git a/docs/versioned_docs/version-v2.1.0/02-Features/02-message-handling/01-service-bus.md b/docs/versioned_docs/version-v2.1.0/02-Features/02-message-handling/01-service-bus.md new file mode 100644 index 00000000..a49c19b0 --- /dev/null +++ b/docs/versioned_docs/version-v2.1.0/02-Features/02-message-handling/01-service-bus.md @@ -0,0 +1,636 @@ +--- +title: "Azure Service Bus message handling" +layout: default +--- + +# Azure Service Bus message handling +The `Arcus.Messaging.Pumps.ServiceBus` library provides ways to perform all the plumbing that is required for processing messages on queues and topic subscriptions. + +As a user, the only thing you have to do is **focus on processing messages, not how to get them**. Following terms are used: +- **Message handler**: implementation that processes the received message from an Azure Service Bus queue or topic subscription. Message handlers are created by implementing the `IAzureServiceBusMessageHandler`. This message handler will be called upon when a message is available in the Azure Service Bus queue or on the topic subscription. [this section](#message-handler-example) for a message handler example setup +- **Message router**: implementation that delegates the received Azure Service Bus message to the correct message handler. For alternative message routing, see [this section](#alternative-service-bus-message-routing) for more information. +- **Message pump**: implementation that interacts and receives the Azure Service Bus message. The pump can be configured for different scenarios, see [this section](#pump-configuration) for more information. + +![Message handling schema](/media/worker-message-handling.png) + +## Installation +This features requires to install our NuGet package: + +```shell +PM > Install-Package Arcus.Messaging.Pumps.ServiceBus +``` + +> ⚠ The new Azure SDK doesn't yet support Azure Service Bus plugins. See this [migration guide](https://github.com/Azure/azure-sdk-for-net/blob/master/sdk/servicebus/Azure.Messaging.ServiceBus/MigrationGuide.md#known-gaps-from-previous-library) for more info on this topic. + +## Message handler example +Here is an example of a message handler that expects messages of type `Order`: + +```csharp +using Arcus.Messaging.Abstractions; +using Arcus.Messaging.Pumps.ServiceBus; +using Microsoft.Extensions.Logging; + +public class OrdersMessageHandler : IAzureServiceBusMessageHandler +{ + private readonly ILogger _logger; + + public OrdersMessageHandler(ILogger logger) + { + _logger = logger; + } + + public async Task ProcessMessageAsync( + Order orderMessage, + AzureServiceBusMessageContext messageContext, + MessageCorrelationInfo correlationInfo, + CancellationToken cancellationToken) + { + _logger.LogInformation("Processing order {OrderId} for {OrderAmount} units of {OrderArticle} bought by {CustomerFirstName} {CustomerLastName}", orderMessage.Id, orderMessage.Amount, orderMessage.ArticleNumber, orderMessage.Customer.FirstName, orderMessage.Customer.LastName); + + // Process the message. + + _logger.LogInformation("Order {OrderId} processed", orderMessage.Id); + } +} +``` + +## Message handler registration +Once the message handler is created, you can very easily register it: + +```csharp +using Microsoft.Extensions.DependencyInjection; + +public class Startup +{ + public void ConfigureServices(IServiceCollection services) + { + // Add Service Bus Queue message pump and use OrdersMessageHandler to process the messages + // - ISecretProvider will be used to lookup the connection string scoped to the queue for secret ARCUS_SERVICEBUS_ORDERS_CONNECTIONSTRING + services.AddServiceBusQueueMessagePump("ARCUS_SERVICEBUS_ORDERS_CONNECTIONSTRING") + .WithServiceBusMessageHandler(); + + // Add Service Bus Topic message pump and use OrdersMessageHandler to process the messages on the 'My-Subscription-Name' subscription + // - Topic subscriptions over 50 characters will be truncated + // - ISecretProvider will be used to lookup the connection string scoped to the queue for secret ARCUS_SERVICEBUS_ORDERS_CONNECTIONSTRING + services.AddServiceBusTopicMessagePump("My-Subscription-Name", "ARCUS_SERVICEBUS_ORDERS_CONNECTIONSTRING") + .WithServiceBusMessageHandler(); + + // Note, that only a single call to the `.WithServiceBusMessageHandler` has to be made when the handler should be used across message pumps. + } +} +``` + +In this example, we are using the Azure Service Bus message pump to process a queue and a topic and use the connection string stored in the `ARCUS_SERVICEBUS_ORDERS_CONNECTIONSTRING` connection string. + +> 💡 We support **connection strings that are scoped on the Service Bus namespace and entity** allowing you to choose the required security model for your applications. If you are using namespace-scoped connection strings you'll have to pass your queue/topic name as well. + +> ⚠ The order in which the message handlers are registered matters when a message is processed. If the first one can't handle the message, the second will be checked, and so forth. + +### Filter messages based on message context +When registering a new message handler, one can opt-in to add a filter on the message context which filters out messages that are not needed to be processed. + +This can be useful when you are sending different message types on the same queue. Another use-case is being able to handle different versions of the same message type which have different contracts because you are migrating your application. + +Following example shows how a message handler should only process a certain message when a property in the context has a specific value. + +We'll use a simple message handler implementation: + +```csharp +using Arcus.Messaging.Abstractions; +using Arcus.Messaging.Pumps.Abstractions.MessagingHandling; + +public class OrderMessageHandler : IAzureServiceBusMessageHandler +{ + public async Task ProcessMessageAsync(Order order, AzureServiceBusMessageContext context, ...) + { + // Do some processing... + } +} +``` + +We would like that this handler only processed the message when the context contains `MessageType` equals `Order`. + +```csharp +using Microsoft.Extensions.DependencyInjection; + +public class Startup +{ + public void ConfigureServices(IServiceCollection services) + { + services.AddServiceBusTopicMessagePump(...) + .WithServiceBusMessageHandler(context => context.Properties["MessageType"].ToString() == "Order"); + } +} +``` + +> Note that the order in which the message handlers are registered is important in the message processing. +> In the example, when a message handler above this one is registered that could also handle the message (same message type) than that handler may be chosen instead of the one with the specific filter. + + +### Bring your own deserialization +You can also choose to extend the built-in message deserialization with a custom deserializer to meet your needs. +This allows you to easily deserialize into different message formats or reuse existing (de)serialization capabilities that you already have without altering the message router. + +You start by implementing an `IMessageBodySerializer`. The following example shows how an expected type can be transformed to something else. +The result type (in this case `OrderBatch`) will then be used to check if there is an `IAzureServiceBusMessageHandler` registered for that message type. + +```csharp +using Arcus.Messaging.Abstractions.MessageHandling; + +public class OrderBatchMessageBodySerializer : IMessageBodySerializer +{ + public async Task DeserializeMessageAsync(string messageBody) + { + var serializer = new XmlSerializer(typeof(Order[])); + using (var contents = new MemoryStream(Encoding.UTF8.GetBytes(messageBody))) + { + var orders = (Order[]) serializer.Deserialize(contents); + return MessageResult.Success(new OrderBatch(orders)); + } + } +} +``` + +The registration of these message body serializers can be done just as easily as an `IAzureServiceBusMessageHandler`: + +```csharp +using Microsoft.Extensions.DependencyInjection; + +public class Startup +{ + public void ConfigureServices(IServiceCollection services) + { + // Register the message body serializer in the dependency container where the dependent services will be injected. + services.AddServiceBusTopicMessagePump(...) + .WitServiceBusMessageHandler(..., messageBodySerializer: new OrderBatchMessageBodySerializer()); + + // Register the message body serializer in the dependency container where the dependent services are manually injected. + services.AddServiceBusTopicMessagePump(...) + .WithServiceBusMessageHandler(..., messageBodySerializerImplementationFactory: serviceProvider => + { + var logger = serviceProvider.GetService>(); + return new OrderBatchMessageHandler(logger); + }); + } +} +``` + +> Note that the order in which the message handlers are registered is important in the message processing. +> In the example, when a message handler above this one is registered that could also handle the message (same message type) than that handler may be chosen instead of the one with the specific filter. + +### Filter messages based on message body +When registering a new message handler, one can opt-in to add a filter on the incoming message body which filters out messages that are not needed to be processed by this message handler. +This can be useful when you want to route messages based on the message content itself instead of the messaging context. + +Following example shows how a message handler should only process a certain message when the status is 'Sales'; meaning only `Order` for the sales division will be processed. + +```csharp +// Message to be sent: +public enum Department { Sales, Marketing, Operations } + +public class Order +{ + public string Id { get; set; } + public Department Type { get; set; } +} + +using Arcus.Messaging.Abstractions; +using Arcus.Messaging.Abstractions.ServiceBus.MessageHandling; + +// Message handler +public class OrderMessageHandler : IAzureServiceBusMessageHandler +{ + public async Task ProcessMessageAsync(Order order, AzureServiceBusMessageContext context, ...) + { + // Do some processing... + } +} + +using Microsoft.Extensions.DependencyInjection; + +// Message handler registration +public class Startup +{ + public void ConfigureServices(IServiceCollection services) + { + services.AddServiceBusTopicMessagePump(...) + .WithServiceMessageHandler((Order order) => order.Type == Department.Sales); + } +} +``` + +### Fallback message handling +When receiving a message on the message pump and none of the registered `IAzureServiceBusMessageHandler`'s can correctly process the message, the message pump normally throws and logs an exception. +It could also happen in a scenario that's to be expected that some received messages will not be processed correctly (or you don't want them to). + +In such a scenario, you can choose to register a `IAzureServiceBusFallbackMessageHandler` in the dependency container. +This extra message handler will then process the remaining messages that can't be processed by the normal message handlers. + +Following example shows how such a message handler can be implemented: + +```csharp +using Arcus.Messaging.Pumps.ServiceBus; +using Arcus.Messaging.Abstractions.ServiceBus.MessageHandling; +using Microsoft.Azure.ServiceBus; +using Microsoft.Extensions.Logging; + +public class WarnsUserFallbackMessageHandler : IAzureServiceBusFallbackMessageHandler +{ + private readonly ILogger _logger; + + public WarnsUserFallbackMessageHandler(ILogger logger) + { + _logger = logger; + } + + public async Task ProcessMessageAsync(Message message, AzureServiceBusMessageContext context, ...) + { + _logger.LogWarning("These type of messages are expected not to be processed"); + } +} +``` + +> 💡 Note that you have access to the Azure Service Bus message and the specific message context. These can be used to eventually call `.Abandon()` on the message. See [this section](#influence-handling-of-service-bus-message-in-message-handler) for more information. + +And to register such an implementation: + +```csharp +using Microsoft.Extensions.DependencyInjection; + +public class Startup +{ + public void ConfigureServices(IServiceCollection services) + { + services.AddServiceBusQueueMessagePump(...) + .WithServiceBusFallbackMessageHandler(); + } +} +``` + +## Influence handling of Service Bus message in message handler +When an Azure Service Bus message is received (either via regular message handlers or fallback message handlers), we allow specific Azure Service Bus operations during the message handling. +Currently we support [**Dead letter**](https://docs.microsoft.com/en-us/azure/service-bus-messaging/service-bus-dead-letter-queues) and [**Abandon**](https://docs.microsoft.com/en-us/dotnet/api/microsoft.servicebus.messaging.messagereceiver.abandon?view=azure-dotnet). + +### During (regular) message handling +To have access to the Azure Service Bus operations, you have to implement the `abstract` `AzureServiceBusMessageHandler` class. +Behind the screens it implements the `IMessageHandler<>` interface, so you can register this the same way as your other regular message handlers. + +This base class provides several protected methods to call the Azure Service Bus operations: +- `.CompleteMessageAsync` +- `.DeadLetterMessageAsync` +- `.AbandonMessageAsync` + +Example: + +```csharp +using Arcus.Messaging.Pumps.ServiceBus; +using Arcus.Messaging.Abstractions.ServiceBus.MessageHandling; +using Microsoft.Extensions.Logging; + +public class AbandonsUnknownOrderMessageHandler : AzureServiceBusMessageHandler +{ + public AbandonsUnknownOrderMessageHandler(ILogger logger) + : base(logger) + { + } + + public override async Task ProcessMessageAsync(Order order, AzureServiceBusMessageContext context, ...) + { + if (order.Id < 1) + { + await AbandonMessageAsync(); + } + else + { + Logger.LogInformation("Received valid order"); + } + } +} +``` + +The registration happens the same as any other regular message handler: + +```csharp +using Microsoft.Extensions.DependencyInjection; + +public class Startup +{ + public void ConfigureServices(IServiceCollection services) + { + services.AddServiceBusQueueMessagePump(...) + .WithServiceBusMessageHandler(); + } +} +``` + +### During fallback message handling +To have access to the Azure Service Bus operations, you have to implement the abstract `AzureServiceBusFallbackMessageHandler` class. +Behind the scenes it implements the `IServiceBusFallbackMessageHandler`, so you can register this the same way as any other fallback message handler. + +This base class provides several protected methods to call the Azure Service Bus operations: +- `.CompleteAsync` +- `.DeadLetterAsync` +- `.AbandonAsync` + +Example: + +```csharp +using Arcus.Messaging.Pumps.ServiceBus; +using Arcus.Messaging.ServiceBus.Abstractions.MessageHandling; +using Azure.Messaging.ServiceBus; +using Microsoft.Extensions.Logging; + +public class DeadLetterFallbackMessageHandler : AzureServiceBusFallbackMessageHandler +{ + public DeadLetterFallbackMessageHandler(ILogger logger) + : base(logger) + { + } + + public override async Task ProcessMessageAsync(ServiceBusReceivedMessage message, AzureServiceBusMessageContext context, ...) + { + Logger.LogInformation("Message is not handled by any message handler, will dead letter"); + await DeadLetterMessageAsync(message); + } +} +``` + +The registration happens the same way as any other fallback message handler: + +```csharp +using Microsoft.Extensions.DependencyInjection; + +public class Startup +{ + public void ConfigureServices(IServiceCollection services) + { + services.AddServiceBusQueueMessagePump(...) + .WithServiceBusFallbackMessageHandler(); + } +} +``` + +## Pump Configuration +Next to that, we provide a **variety of overloads** to allow you to: +- Specify the name of the queue/topic +- Only provide a prefix for the topic subscription, so each topic message pump is handling messages on separate subscriptions +- Configure how the message pump should work *(ie. max concurrent calls & auto delete)* +- Read the connection string from the configuration *(although we highly recommend using the [Arcus secret store](https://security.arcus-azure.net/features/secret-store) instead)* + +```csharp +using Microsoft.Extensions.DependencyInjection; + +public class Startup +{ + public void ConfigureServices(IServiceCollection services) + { + // Specify the name of the Service Bus Queue: + services.AddServiceBusQueueMessagePump( + "My-Service-Bus-Queue-Name", + "ARCUS_SERVICEBUS_ORDERS_CONNECTIONSTRING"); + + // Specify the name of the Service Bus Topic, and provide a name for the Topic subscription: + services.AddServiceBusMessageTopicMessagePump( + "My-Service-Bus-Topic-Name", + "My-Service-Bus-Topic-Subscription-Name", + "ARCUS_SERVICEBUS_ORDERS_CONNECTIONSTRING"); + + // Specify a topic subscription prefix instead of a name to separate topic message pumps. + services.AddServiceBusTopicMessagePumpWithPrefix( + "My-Service-Bus-Topic-Name" + "My-Service-Bus-Subscription-Prefix", + "ARCUS_SERVICEBUS_ORDERS_CONNECTIONSTRING"); + + // Uses managed identity to authenticate with the Service Bus Topic: + services.AddServiceBusTopicMessagePumpUsingManagedIdentity( + topicName: properties.EntityPath, + subscriptionName: "Receive-All", + fullyQualifiedNamespace: ".servicebus.windows.net" + // The optional client id to authenticate for a user assigned managed identity. More information on user assigned managed identities cam be found here: + // https://docs.microsoft.com/en-us/azure/active-directory/managed-identities-azure-resources/overview#how-a-user-assigned-managed-identity-works-with-an-azure-vm + clientId: ""); + + services.AddServiceBusTopicMessagePump( + "ARCUS_SERVICEBUS_ORDERS_CONNECTIONSTRING", + options => + { + // Indicate whether or not messages should be automatically marked as completed + // if no exceptions occurred and processing has finished (default: true). + options.AutoComplete = true; + + // Indicate whether or not the message pump should emit security events (default: false). + options.EmitSecurityEvents = true; + + // The amount of concurrent calls to process messages + // (default: null, leading to the defaults of the Azure Service Bus SDK message handler options). + options.MaxConcurrentCalls = 5; + + // Specifies the amount of messages that will be eagerly requested during processing. + // Setting the PrefetchCount to a value higher then the MaxConcurrentCalls value helps maximizing + // throughput by allowing the message pump to receive from a local cache rather then waiting on a + // service request. + options.PrefetchCount = 10; + + // The unique identifier for this background job to distinguish + // this job instance in a multi-instance deployment (default: guid). + options.JobId = Guid.NewGuid().ToString(); + + // The format of the message correlation used when receiving Azure Service Bus messages. + // (default: W3C). + options.Routing.Correlation.Format = MessageCorrelationFormat.Hierarchical; + + // The name of the operation used when tracking the request telemetry. + // (default: Process) + options.Routing.Correlation.OperationName = "Order"; + + // The name of the Azure Service Bus message property that has the transaction ID. + // ⚠ Only used when the correlation format is configured as Hierarchical. + // (default: Transaction-Id). + options.Routing.Correlation.TransactionIdPropertyName = "X-Transaction-ID"; + + // The name of the Azure Service Bus message property that has the upstream service ID. + // ⚠ Only used when the correlation format is configured as Hierarchical. + // (default: Operation-Parent-Id). + options.Routing.Correlation.OperationParentIdPropertyName = "X-Operation-Parent-ID"; + + // The property name to enrich the log event with the correlation information cycle ID. + // (default: CycleId) + options.Routing.CorrelationEnricher.CycleIdPropertyName = "X-CycleId"; + + // Indicate whether or not the default built-in JSON deserialization should ignore additional members + // when deserializing the incoming message (default: AdditionalMemberHandling.Error). + options.Routing.Deserialization.AdditionalMembers = AdditionalMemberHandling.Ignore; + + // Indicate whether or not a new Azure Service Bus Topic subscription should be created/deleted + // when the message pump starts/stops (default: None, so no subscription will be created or deleted). + options.TopicSubscription = TopicSubscription.Automatic; + }); + + services.AddServiceBusQueueMessagePump( + "ARCUS_SERVICEBUS_ORDERS_CONNECTIONSTRING", + options => + { + // Indicate whether or not messages should be automatically marked as completed + // if no exceptions occurred and processing has finished (default: true). + options.AutoComplete = true; + + // Indicate whether or not the message pump should emit security events (default: false). + options.EmitSecurityEvents = true; + + // The amount of concurrent calls to process messages + // (default: null, leading to the defaults of the Azure Service Bus SDK message handler options). + options.MaxConcurrentCalls = 5; + + // Specifies the amount of messages that will be eagerly requested during processing. + // Setting the PrefetchCount to a value higher then the MaxConcurrentCalls value helps maximizing + // throughput by allowing the message pump to receive from a local cache rather then waiting on a + // service request. + options.PrefetchCount = 10; + + // The unique identifier for this background job to distinguish + // this job instance in a multi-instance deployment (default: guid). + options.JobId = Guid.NewGuid().ToString(); + + // The format of the message correlation used when receiving Azure Service Bus messages. + // (default: W3C). + options.Routing.Correlation.Format = MessageCorrelationFormat.Hierarchical; + + // The name of the operation used when tracking the request telemetry. + // (default: Process) + options.Routing.Correlation.OperationName = "Order"; + + // The name of the Azure Service Bus message property that has the transaction ID. + // ⚠ Only used when the correlation format is configured as Hierarchical. + // (default: Transaction-Id). + options.Routing.Correlation.TransactionIdPropertyName = "X-Transaction-ID"; + + // The name of the Azure Service Bus message property that has the upstream service ID. + // ⚠ Only used when the correlation format is configured as Hierarchical. + // (default: Operation-Parent-Id). + options.Routing.Correlation.OperationParentIdPropertyName = "X-Operation-Parent-ID"; + + // The property name to enrich the log event with the correlation information cycle ID. + // (default: CycleId) + options.Routing.CorrelationEnricher.CycleIdPropertyName = "X-CycleId"; + + // Indicate whether or not the default built-in JSON deserialization should ignore additional members + // when deserializing the incoming message (default: AdditionalMemberHandling.Error). + options.Routing.Deserialization.AdditionalMembers = AdditionalMembersHandling.Ignore; + }); + + // Uses managed identity to authenticate with the Service Bus Topic: + services.AddServiceBusQueueMessagePumpUsingManagedIdentity( + queueName: "orders", + serviceBusNamespace: "" + // The optional client id to authenticate for a user assigned managed identity. More information on user assigned managed identities cam be found here: + // https://docs.microsoft.com/en-us/azure/active-directory/managed-identities-azure-resources/overview#how-a-user-assigned-managed-identity-works-with-an-azure-vm + clientId: ""); + + // Multiple message handlers can be added to the services, based on the message type (ex. 'Order', 'Customer'...), + // the correct message handler will be selected. + services.AddServiceBusQueueMessagePump(...) + .WithServiceBusMessageHandler() + .WithMessageHandler(); + } +} +``` + +## Alternative Service Bus message routing +By default, when registering the Azure Service Bus message pump a built-in message router is registered to handle the routing throughout the previously registered message handlers. + +This router is registered with the `IAzureServiceBusMessageRouter` interface (which implements the more general `IMessageRouter` for non-specific Service Bus messages). + +When you want for some reason alter the message routing or provide additional functionality, you can register your own router which the Azure Service Bus message pump will use instead. + +The following example shows you how a custom router is used for additional tracking. Note that the `AzureServiceBusMessageRouter` implements the `IAzureServiceBusMessageRouter` so we can override the necessary implementations. + +```csharp +public class TrackedAzureServiceBusMessageRouter : AzureServiceBusMessageRouter +{ + public TrackedAzureServiceBusMessageRouter(IServiceProvider serviceProvider, ILogger logger) + : base(serviceProvider, logger) + { + } + + public override Task ProcessMessageAsync( + ServiceBusReceivedMessage message, + AzureServiceBusMessageContext messageContext, + MessageCorrelationInfo correlationInfo, + CancellationToken cancellationToken) + { + Logger.LogTrace("Start routing incoming message..."); + base.ProcessMessageAsync(message, messageContext, correlationInfo, cancellationToken); + Logger.LogTrace("Done routing incoming message!"); + } +} +``` + +This custom message router can be registered with the following extension: + +```csharp +public void ConfigureServices(IServiceCollection services) +{ + services.AddServiceBusMessageRouting(serviceProvider => + { + var logger = serviceProvider.GetService>(); + return new TrackedAzureServiceBusMessageRouter(serviceProvider, logger); + }); + + services.AddServiceBusQueueMessagePump(...); +} +``` + +> Note that your own router should be registered **before** you register the Azure Message Pump otherwise it cannot be overridden. + +## Message Correlation +The message correlation of the received messages is set automatically. All the message handlers will have access to the current `MessageCorrelationInfo` correlation model for the specific currently processed message. + +To retrieve the correlation information in other application code, you can use a dedicated marker interface called `IMessageCorrelationInfoAccessor`. +Note that this is a scoped dependency and so will be the same instance across a scoped operation. + +```csharp +using Arcus.Messaging.Abstractions; + +public class DependencyService +{ + private readonly IMessageCorrelationInfoAccessor _accessor; + + public DependencyService(IMessageCorrelationInfoAccessor accessor) + { + _accessor = accessor; + } + + public void Method() + { + MessageCorrelationInfo correlation = _accessor.GetCorrelationInfo(); + + _accessor.SetCorrelation(correlation); + } +} +``` + +### Hierarchical correlation +When using the Hierarchical correlation system, the correlation information of Azure Service Bus messages can also be retrieved from an extension that wraps all correlation information. + +```csharp +using Arcus.Messaging.Abstractions; +using Azure.Messaging.ServiceBus; + +ServiceBusReceivedMessage message = ... +MessageCorrelationInfo correlationInfo = message.GetCorrelationInfo(); + +// Unique identifier that indicates an attempt to process a given message. +string cycleId = correlationInfo.CycleId; + +// Unique identifier that relates different requests together. +string transactionId = correlationInfo.TransactionId; + +// Unique identifier that distinguishes the request. +string operationId = correlationInfo.OperationId; +``` + +## Want to get started easy? Use our templates! +We provide templates to get started easily: + +- Azure Service Bus Queue Worker Template ([docs](https://templates.arcus-azure.net/features/servicebus-queue-worker-template)) +- Azure Service Bus Topic Worker Template ([docs](https://templates.arcus-azure.net/features/servicebus-topic-worker-template)) \ No newline at end of file diff --git a/docs/versioned_docs/version-v2.1.0/02-Features/02-message-handling/02-service-bus-azure-functions.md b/docs/versioned_docs/version-v2.1.0/02-Features/02-message-handling/02-service-bus-azure-functions.md new file mode 100644 index 00000000..4664a990 --- /dev/null +++ b/docs/versioned_docs/version-v2.1.0/02-Features/02-message-handling/02-service-bus-azure-functions.md @@ -0,0 +1,232 @@ +--- +title: "Azure Service Bus message handling for Azure Functions" +layout: default +--- + +This article describes how you can use Arcus' message handler concept with Azure Functions; allowing you to more easily port your business logic from/to Azure Functions. + +# Azure Service Bus message handling for Azure Functions +While our default message pump system provides a way to receive, route, and handle incoming Service Bus messages which are, unfortunately, not supported in Azure Functions. +Today, Azure Functions acts as a message receiver meaning that the function is triggered when a message is available but does not handle message routing and handling. However, in this case, it acts as the message pump. + +Following terms are used: +- **Message handler**: implementation that processes the received message from an Azure Service Bus queue or topic subscription. Message handlers are created by implementing the `IAzureServiceBusMessageHandler`. This message handler will be called upon when a message is available in the Azure Service Bus queue or on the topic subscription. +- **Message router**: implementation that delegates the received Azure Service Bus message to the correct message handler. + +That's why we extracted our message routing functionality so you can call it directly from your Azure Function. + +![Azure Functions message handling](/media/az-func-message-handling.png) + +We will walk you through the process of using message handlers with Azure Functions: + +## Installation +To use the following described features, install the following package: +```shell +PM > Install-Package -Name Arcus.Messaging.AzureFunctions.ServiceBus +``` + +## Receive Azure Service Bus message in an Azure Function +Here's an example of how an Azure Function receives an Azure Service Bus message from a topic: + +```csharp +public class MessageProcessingFunction +{ + [FunctionName("message-processor")] + public void Run( + [ServiceBusTrigger("%TopicName%", "%SubscriptionName%", Connection = "ServiceBusConnectionString")] Message message, + ILogger log) + { + // Processing message... + } +} +``` + +## Declaring our Azure Service Bus message handlers +Registering message handlers to process the Service Bus message happens just the same as using a message pump. +Here is an example of two message handlers that are being registered during startup: + +Processing shipment messages: + +```csharp +public class ShipmentServiceBusMessageHandler : IAzureServiceBusMessageHandler +{ + private readonly ILogger _logger; + + public ShipmentServiceBusMessageHandler(ILogger logger) + { + _logger = logger; + } + + public async Task ProcessMessageAsync( + Shipment shipment, + AzureServiceBusMessageContext messageContext, + MessageCorrelationInfo correlationInfo, + CancellationToken cancellationToken) + { + _logger.LogInformation("Processing shipment {ShipmentId} for order #{OrderId}", shipment.Id, shipment.Order.Id); + } +} +``` + +Processing order messages: + +```csharp +public class OrderServiceBusMessageHandler : IAzureServiceBusMessageHandler +{ + private readonly ILogger _logger; + + public OrderServiceBusMessageHandler(ILogger logger) + { + _logger = logger; + } + + public async Task ProcessMessageAsync( + Order order, + AzureServiceBusMessageContext messageContext, + MessageCorrelationInfo correlationInfo, + CancellationToken cancellationToken) + { + _logger.LogInformation("Processing order {OrderId} for {OrderAmount} units of {OrderArticle} bought by {CustomerFirstName} {CustomerLastName}", order.Id, order.Amount, order.ArticleNumber, order.Customer. + } +} +``` + +Now that we have created our message handlers, we can declare when we should use them by registering them with our router. + +## Processing received messages through the message router +Now that everything is setup, we need to actually use the declared message handlers by routing the messages from the Azure Function into the correct message handler. + +To achieve that, we need to add message routing with the `.AddServiceBusMessageRouting` extension: + +### Isolated Azure Functions +```csharp +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Hosting; +var host = new HostBuilder() + .ConfigureFunctionsWorkerDefaults(builder => + { + builder.Services.AddServiceBusMessageRouting() + .WithServiceBusMessageHandler() + .WithServiceBusMessageHandler(); + }) + .Build(); +host.Run(); +``` + +### In-Process Azure Functions +```csharp +[assembly: FunctionsStartup(typeof(Startup))] +namespace MessageProcessing +{ + public class Startup : FunctionsStartup + { + public override void Configure(IFunctionsHostBuilder builder) + { + builder.AddServiceBusMessageRouting() + .WithServiceBusMessageHandler() + .WithServiceBusMessageHandler(); + } + } +} +``` + +This extension will register an `IAzureServiceBusMessageRouter` interface allows you access to message handling with specific Service Bus operations during the message processing (like dead lettering and abandoning). + +> ⚡ It also registers an more general `IMessageRouter` you can use if the general message routing (with the message raw message body as `string` as incoming message) will suffice. + +We can now inject the message router in our Azure Function and process all messages with it. +This will determine what the matching message handler is and process it accordingly. +Upon receival of an Azure Service Bus message, the message will be either routed to one of the two previously registered message handlers. + +### Isolated Azure Functions +```csharp +using Arcus.Messaging.Abstractions.ServiceBus; +using Arcus.Messaging.AzureFunctions.ServiceBus; +using Azure.Messaging.ServiceBus; + +public class MessageProcessingFunction +{ + private readonly IAzureServiceBusMessageRouter _messageRouter; + private readonly string _jobId; + + public MessageProcessingFunction(IAzureServiceBusMessageRouter messageRouter) + { + _jobId = $"job-{Guid.NewGuid()}"; + _messageRouter = messageRouter; + } + + [Function("message-processor")] + public void Run( + [ServiceBusTrigger("%TopicName%", "%SubscriptionName%", Connection = "ServiceBusConnectionString")] byte[] messageBody, + FunctionContext executionContext) + { + ServiceBusReceivedMessage message = ConvertToServiceBusMessage(messageBody, executionContext); + + AzureServiceBusMessageContext messageContext = message.GetMessageContext(_jobId); + using (MessageCorrelationResult result = executionContext.GetCorrelationInfo()) + { + _messageRouter.ProcessMessageAsync(message, messageContext, result.CorrelationInfo, cancellationToken); + } + } + + private static ServiceBusReceivedMessage ConvertToServiceBusMessage(byte[] messageBody, FunctionContext context) + { + var applicationProperties = new Dictionary(); + if (context.BindingContext.BindingData.TryGetValue("ApplicationProperties", out object applicationPropertiesObj)) + { + var json = applicationPropertiesObj.ToString(); + applicationProperties = JsonSerializer.Deserialize>(json); + } + + var message = ServiceBusModelFactory.ServiceBusReceivedMessage( + body: BinaryData.FromBytes(messageBody), + messageId: context.BindingContext.BindingData["MessageId"]?.ToString(), + correlationId: context.BindingContext.BindingData["CorrelationId"]?.ToString(), + properties: applicationProperties); + + return message; + } +} +``` + +### In-Process Azure Functions +```csharp +using Arcus.Messaging.Abstractions.ServiceBus; +using Azure.Messaging.ServiceBus; + +public class MessageProcessingFunction +{ + private readonly IAzureServiceBusMessageRouter _messageRouter; + private readonly AzureFunctionsInProcessMessageCorrelation _messageCorrelation; + private readonly string _jobId; + + public MessageProcessingFunction( + IAzureServiceBusMessageRouter messageRouter, + AzureFunctionsInProcessMessageCorrelation messageCorrelation) + { + _jobId = $"job-{Guid.NewGuid()}"; + _messageRouter = messageRouter; + _messageCorrelation = messageCorrelation; + } + + [FunctionName("message-processor")] + public void Run( + [ServiceBusTrigger("%TopicName%", "%SubscriptionName%", Connection = "ServiceBusConnectionString")] ServiceBusReceivedMessage message, + ILogger log, + CancellationToken cancellationToken) + { + AzureServiceBusMessageContext messageContext = message.GetMessageContext(_jobId); + + // W3C message correlation (with automatic tracking of built-in Microsoft dependencies, recommended) + using (MessageCorrelationResult result = _messageCorrelation.CorrelateMessage(message)) + { + _messageRouter.ProcessMessageAsync(message, messageContext, result.CorrelationInfo, cancellationToken); + } + + // Hierarchical message correlation (without automatic tracking of built-in Microsoft dependencies)) + MessageCorrelationInfo correlationInfo = message.GetCorrelationInfo(); + + _messageRouter.ProcessMessageAsync(message, messageContext, correlationInfo, cancellationToken); + } +} +``` \ No newline at end of file diff --git a/docs/versioned_docs/version-v2.1.0/02-Features/02-message-handling/03-event-hubs.md b/docs/versioned_docs/version-v2.1.0/02-Features/02-message-handling/03-event-hubs.md new file mode 100644 index 00000000..a5b6cbc2 --- /dev/null +++ b/docs/versioned_docs/version-v2.1.0/02-Features/02-message-handling/03-event-hubs.md @@ -0,0 +1,345 @@ +--- +title: "Azure Event Hubs message handling" +layout: default +--- + +# Azure Event Hubs message handling +The `Arcus.Messaging.Pumps.EventHubs` library provides ways to perform all the plumbing that is required for processing event messages on Azure EventHubs. + +As a user, the only thing you have to do is **focus on processing messages, not how to get them**. Following terms are used: +- **Message handler**: implementation that processes the received message from an Azure EventHubs. Message handlers are created by implementing the `IAzureEventHubsMessageHandler`. This message handler will be called upon when a message is available in the Azure EventHubs. [this section](#message-handler-example) for a message handler example setup +- **Message router**: implementation that delegates the received Azure EventHubs event message to the correct message handler. +- **Message pump**: implementation that interacts and receives the Azure EventHubs event message. The pump can be configured for different scenarios, see [this section](#pump-configuration) for more information. + +![Message handling schema](/media/worker-eventhubs-message-handling.png) + +## Installation +This features requires to install our NuGet package: + +```shell +PM > Install-Package Arcus.Messaging.Pumps.EventHubs +``` + +## Message handler example +Here is an example of a message handler that expects messages of type `SensorReading`: + +> ⚡ You can use the same message handlers in an a .NET Worker message pump and an Azure Functions scenario. + +```csharp +using Arcus.Messaging.Abstractions; +using Arcus.Messaging.Pumps.EventHubs; +using Microsoft.Extensions.Logging; + +public class SensorReadingMessageHandler : IAzureEventHubsMessageHandler +{ + private readonly ILogger _logger; + + public SensorReadingMessageHandler(ILogger logger) + { + _logger = logger; + } + + public async Task ProcessMessageAsync( + SensorReading message, + AzureEventHubsMessageContext messageContext, + MessageCorrelationInfo correlationInfo, + CancellationToken cancellationToken) + { + _logger.LogInformation("Processing sensor reading {SensorId} for ", message.Id); + + // Process the message. + + _logger.LogInformation("Sensor reading {SensorId} processed", message.Id); + } +} +``` + +## Message handler registration +Once the message handler is created, you can very easily register it: + +```csharp +using Microsoft.Extensions.DependencyInjection; + +public class Program +{ + public void ConfigureServices(IServiceCollection services) + { + // Add Azure EventHubs message pump and use OrdersMessageHandler to process the messages. + services.AddEventHubsMessagePumpUsingManagedIdentity("", "", "") + .WithEventHubsMessageHandler(); + + // Note, that only a single call to the `.WithEventHubsMessageHandler` has to be made when the handler should be used across message pumps. + } +} +``` + +The Azure EventHubs uses the `EventProcessorClient` internally. To learn more about this way of consuming messages from Azure EventHubs, see [Microsoft's official documentation](https://docs.microsoft.com/en-us/dotnet/api/overview/azure/messaging.eventhubs.processor-readme). + +In this example, we are using the Azure EventHubs message pump to process event messages and use the connection strings stored in the Arcus secret store: +- Azure EventHubs name: The name of the Event Hub that the processor is connected to, specific to the EventHubs namespace that contains it. +- Azure EventHubs connection string secret name: The name of the secret to retrieve the Azure EventHubs connection string using your registered Arcus secret store implementation. +- Azure EventHubs Blob storage container name: The name of the Azure Blob storage container in the storage account to reference where the event checkpoints will be stored. The events will be streamed to this storage so that the client only has to worry about event processing, not event capturing. +- Azure EventHubs Blob storage account connection string secret name: The name of the secret to retrieve the Azure EventHubs connection string using your registered Arcus secret store implementation. + +> ⚠ The order in which the message handlers are registered matters when a message is processed. If the first one can't handle the message, the second will be checked, and so forth. + +### Filter messages based on message context +When registering a new message handler, one can opt-in to add a filter on the message context which filters out messages that are not needed to be processed. + +This can be useful when you are sending different message types on the same queue. Another use-case is being able to handle different versions of the same message type which have different contracts because you are migrating your application. + +Following example shows how a message handler should only process a certain message when a property's in the context is present. + +We'll use a simple message handler implementation: + +```csharp +using Arcus.Messaging.Abstractions; +using Arcus.Messaging.Pumps.Abstractions.MessagingHandling; + +public class SensorReadingMessageHandler : IAzureEventHubsMessageHandler +{ + public async Task ProcessMessageAsync(SensorReading message, AzureEventHubsMessageContext context, ...) + { + // Do some processing... + } +} +``` + +We would like that this handler only processed the message when the context contains `Location` equals `Room`. + +```csharp +using Microsoft.Extensions.DependencyInjection; + +public class Program +{ + public void ConfigureServices(IServiceCollection services) + { + services.AddEventHubsMessagePumpUsingManagedIdentity(...) + .WithEventHubsMessageHandler(context => context.Properties["Location"].ToString() == "Room"); + } +} +``` + +> Note that the order in which the message handlers are registered is important in the message processing. +> In the example, when a message handler above this one is registered that could also handle the message (same message type) then that handler may be chosen instead of the one with the specific filter. + +### Bring your own deserialization +You can also choose to extend the built-in message deserialization with a custom deserializer to meet your needs. +This allows you to easily deserialize into different message formats or reuse existing (de)serialization capabilities that you already have without altering the message router. + +You start by implementing an `IMessageBodySerializer`. The following example shows how an expected type can be transformed to something else. +The result type (in this case `SensorReadingBatch`) will then be used to check if there is an `IAzureEventHubsMessageHandler` registered for that message type. + +```csharp +using Arcus.Messaging.Pumps.Abstractions.MessageHandling; + +public class SensorReadingBatchMessageBodySerializer : IMessageBodySerializer +{ + public async Task DeserializeMessageAsync(string messageBody) + { + var serializer = new XmlSerializer(typeof(SensorReading[])); + using (var contents = new MemoryStream(Encoding.UTF8.GetBytes(messageBody))) + { + var messages = (SensorReading[]) serializer.Deserialize(contents); + return MessageResult.Success(new SensorReadingBatch(messages)); + } + } +} +``` + +The registration of these message body serializers can be done just as easily as an `IAzureEventHubsMessageHandler`: + +```csharp +using Microsoft.Extensions.DependencyInjection; + +public class Program +{ + public void ConfigureServices(IServiceCollection services) + { + // Register the message body serializer in the dependency container where the dependent services will be injected. + services.AddEventHubsMessagePumpUsingManagedIdentity(...) + .WithEventHubsMessageHandler(..., messageBodySerializer: new OrderBatchMessageBodySerializer()); + + // Register the message body serializer in the dependency container where the dependent services are manually injected. + services.AddEventHubsMessagePumpUsingManagedIdentity(...) + .WithEventHubsMessageHandler(..., messageBodySerializerImplementationFactory: serviceProvider => + { + var logger = serviceProvider.GetService>(); + return new SensorReadingBatchMessageHandler(logger); + }); + } +} +``` + +> Note that the order in which the message handlers are registered is important in the message processing. +> In the example, when a message handler above this one is registered that could also handle the message (same message type) then that handler may be chosen instead of the one with the specific filter. + +### Filter messages based on message body +When registering a new message handler, one can opt-in to add a filter on the incoming message body which filters out messages that are not needed to be processed by this message handler. +This can be useful when you want to route messages based on the message content itself instead of the messaging context. + +Following example shows how a message handler should only process a certain message when the status is 'Active'; meaning only `SensorReading` with active sensors will be processed. + +```csharp +// Message to be sent: +public enum SensorStatus { Active, Idle } + +public class SensorReading +{ + public string SensorId { get; set; } + public SensorStatus Status { get; set; } +} + +using Arcus.Messaging.Abstractions; +using Arcus.Messaging.Pumps.Abstractions.MessageHandling; + +// Message handler +public class SensorReadingMessageHandler : IAzureEventHubsMessageHandler +{ + public async Task ProcessMessageAsync(SensorReading message, AzureEventHubsMessageContext context, ...) + { + // Do some processing... + } +} + +using Microsoft.Extensions.DependencyInjection; + +// Message handler registration +public class Program +{ + public void ConfigureServices(IServiceCollection services) + { + services.AddEventHubsMessagePumpUsingManagedIdentity(...) + .WithEventHubsMessageHandler((SensorReading sensor) => sensor.Status == SensorStatus.Active); + } +} +``` + +### Fallback message handling +When receiving a message on the message pump and none of the registered `IAzureEventHubsMessageHandler`'s can correctly process the message, the message pump normally throws and logs an exception. +It could also happen in a scenario that's to be expected that some received messages will not be processed correctly (or you don't want them to). + +In such a scenario, you can choose to register a `IFallbackMessageHandler` in the dependency container. +This extra message handler will then process the remaining messages that can't be processed by the normal message handlers. + +Following example shows how such a message handler can be implemented: + +```csharp +using Arcus.Messaging.Pumps.EventHubs; +using Microsoft.Extensions.Logging; + +public class WarnsUserFallbackMessageHandler : IFallbackMessageHandler +{ + private readonly ILogger _logger; + + public WarnsUserFallbackMessageHandler(ILogger logger) + { + _logger = logger; + } + + public async Task ProcessMessageAsync(string message, MessageContext context, ...) + { + _logger.LogWarning("These type of messages are expected not to be processed"); + } +} +``` + +And to register such an implementation: + +```csharp +using Microsoft.Extensions.DependencyInjection; + +public class Program +{ + public void ConfigureServices(IServiceCollection services) + { + services.AddEventHubsMessagePump(...) + .WithFallbackMessageHandler(); + } +} +``` + +## Pump Configuration +The Azure EventHubs message pump can be configured further to met your needs. + +```csharp +using Microsoft.Extensions.DependencyInjection; + +public class Program +{ + public void ConfigureServices(IServiceCollection services) + { + // Uses managed identity to authenticate with Azure EventHubs. + // 🎖️ Recommended. + services.AddEventHubsMessagePumpUsingManagedIdentity( + eventHubsName: "", + fullyQualifiedNamespace: "", + blobContainerUri: ""); + + // 🚩 Arcus secret store will be used to lookup the connection strings (non-managed identity variants), + // for more information about the Arcus secret store see: https://security.arcus-azure.net/features/secret-store + services.AddSecretStore(stores => ...); + + // Options available across all variants: + services.AddEventHubsMessagePump(..., options => + { + // The name of the consumer group this processor is associated with. Events are read in the context of this group. + // Default: "$Default" + options.ConsumerGroup = ""; + + // The name of the Azure EventHubs message property that has the transaction ID. + // (default: Transaction-Id). + options.Routing.Correlation.TransactionIdPropertyName = "X-Transaction-ID"; + + // The format of the message correlation used when receiving Azure EventHubs messages. + // (default: W3C). + options.Routing.Correlation.Format = MessageCorrelationFormat.Hierarchical; + + // The name of the operation used when tracking the request telemetry. + // (default: Process) + options.Routing.Correlation.OperationName = "Sensor"; + + // The name of the Azure EventHubs message property that has the upstream service ID. + // ⚠ Only used when the correlation format is configured as Hierarchical. + // (default: Operation-Parent-Id). + options.Routing.Correlation.OperationParentIdPropertyName = "X-Operation-Parent-ID"; + + // The property name to enrich the log event with the correlation information cycle ID. + // ⚠ Only used when the correlation format is configured as Hierarchical. + // (default: CycleId) + options.Routing.CorrelationEnricher.CycleIdPropertyName = "X-CycleId"; + + // Indicate whether or not the default built-in JSON deserialization should ignore additional members + // when deserializing the incoming message (default: AdditionalMemberHandling.Error). + options.Routing.Deserialization.AdditionalMembers = AdditionalMembersHandling.Ignore; + }); + } +} +``` + +## Message Correlation +The message correlation of the received messages is set automatically. All the message handlers will have access to the current `MessageCorrelationInfo` correlation model for the specific currently processed message. + +To retrieve the correlation information in other application code, you can use a dedicated marker interface called `IMessageCorrelationInfoAccessor`. +Note that this is a scoped dependency and so will be the same instance across a scoped operation. + +```csharp +using Arcus.Messaging.Abstractions; + +public class DependencyService +{ + private readonly IMessageCorrelationInfoAccessor _accessor; + + public DependencyService(IMessageCorrelationInfoAccessor accessor) + { + _accessor = accessor; + } + + public void Method() + { + MessageCorrelationInfo correlation = _accessor.GetCorrelationInfo(); + + _accessor.SetCorrelation(correlation); + } +} +``` diff --git a/docs/versioned_docs/version-v2.1.0/02-Features/02-message-handling/04-event-hubs-azure-functions.md b/docs/versioned_docs/version-v2.1.0/02-Features/02-message-handling/04-event-hubs-azure-functions.md new file mode 100644 index 00000000..ec0b1e75 --- /dev/null +++ b/docs/versioned_docs/version-v2.1.0/02-Features/02-message-handling/04-event-hubs-azure-functions.md @@ -0,0 +1,241 @@ +--- +title: "Azure EventHubs message handling for Azure Functions" +layout: default +--- + +This article describes how you can use Arcus' message handler concept with Azure Functions; allowing you to more easily port your business logic from/to Azure Functions. + +# Azure EventHubs message handling for Azure Functions +Our EventHubs message pump system provides a way to receive incoming events, but this is not needed in an Azure Functions environment. +Today, Azure Functions acts as a message receiver meaning that the function is triggered when an event is available. Azure Functions has no out-of-the-box way to provide a clean implementation for handling different types of messages that are received. If you want to do that, you'll need to write all plumbing code yourself. With Arcus.Messaging, we provide this for you so that you can focus on writing the actual business logic. + +Following terms are used: +- **Message handler**: implementation that processes the received event from an Azure EventHubs subscription. Message handlers are created by implementing the `IAzureEventHubsMessageHandler`. This message handler will be called upon when an event is available on the Azure EventHubs subscription. +- **Message router**: implementation that delegates the received Azure EventHubs event to the correct message handler. + +![Azure Functions message handling](/media/az-func-eventhubs-message-handling.png) + +We will walk you through the process of using message handlers with Azure Functions: + +## Installation +To use the following described features, install the following package: +```shell +PM > Install-Package -Name Arcus.Messaging.AzureFunctions.EventHubs +``` + +## Receive Azure EventHubs message in an Azure Function +Here's an example of how an Azure Function receives an Azure EventHubs message from a topic: + +```csharp +public class SensorReadingFunction +{ + [Function("sensor-reading")] + public async Task Run( + [EventHubTrigger("sensors", Connection = "EventHubsConnectionString")] string[] messages, + Dictionary[] propertiesArray, + FunctionContext context) + { + // Processing events... + } +} +``` + +## Declaring our Azure EventHubs message handlers +Registering message handlers to process the EventHubs events is fairly easy to do. + +> ⚡ You can use the same message handlers in an Azure Functions an a .NET Worker message pump scenario. + +Processing sensor reading updates: + +```csharp +public class SensorReadingUpdateEventHubsMessageHandler : IAzureEventHubsMessageHandler +{ + private readonly ILogger _logger; + + public SensorReadingUpdateEventHubsMessageHandler(ILogger logger) + { + _logger = logger; + } + + public async Task ProcessMessageAsync( + SensorReadingUpdate readingUpdate, + AzureEventHubsMessageContext messageContext, + MessageCorrelationInfo correlationInfo, + CancellationToken cancellationToken) + { + _logger.LogInformation("Processing sensor reading {SensorId} in room {Room}", readingUpdate.SensorId, readingUpdate.RoomName); + } +} +``` + +Processing sensor config updates: + +```csharp +public class SensorConfigUpdateMessageHandler : IAzureEventHubsMessageHandler +{ + private readonly ILogger _logger; + + public SensorConfigUpdateMessageHandler(ILogger logger) + { + _logger = logger; + } + + public async Task ProcessMessageAsync( + SensorConfigUpdate configUpdate, + AzureEventHubsMessageContext messageContext, + MessageCorrelationInfo correlationInfo, + CancellationToken cancellationToken) + { + log.LogInformation("Processing sensor config {SensorId} in room {Room}", configUpdate.SensorId, configUpdate.Room); + } +} +``` + +Now that we have created our message handlers, we can declare when we should use them by registering them with our router. + +## Processing received messages through the message router +Now that everything is setup, we need to actually use the declared message handlers by routing the events from the Azure Function into the correct message handler. + +To achieve that, we need to add message routing with the `.AddEventHubsMessageRouting` extension: + +### Isolated Azure Functions +```csharp +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Hosting; + +var host = new HostBuilder() + .ConfigureFunctionsWorkerDefaults(builder => + { + builder.Services.AddEventHubsMessageRouting() + .WithEventHubsMessageHandler() + .WithEventHubsMessageHandler(); + }) + .Build(); + +host.Run(); +``` + +### In-Process Azure Functions +```csharp +using Microsoft.Azure.Functions.Extensions.DependencyInjection; +using Microsoft.Extensions.DependencyInjection; + +[assembly: FunctionsStartup(typeof(Startup))] +namespace SensorReading +{ + public class Startup : FunctionsStartup + { + public override void Configure(IFunctionsHostBuilder builder) + { + builder.AddEventHubsMessageRouting() + .WithEventHubsMessageHandler() + .WithEventHubsMessageHandler(); + } + } +} +``` + +This extension will register an `IAzureEventHubsMessageRouter` interface that allows you to interact with the registered message handlers in a easy manner. + +> ⚡ It also registers an more general `IMessageRouter` you can use if the general message routing (with the event' raw body as `string` as input) will suffice. + +We can now inject the message router in our Azure Function and process all events with it. +This will determine what the matching message handler is and process it accordingly: + +### Isolated +```csharp +using Arcus.Messaging.Abstractions.EventHubs; +using Azure.Messaging.EventHubs; + +public class SensorReadingFunction +{ + private readonly IAzureEventHubsMessageRouter _messageRouter; + + public SensorReadingFunction(IAzureEventHubsMessageRouter messageRouter) + { + _messageRouter = messageRouter; + } + + [Function("sensor-reading")] + public async Task Run( + [EventHubTrigger("sensor-reading", Connection = "EventHubsConnectionString")] string[] messages, + Dictionary[] propertiesArray, + FunctionContext executionContext) + { + _logger.LogInformation("First EventHubs triggered message: {Message}", messages[0]); + + for (var i = 0; i < messages.Length; i++) + { + string eventBody = messages[i]; + Dictionary eventProperties = propertiesArray[i]; + + EventData eventData = CreateEventData(message, eventProperties); + AzureEventHubsMessageContext messageContext = eventData.GetMessageContext("", "$Default", ""); + + using (MessageCorrelationResult result = executionContext.GetCorrelationInfo(eventProperties)) + { + await _messageRouter.RouteMessageAsync(eventData, messageContext, result.CorrelationInfo, CancellationToken.None); + } + } + } + + private static EventData CreateEventData(string eventBody, IDictionary properties) + { + var data = new EventData(eventBody); + foreach (KeyValuePair property in properties) + { + data.Properties.Add(property.Key, property.Value.ToString()); + } + + return data; + } +} +``` + +### In-Process +```csharp +using Arcus.Messaging.Abstractions; +using Arcus.Messaging.Abstractions.EventHubs; +using Arcus.Messaging.Abstractions.EventHubs.MessageHandling; +using Arcus.Messaging.AzureFunctions.EventHubs; + +public class SensorReadingFunction +{ + private readonly IAzureEventHubsMessageRouter _messageRouter; + private readonly AzureFunctionsInProcessMessageCorrelation _messageCorrelation; + + public SensorReadingFunction( + IAzureEventHubsMessageRouter messageRouter, + AzureFunctionsInProcessMessageCorrelation messageCorrelation) + { + _messageRouter = messageRouter; + _messageCorrelation = messageCorrelation; + } + + [FunctionName("sensor-reading")] + public async Task Run( + [EventHubTrigger("sensors", Connection = "EventHubsConnectionString")] EventData[] events, + ILogger log, + CancellationToken cancellationToken) + { + foreach (EventData message in events) + { + log.LogInformation($"First Event Hubs triggered message: {message.MessageId}"); + + var messageContext = message.GetMessageContext("sensor-reading.servicebus.windows.net", "$Default", "sensors"); + + // W3C message correlation (with built-in Microsoft dependency tracking, recommended). + using (MessageCorrelationResult result = _messageCorrelation.CorrelateMessage(message)) + { + await _messageRouter.RouteMessageAsync(message, messageContext, result.CorrelationInfo, cancellationToken); + } + + // Hierarchical message correlation (without built-in Microsoft dependency tracking). + MessageCorrelationInfo correlationInfo = message.GetCorrelationInfo(); + await _messageRouter.RouteMessageAsync(message, messageContext, correlationInfo, cancellationToken); + } + } +} +``` + +Upon receival of an Azure EventHubs event, the event will be either routed to one of the two previously registered message handlers. diff --git a/docs/versioned_docs/version-v2.1.0/02-Features/02-message-handling/05-customize-general.md b/docs/versioned_docs/version-v2.1.0/02-Features/02-message-handling/05-customize-general.md new file mode 100644 index 00000000..3607cea9 --- /dev/null +++ b/docs/versioned_docs/version-v2.1.0/02-Features/02-message-handling/05-customize-general.md @@ -0,0 +1,255 @@ +--- +title: "Customize general message handling" +layout: default +--- + +# Customize general message handling + +## Filter messages based on message context + +When registering a new message handler, one can opt-in to add a filter on the message context which filters out messages that are not needed to be processed. + +This can be useful when you are sending different message types on the same queue. Another use-case is being able to handle different versions of the same message type which have different contracts because you are migrating your application. + +Following example shows how a message handler should only process a certain message when a property's in the context is present. + +We'll use a simple message handler implementation: + +```csharp +using Arcus.Messaging.Abstractions; +using Arcus.Messaging.Pumps.Abstractions.MessagingHandling; + +public class OrderMessageHandler : IMessageHandler +{ + public async Task ProcessMessageAsync(Order order, MessageContext context, ...) + { + // Do some processing... + } +} +``` + +We would like that this handler only processed the message when the context contains `MessageType` equals `Order`. + +```csharp +using Microsoft.Extensions.DependencyInjection; + +public class Startup +{ + public void ConfigureServices(IServiceCollection services) + { + services.AddYourMessagePump(...) + .WithMessageHandler(context => context.Properties["MessageType"].ToString() == "Order"); + } +} +``` + +> Note that the order in which the message handlers are registered is important in the message processing. +> In the example, when a message handler above this one is registered that could also handle the message (same message type) than that handler may be chosen instead of the one with the specific filter. + +## Control custom deserialization + +### Extending the message router + +By default, when using the `.AddMessageRouting`, an new `IMessageRouter` instance is registered that contains the basic message routing/handling for incoming messages. + +If you want to control the message routing or provide additional functionality, you can do this by implementing your own message router. Next to that, you can also inherit from our built-in message router and override the necessary methods of your choosing. + +Here's how you normally would register the message router: + +```csharp +public void ConfigureServices(IServiceCollection services) +{ + // Registers `IMessageRouter`. + services.AddMessageRouting(); +} +``` + +Alternatively, you can implement your own. + +```csharp +using System; +using Arcus.Messaging.Pumps.Abstractions; +using Microsoft.Extensions.Configuration; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Logging; + +public class OrderMessageRouter : MessageRouter +{ + public OrderMessageRouter( + IServiceProvider serviceProvider, + ILogger logger) + : base(serviceProvider, logger) + { + } + + public override bool TryDeserializeToMessageFormat(string message, Type messageType, out object? result) + { + if (messageType == typeof(Order)) + { + result = JsonConvert.DeserializeObject(message); + return true; + } + else + { + result = null; + return false; + } + } +} +``` + +> Note that our built-in `MessageRouter` implements `IMessageRouter` which you can use within your application. + +When inheriting from the `MessageRouter` type, we've controlled how the incoming raw message is being deserialized. + +Based on the message type of the registered message handlers, the router determines if the incoming message can be deserialized to that type or not. + +This custom message router can be registered using the following code. The custom router will then be registered as an `IMessageRouter` instance. + +```csharp +public void ConfigureServices(IServiceCollection services) +{ + services.AddMessageRouting(serviceProvider => + { + var logger = serviceProvider.GetService>(); + return new OrderMessageRouter(serviceProvider, logger); + }); +} +``` + +### Bring your own deserialization + +You can also choose to extend the built-in message deserialization with additional deserializer to meet your needs. +This allows you to easily deserialize into different message formats or reuse existing (de)serialization capabilities that you already have without altering the message router. + +You start by implementing an `IMessageBodySerializer`. The following example shows how an expected type can be transformed to something else. +The result type (in this case `OrderBatch`) will be then be used to check if there is an `IMessageHandler` registered with that message type. + +```csharp +using Arcus.Messaging.Pumps.Abstractions.MessageHandling; + +public class OrderBatchMessageBodySerializer : IMessageBodySerializer +{ + public async Task DeserializeMessageAsync(string messageBody) + { + var serializer = new XmlSerializer(typeof(Order[])); + using (var contents = new MemoryStream(Encoding.UTF8.GetBytes(messageBody))) + { + var orders = (Order[]) serializer.Deserialize(contents); + return MessageResult.Success(new OrderBatch(orders)); + } + } +} +``` + +The registration of these message body handlers can be done just as easily as an `IMessageSerializer`: + +```csharp +using Microsoft.Extensions.DependencyInjection; + +public class Startup +{ + public void ConfigureServices(IServiceCollection services) + { + // Register the message body serializer in the dependency container where the dependent services will be injected. + services.WithMessageHandler(..., messageBodySerializer: new OrderBatchMessageBodySerializer()); + + // Register the message body serializer in the dependency container where the dependent services are manually injected. + services.WithMessageHandler(..., messageBodySerializerImplementationFactory: serviceProvider => + { + var logger = serviceProvider.GetService>(); + return new OrderBatchMessageHandler(logger); + }); + } +} +``` + +> Note that the order in which the message handlers are registered is important in the message processing. +> In the example, when a message handler above this one is registered that could also handle the message (same message type) than that handler may be chosen instead of the one with the specific filter. + +## Filter messages based on message body + +When registering a new message handler, one can opt-in to add a filter on the incoming message body which filters out messages that are not needed to be processed. +This can be useful when you want to route messages based on the message content itself instead of the messaging context. + +Following example shows how a message handler should only process a certain message when the status is 'Sales'; meaning only `Order` for the sales division will be processed. + +```csharp +// Message to be sent: +public enum Department { Sales, Marketing, Operations } + +public class Order +{ + public string Id { get; set; } + public Department Type { get; set; } +} + +using Arcus.Messaging.Abstractions; +using Arcus.Messaging.Pumps.Abstractions.MessageHandling; + +// Message handler +public class OrderMessageHandler : IMessageHandler +{ + public async Task ProcessMessageAsync(Order order, MessageContext context, ...) + { + // Do some processing... + } +} + +using Microsoft.Extensions.DependencyInjection; + +// Message handler registration +public class Startup +{ + public void ConfigureServices(IServiceCollection services) + { + services.WithMessageHandler((Order order) => order.Type == Department.Sales); + } +} +``` + +## Fallback message handling + +When receiving a message on the message pump and none of the registered `IMessageHandler`'s can correctly process the message, the message router normally throws and logs an exception. + +It could also happen in a scenario that's to be expected that some received messages will not be processed correctly (or you don't want them to). + +In such a scenario, you can choose to register a `IFallbackMessageHandler` in the dependency container. +This extra message handler will then process the remaining messages that can't be processed by the normal message handlers. + +Following example shows how such a message handler can be implemented: + +```csharp +using Arcus.Messaging.Abstractions; +using Arcus.Messaging.Pumps.Abstractions.MessageHandling; +using Microsoft.Extensions.Logging; + +public class WarnsUserFallbackMessageHandler : IFallbackMessageHandler +{ + private readonly ILogger _logger; + + public WarnsUserFallbackMessageHandler(ILogger logger) + { + _logger = logger; + } + + public async Task ProcessMessageAsync(string message, MessageContext context, ...) + { + _logger.LogWarning("These type of messages are expected not to be processed"); + } +} +``` + +And to register such an implementation: + +```csharp +using Microsoft.Extensions.DependencyInjection; + +public class Startup +{ + public void ConfigureServices(IServiceCollection services) + { + services.WithFallbackMessageHandler(); + } +} +``` \ No newline at end of file diff --git a/docs/versioned_docs/version-v2.1.0/02-Features/02-message-handling/_category_.yml b/docs/versioned_docs/version-v2.1.0/02-Features/02-message-handling/_category_.yml new file mode 100644 index 00000000..6c03b3f4 --- /dev/null +++ b/docs/versioned_docs/version-v2.1.0/02-Features/02-message-handling/_category_.yml @@ -0,0 +1 @@ +label: 'Message handling' diff --git a/docs/versioned_docs/version-v2.1.0/02-Features/03-tcp-health-probe.md b/docs/versioned_docs/version-v2.1.0/02-Features/03-tcp-health-probe.md new file mode 100644 index 00000000..1228e552 --- /dev/null +++ b/docs/versioned_docs/version-v2.1.0/02-Features/03-tcp-health-probe.md @@ -0,0 +1,99 @@ +--- +title: "TCP Health probe" +layout: default +--- + +# TCP Health probe + +We provide a TCP health probe endpoint that allows a runtime to periodically check the liveness/readiness of the host based on the [.NET Core health checks](https://docs.microsoft.com/en-us/aspnet/core/host-and-deploy/health-checks). + +## Installation + +This features requires to install our NuGet package: + +```shell +PM > Install-Package Arcus.Messaging.Health +``` + +## Usage + +To include the TCP endpoint, add the following line of code in the `Startup.ConfigureServices` method: + +```csharp +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Diagnostics.HealthChecks; + +public class Startup +{ + public void ConfigureServices(IServiceCollection services) + { + // Add TCP health probe without extra health checks. + services.AddTcpHealthProbes("MyConfigurationKeyToTcpHealthPort"); + + // Or, add your extra health checks in a configuration delegate. + services.AddTcpHealthProbes( + "MyConfigurationKeyToTcpHealthPort", + configureHealthChecks: healthBuilder => + { + healthBuilder.AddCheck("Example", () => HealthCheckResult.Healthy("Example is OK!"), tags: new[] { "example" }) + }); + } +} +``` + +## Configuration + +The TCP probe allows several additional configuration options. + +```csharp +using Microsoft.Extensions.DependencyInjection; + +public class Startup +{ + public void ConfigureServices(IServiceCollection services) + { + // Add TCP health probe with or without extra health checks. + services.AddTcpHealthProbes( + "MyConfigurationKeyToTcpHealthPort", + configureTcpListenerOptions: options => + { + // Configure the configuration key on which the health report is exposed. + options.TcpPortConfigurationKey = "MyConfigurationKey"; + + // Configure how the health report should be serialized. + options.HealthReportSerializer = new MyHealthReportSerializer(); + + // Configure how the health report status should affect the TCP probe's availability. + // When set to `true`, unhealthy health reports will result in rejecting of TCP client connection attempts. + // When set to `false` (default), TCP client connection attempts will be accepted but the returned health report will have a unhealthy health status. + options.RejectTcpConnectionWhenUnhealthy = true; + }, + configureHealthCheckPublisherOptions: options => + { + // Configures additional options regarding how fast or slow changes in the health report should affect the TCP probe's availability. + // When the RejectTcpConnectionWhenUnhealthy is set to `true`. + + // The initial delay after the application starts with monitoring the health report changes. + options.Delay = TimeSpan.Zero; + + // The interval in which the health report is monitored for changes. + options.Period = TimeSpan.FromSeconds(5); + + // See https://docs.microsoft.com/en-us/dotnet/api/microsoft.extensions.diagnostics.healthchecks.healthcheckpublisheroptions?view=dotnet-plat-ext-5.0 for more information. + }); + } +} + +using Arcus.Messaging.Health; +using Microsoft.Extensions.Diagnostics.HealthChecks; + +public class MyHealthReportSerializer : IHealthReportSerializer +{ + public byte[] Serialize(HealthReport healthReport) + { + return Array.Empty(); + } +} +``` + +[← back](/) diff --git a/docs/versioned_docs/version-v2.1.0/02-Features/04-service-bus-extensions.md b/docs/versioned_docs/version-v2.1.0/02-Features/04-service-bus-extensions.md new file mode 100644 index 00000000..eadd1ee6 --- /dev/null +++ b/docs/versioned_docs/version-v2.1.0/02-Features/04-service-bus-extensions.md @@ -0,0 +1,179 @@ +--- +title: "Azure Service Bus Extensions" +layout: default +--- + +# Azure Service Bus Extensions + +We provide several additional features related to message creation/sending and message/context discoverability. + +## Installation + +This features requires to install our NuGet package: + +```shell +PM > Install-Package Arcus.Messaging.ServiceBus.Core +``` + +## Using Arcus secret store when registering the Service Bus client + +When registering a `ServiceBusClient` via [Azure's client registration process](https://learn.microsoft.com/en-us/dotnet/api/overview/azure/messaging.servicebus-readme), the library provides an extension to pass-in a secret name instead of directly passing the Azure Service Bus connection string. +This secret name will correspond with a registered secret in the [Arcus secret store](https://security.arcus-azure.net/features/secret-store) that holds the Azure Service Bus connection string. + +Following example shows how the secret name is passed to this extension overload: + +```csharp +using Microsoft.Extensions.Azure; +using Microsoft.Extensions.DependencyInjection; + +public class Program +{ + public void ConfigureServices(IServiceCollection services) + { + // Adding Arcus secret store, more info: https://security.arcus-azure.net/features/secret-store + services.AddSecretStore(stores => stores.AddAzureKeyVaultWithManagedIdentity("https://my.vault.azure.net"); + + // Adding Service Bus client with secret in Arcus secret store. + services.AddAzureClients(clients => clients.AddServiceBusClient(connectionStringSecretName: "")); + } +} +``` + +🥇 Adding your Azure Service Bus client this way helps separating application configuration from sensitive secrets. For more information on the added-values of the Arcus secret store, see [our dedicated documentation page](https://security.arcus-azure.net/features/secret-store). + +## Automatic tracking and Hierarchical correlating of Service Bus messages + +The Arcus message pump/router automatically makes sure that received Azure Service Bus messages are tracked as request telemetry in Application Insights. +If you also want the sender (dependency tracking) to be linked to the request, we provide a set of easy extensions on the `ServiceBusSender` to make this happen. +For more information on dependency tracking, see the [Arcus Observability feature documentation on telemetry tracking](https://observability.arcus-azure.net/features/writing-different-telemetry-types/). + +> 🚩 By default, the Service Bus message pump is using W3C correlation, not Hierarchical, which already allows automatic dependency tracking upon sending without additional configuration. If you want to use Hierarchical, please configure the correlation format in the [message pump configuration](./02-message-handling/01-service-bus.md). + +Internally, we enrich the `ServiceBusMessage` with the message correlation and track the entire operation as an Azure Service Bus dependency. +The result of this operation will result in a parent-child relationship between the dependency-request. + +Following example shows how any business content (`Order`) can be wrapped automatically internally in a `ServiceBusMessage`, and send as a correlated tracked message to the Azure Service Bus resource: + +```csharp +using Azure.Messaging.ServiceBus; + +Order order = ... // Your business model. +MessageCorrelationInfo correlation = ... // Retrieved from your message handler implementation. +ILogger logger = ... // Your dependency injected logger from your application. + +await using (var client = new ServiceBusClient(...)) +await using (ServiceBusSender sender = client.CreateSender("my-queue-or-topic")) +{ + await sender.SendMessageAsync(order, correlation, logger); + // Output: {"DependencyType": "Azure Service Bus", "DependencyId": "c55c7885-30c5-4785-ad15-a96e03903bfa", "TargetName": "ordersqueue", "Duration": "00:00:00.2521801", "StartTime": "03/23/2020 09:56:31 +00:00", "IsSuccessful": true, "Context": {"EntityType": "Queue"}} +} +``` + +The dependency tracking can also be configured with additional options to your needs. +You can also create your own `ServiceBusMessage` with one of the method overloads, so you have influence on the entire message's contents and application properties. + +> ⚠ Note that changes to the application property names should also reflect in changes in the application properties at the receiving side, so that the message pump/router knows where it will find these correlation properties. + +```csharp +await sender.SendMessageAsync(order, correlation, logger, options => +{ + // The Azure Service Bus application property name where the message correlation transaction ID will be set. + // Default: Transaction-Id + options.TransactionIdPropertyName = "My-Transaction-Id"; + + // The Azure Service Bus application property name where the dependency ID property will be set. + // This ID is by default generated and added to both the dependency tracking as the message. + // Default: Operation-Parent-Id + options.UpstreamServicepropertyName = "My-UpstreamService-Id"; + + // The Azure Service Bus application function to generate a dependency ID which will be added to both the message as the dependency tracking. + // Default: GUID generation. + options.GenerateDependencyId = () => $"dependency-{Guid.NewGuid()}"; + + // The dictionary containing any additional contextual inforamtion that will be used when tracking the Azure Service Bus dependency (Default: empty dictionary). + options.AddTelemetryContext(new Dictionary + { + ["My-ServiceBus-custom-key"] = "Any additional information" + }); +}); + +ServiceBusMessage message = ... +await sender.SendMessageAsync(message, ...); +``` + +We also support multiple message bodies or messages: + +```csharp +Order[] orders = ... +await sender.SendMessagesAsync(orders, ...); + +ServiceBusMessage[] messages = ... +await sender.SendMessagesAsync(mesages, ...); +``` + +## Simplify Creating Service Bus Messages + +Starting from the message body, we provide a builder to quickly wrap the content in a valid Azure Service Bus `ServiceBusMessage` type that can be send. + +```csharp +using Azure.Messaging.ServiceBus; + +Order order = new Order("order-id"); +ServiceBusMessage message = ServiceBusMessageBuilder.CreateForBody(order).Build(); +``` + +We also provide additional, optional parameters during the creation: + +```csharp +using Azure.Messaging.ServiceBus; + +ServiceBusMessage message = + ServiceBusMessageBuilder.CreateForBody(order, Encoding.UTF8) + .WithOperationId($"operation-{Guid.NewGuid()}") + .WithTransactionId($"transaction-{Guid.NewGuid()}") + .WithOperationParentId($"parent-{Guid.NewGuid()}") + .Build(); +``` + + +* `OperationId`: reflects the ID that identifies a single operation within a service. The passed ID will be set on the `message.CorrelationId` property. +* `TransactionId`: reflects the ID that identifies the entire transaction across multiple services. The passed ID will be set on the `message.ApplicationProperties` dictionary with the key `"Transaction-Id"`, which is overridable. +* `OperationParentId`: reflecs the ID that identifies the sender of the event message to the receiver of the message (parent -> child). The passed ID will be set on the `message.ApplicationProperties` dictionary with the key `Operation-Parent-Id`, which is overriable. + +## Simplify Message Information Discovery + +On receive, the Azure Service Bus message contains a set of `.ApplicationProperties` with additional information ie. correlation. +This information can be accessed in a more simplified way: + +```csharp +using Arcus.Messaging.Abstractions; +using Azure.Messaging.ServiceBus; + +ServiceBusMessage message = ... + +// Extracted all correlation information from the `.ApplicationProperties` and wrapped inside a valid correlation type. +MessageCorrelationInfo correlationInfo = message.GetCorrelationInfo(); + +// Extract only the transaction identifier from the correlation information. +string transactionId = message.GetTransactionId(); + +// Extract a user property in a typed manner. +string myCustomPropertyValue = message.GetUserProperty("my-custom-property-key"); +``` + +## Simplify Message Context Information Discovery + +On receive, the context in which the message is received contains a set of `.ApplicationProperties` with additional information ie. encoding. +This information can be access in a more simplified way: + +```csharp +using Arcus.Messaging.Abstractions; +using Azure.Messaging.ServiceBus; + +// Creates a new messaging context from the message, using an unique job ID to identify all message handlers that can handle this specific context. +AzureServiceBusMessageContext messageContext = message.GetMessageContext("my-job-id", ServiceBusEntityType.Topic); + +// Extract the encoding information from the `.ApplicationProperties` and wrapped inside a valid `Encoding` type. +MessageContext messageContext = ... +Encoding encoding = messageContext.GetMessageEncodingProperty(); +``` diff --git a/docs/versioned_docs/version-v2.1.0/02-Features/05-event-hubs-extensions.md b/docs/versioned_docs/version-v2.1.0/02-Features/05-event-hubs-extensions.md new file mode 100644 index 00000000..6cd4fa52 --- /dev/null +++ b/docs/versioned_docs/version-v2.1.0/02-Features/05-event-hubs-extensions.md @@ -0,0 +1,179 @@ +--- +title: "Azure EventHubs Extensions" +layout: default +--- + +# Azure EventHubs Extensions + +Besides the Azure EventHubs message handling functionality, we provide several additional features related to message creation/sending and message/context discoverability. +They help in hte send/receive process of Azure EventHubs event messages. + +## Installation + +These features require to install our NuGet package: + +```shell +PM > Install-Package Arcus.Messaging.EventHubs.Core +``` + +## Using Arcus secret store when registering the EventHubs producer client + +When registering an `EventHubsProducerClient` via [Azure's client registration process](https://learn.microsoft.com/en-us/dotnet/api/overview/azure/messaging.eventhubs-readme), the library provides an extension to pass-in a secret name instead of directly passing the Azure EventHubs connection string. +This secret name will correspond with a registered secret in the [Arcus secret store](https://security.arcus-azure.net/features/secret-store) that holds the Azure EventHubs connection string. + +> ⚠ An Azure EventHubs connection string can either contain the `EntityPath` or not if it was copied from the EventHubs namespace or from the EventHub itself. In either case, make sure that you either pass in the EventHub name separately, or that the connection string contains this name. For more information, see: [How to get an Event Hubs connection string](https://docs.microsoft.com/azure/event-hubs/event-hubs-get-connection-string). + +Following example shows how the secret name is passed to this extension overload: + +```csharp +using Microsoft.Extensions.Azure; +using Microsoft.Extensions.DependencyInjection; + +public class Program +{ + public void ConfigureServices(IServiceCollection services) + { + // Adding Arcus secret store, more info: https://security.arcus-azure.net/features/secret-store + services.AddSecretStore(stores => stores.AddAzureKeyVaultWithManagedIdentity("https://my.vault.azure.net"); + + // Adding EventHubs producer client with secret in Arcus secret store, + // using connection string that contains EventHubs name. + services.AddAzureClients(clients => clients.AddEventHubProducerClient(connectionStringSecretName: "")); + + // Adding EventHubs producer client with secret in Arcus secret store, + // using connection string that does not contain EventHubs name. + services.AddAzureClients(clients => clients.AddEventHubProducerClient(connectionStringSecretName: "", "")); + } +} +``` + +🥇 Adding your Azure EventHubs producer client this way helps separating application configuration from sensitive secrets. For more information on the added-values of the Arcus secret store, see [our dedicated documentation page](https://security.arcus-azure.net/features/secret-store). + +## Automatic tracking and Hierarchical correlating of EventHubs messages + +The Arcus message pump/router automatically makes sure that received Azure EventHubs event messages are tracked as request telemetry in Application Insights. +If you also want the sender (dependency tracking) to be linked to the request, we provide a set of easy extensions on the `EventHubProducerClient` to make this happen. +For more information on dependency tracking, see the [Arcus Observability feature documentation on telemetry tracking](https://observability.arcus-azure.net/features/writing-different-telemetry-types/). + +> 🚩 By default, the EventHubs message pump is using W3C correlation, not Hierarchical, which already allows automatic dependency tracking upon sending without additional configuration. If you want to use Hierarchical, please configure the correlation format in the [message pump configuration](./02-message-handling/03-event-hubs.md). + +Internally, we enrich the `EventData` with the message correlation and track the entire operation as an Azure EventHubs dependency. +The result of this operation will result in a parent-child relationship between the dependency-request. + +Following example shows how any business content (`Order`) can be wrapped automatically internally in a `EventHubs`, and send as a correlated tracked message to the Azure EventHubs resource: + +```csharp +using Azure.Messaging.EventHubs; + +Order[] orders = ... // Your business model. +MessageCorrelationInfo correlation = ... // Retrieved from your message handler implementation. +ILogger logger = ... // Your dependency injected logger from your application. + +await using (var producer = new EventHubProducerClient("", "") +{ + await producer.SendAsync(orders, correlation, logger); + // Output: {"DependencyType": "Azure Event Hubs", "DependencyId": "c55c7885-30c5-4785-ad15-a96e03903bfa", "TargetName": "", "Duration": "00:00:00.2521801", "StartTime": "03/23/2020 09:56:31 +00:00", "IsSuccessful": true, "Context": []} +} +``` + +The dependency tracking can also be configured with additional options to your needs. +You can also create your own `EventData` with one of the method overloads, so you have influence on the entire message's contents and application properties. + +> ⚠ Note that changes to the application property names should also reflect in changes in the application properties at the receiving side, so that the message pump/router knows where it will find these correlation properties. + +```csharp +await producer.SendAsync(orders, correlation, logger, options => +{ + // The Azure EventHubs application property name where the message correlation transaction ID will be set. + // Default: Transaction-Id + options.TransactionIdPropertyName = "My-Transaction-Id"; + + // The Azure EventHubs application property name where the dependency ID property will be set. + // This ID is by default generated and added to both the dependency tracking as the message. + // Default: Operation-Parent-Id + options.UpstreamServicepropertyName = "My-UpstreamService-Id"; + + // The Azure EventHubs application function to generate a dependency ID which will be added to both the message as the dependency tracking. + // Default: GUID generation. + options.GenerateDependencyId = () => $"dependency-{Guid.NewGuid()}"; + + // The contextual information that will be used when tracking the Azure EventHubs dependency. + // Default: empty dictionary. + options.AddTelemetryContext(new Dictionary + { + ["Additional_EventHubs_Info"] = "EventHubs-Info" + }); +}); + +EventData[] messages = ... +await sender.SendAsync(messages, ...); +``` + +## Simplify Creating EventHubs Messages + +Starting from the message body, we provide a builder to quickly wrap the content in a valid Azure EventHubs `EventData` type that can be send. + +```csharp +using Azure.Messaging.EventHubs; + +Order order = new Order("order-id"); +EventData message = EventDataBuilder.CreateForBody(order).Build(); +``` + +We also provide additional, optional parameters during the creation: + +```csharp +using Azure.Messaging.ServiceBus; + +Order order = new Order("order-id"); +EventData message = + EventDataBuilder.CreateForBody(order, Encoding.UTF8) + .WithOperationId($"operation-{Guid.NewGuid()}") + .WithTransactionId($"transaction-{Guid.NewGuid()}") + .WithOperationParentId($"parent-{Guid.NewGuid()}") + .Build(); +``` + +* `OperationId`: reflects the ID that identifies a single operation within a service. The passed ID will be set on the `eventData.CorrelationId` property. +* `TransactionId`: reflects the ID that identifies the entire transaction across multiple services. The passed ID will be set on the `eventData.Properties` dictionary with the key `"Transaction-Id"`, which is overridable. +* `OperationParentId`: reflecs the ID that identifies the sender of the event message to the receiver of the message (parent -> child). The passed ID will be set on the `eventData.Properties` dictionary with the key `Operation-Parent-Id`, which is overriable. + +## Simplify Message Information Discovery + +On receive, the Azure EventHubs event message contains a set of `.Properties` with additional information ie. correlation set form the `EventDataBuilder`. +This information can be accessed in a more simplified way: + +```csharp +using Arcus.Messaging.Abstractions; +using Azure.Messaging.EventHubs; + +EventData message = ... + +// Extracted all correlation information from the `.Properties` and wrapped inside a valid correlation type. +MessageCorrelationInfo correlationInfo = message.GetCorrelationInfo(); + +// Extract only the transaction identifier from the correlation information. +string transactionId = message.GetTransactionId(); + +// Extract a user property in a typed manner. +string myCustomPropertyValue = message.GetUserProperty("my-custom-property-key"); +``` + +## Simplify Message Context Information Discovery + +On receive, the context in which the message is received contains a set of `.Properties` with additional information ie. encoding set from the `EventDataBuilder`. +This information can be access in a more simplified way: + +```csharp +using Arcus.Messaging.Abstractions; +using Azure.Messaging.EventHubs; + +EventProcessorClient processor = ... // Client that receives the message. +EventData message = ... // The received message. + +// Creates a new messaging context from the message and processor, using an unique job ID to identify all message handlers that can handle this specific context. +AzureEventHubsMessageContext messageContext = message.GetMessageContext(processor, "my-job-id"); + +// Extract the encoding information from the `.Properties` and wrapped inside a valid `Encoding` type. +Encoding encoding = messageContext.GetMessageEncodingProperty(); +``` diff --git a/docs/versioned_docs/version-v2.1.0/02-Features/06-general-messaging.md b/docs/versioned_docs/version-v2.1.0/02-Features/06-general-messaging.md new file mode 100644 index 00000000..014a3e9b --- /dev/null +++ b/docs/versioned_docs/version-v2.1.0/02-Features/06-general-messaging.md @@ -0,0 +1,159 @@ +--- +title: "General messaging" +layout: default +--- + +# General messaging functionality +The Arcus Messaging library has several messaging systems that can retrieve messages from Azure technology (like Azure Service Bus and Azure EventHubs), and run abstracted message handlers on the received messages. +Both and future systems also support some general functionality that will be explained here. + +## Stop message pump when downstream is unable to keep up + +### Pause message processing with a circuit breaker +When your message handler interacts with a dependency on an external resource, that resource may become unavailable. In that case you want to temporarily stop processing messages. + +⚠️ This functionality is currently only available for the Azure Service Bus message pump. + +⚠️ This functionality is not supported for the Azure Event Hubs message pump. + +⚠️ This functionality is only available when interacting with message pumps, not in message router-only scenarios like Azure Functions. + +To interact with the message processing system within your custom message handler, you can inject the `IMessagePumpCircuitBreaker`: + +```csharp +using Arcus.Messaging.Abstractions.ServiceBus.MessageHandling; +using Arcus.Messaging.Pumps.Abstractions.Resiliency; + +public class OrderMessageHandler : CircuitBreakerServiceBusMessageHandler +{ + private readonly IMessagePumpCircuitBreaker _circuitBreaker; + + public OrderMessageHandler(IMessagePumpCircuitBreaker circuitBreaker, ILogger<...> logger) : base(circuitBreaker, logger) + { + _circuitBreaker = circuitBreaker; + } + + public override async Task ProcessMessageAsync( + Order message, + AzureServiceBusMessageContext messageContext, + MessagePumpCircuitBreakerOptions options, + ...) + { + // Determine whether your dependent system is healthy... + if (!IsDependentSystemHealthy()) + { + throw new ...Exception("My dependency system is temporarily unavailable, please halt message processing for now"); + } + else + { + // Process your message... + } + } +} +``` + +The message pump will by default act in the following pattern: +* Circuit breaker calls `Pause` + * Message pump stops processing messages for a period of time (circuit is OPEN). +* Message pump tries processing a single message (circuit is HALF-OPEN). + * Dependency still unhealthy? => circuit breaker pauses again (circuit is OPEN) + * Dependency healthy? => circuit breaker resumes, message pump starts receiving message in full again (circuit is CLOSED). + +Both the recovery period after the circuit is open and the interval between messages when the circuit is half-open is configurable when calling the circuit breaker. These time periods are related to your dependent system and could change by the type of transient connection failure. + +```csharp + public override async Task ProcessMessageAsync( + Order message, + AzureServiceBusMessageContext messageContext, + MessagePumpCircuitBreakerOptions options, + ...) +{ + // Sets the time period the circuit breaker should wait before retrying to receive messages. + // A.k.a. the time period the circuit is closed (default: 30 seconds). + options.MessageRecoveryPeriod = TimeSpan.FromSeconds(15); + + // Sets the time period the circuit breaker should wait between each message after the circuit was closed, during recovery. + // A.k.a. the time interval to receive messages during which the circuit is half-open (default: 10 seconds). + options.MessageIntervalDuringRecovery = TimeSpan.FromSeconds(1.5); +} +``` + +#### 🔔 Get notified on a circuit breaker state transition +Transitions from circuit breaker states happens internally and automatically. The library supports a notification system that lets you register event handlers that gets called upon a state transition. + +After the message pump and/or message handlers are registered, you can add one or more handlers linked to specific transitions to the previously registered pump. + +```csharp +using Arcus.Messaging.Pumps.Abstractions; + +services.AddServiceBusMessagePump(...) + .WithCircuitBreakerStateChangedEventHandler() + .WithCircuitBreakerStateChangedEventHandler(); +``` + +The instances should implement the `ICircuitBreakerEventHandler`, which allows you to access the new state for the pump. + +```csharp +public class MyFirstCircuitBreakerEventHandler : ICircuitBreakerEventHandler +{ + public Task OnTransitionAsync(MessagePumpCircuitStateChang change) + { + // The job ID of the message pump that was transitioned. + string jobId = change.JobId; + + // The circuit breaker state transitions. + MessagePumpCircuitState oldState = change.OldState; + MessagePumpCircuitState newState = change.NewState; + + // Process the state change event... + } +} +``` + +### Pause message processing for a fixed period of time +⚡ If you use one of the message-type specific packages like `Arcus.Messaging.Pumps.EventHubs`, you will automatically get this functionality. If you implement your own message pump, please use the `services.AddMessagePump(...)` extension which makes sure that you also registers this functionality. + +When messages are being processed by a system that works slower than the rate that messages are being received by the message pump, a rate problem could occur. +As a solution to this problem, the Arcus Messaging library registers an `IMessagePumpLifetime` instance in the application services that lets you control the message receiving process and pauses if necessary for the downstream dependency system to keep up. + +The following example below shows how a 'rate limit' service gets injected with the `IMessagePumpLifetime` instance and pauses. +Note that the message pumps need to be registered with a unique job ID so that the lifetime component knows which pump it needs to manage. + +```csharp +using Microsoft.Extensions.DependencyInjection; +using Arcus.Messaging.Pumps.Abstractions; + +public class Program +{ + public void ConfigureServices(IServiceCollection services) + { + services.AddServiceBusMessagePump(..., options => options.JobId = "abc-123") + .WithServiceBusMessageHandler<..., ...>(); + + services.AddEventHubsMessagePump(..., options => options.JobId = "def-456") + .WithEventHubsMessageHandler<..., ...>(); + } +} + +public class RateLimitService +{ + private readonly IMessagePumpLifetime _pumpLifetime; + + public RateLimitService(IMessagePumpLifetime lifetime) + { + _pumpLifetime = lifetime; + } + + public async Task CantKeepUpAnymoreAsync(CancellationToken cancellationToken) + { + var duration = TimeSpan.FromSeconds(30); + await _pumpLifetime.PauseProcessingMessagesAsync("abc-123", duration, cancellationToken); + } +} +``` + +⚡ Besides the `PauseProcessingMessagesAsync` method, there also exists `Stop...`/`Start...` variants so that you can control the time dynamically when the pump is allowed to run again. + +For more information on message pumps: +- [Azure Service Bus message pump](./02-message-handling/01-service-bus.md) +- [Azure EventHubs message pump](./02-message-handling/03-event-hubs.md) diff --git a/docs/versioned_docs/version-v2.1.0/03-Guides/migration-guide-v1.0.md b/docs/versioned_docs/version-v2.1.0/03-Guides/migration-guide-v1.0.md new file mode 100644 index 00000000..416465c3 --- /dev/null +++ b/docs/versioned_docs/version-v2.1.0/03-Guides/migration-guide-v1.0.md @@ -0,0 +1,96 @@ +# Migration guide towards v1.0 +Starting from v1.0, there are some major breaking changes. To make it easier for you migrate towards this new version, we have assembled a migration guide to help you in the process. + +## New Azure SDK +We have chosen to also update our library to the new Azure SDK when interacting with the Azure Service Bus ([#159](https://github.com/arcus-azure/arcus.messaging/discussions/159)). This package update has some consequences on our library. + +> Note that Azure Service Bus plug-ins are [still not supported](https://github.com/arcus-azure/arcus.messaging/discussions/159) in this new SDK. + +### Package update +The `Microsoft.Azure.ServiceBus` NuGet package is now completely removed from the library and is changed by the `Arcus.Messaging.ServiceBus` NuGet package. This means that possible compile errors can occur when using types or signatures that were only available in this older package. + +### Service Bus message update for fallback message handlers +The `AzureServiceBusFallbackMessageHandler<>` abstract type has an updated signature as it now uses the new `ServiceBusReceivedMessage` instead of the `Message` when providing a fallback for a message handler pipeline: + +```diff +- using Microsoft.Azure.ServiceBus; ++ using Azure.Messaging.ServiceBus; + +public class OrderFallbackMessageHandler : AzureServiceBusFallbackMessageHandler +{ + public override async Task ProcessMessageAsync( +- Message message, ++ ServiceBusReceivedMessage message, + AzureServiceBusMessageContext azureMessageContext, + MessageCorrelationInfo correlationInfo, + CancellationToken cancellationToken) + { + ... + } +} +``` + +> Note that some Service Bus-specific operations were renamed to, see [this section](#renamed-fallback-message-handler-operations) for more info. + +### Message correlation information update +The correlation information model `MessageCorrelationInfo` could previously be extracted from the [`Message` of the old SDK](https://docs.microsoft.com/en-us/dotnet/api/microsoft.azure.servicebus.message?view=azure-dotnet) with the extension `message.GetCorrelationInfo()`. + +This new version works with the new `ServiceBusReceivedMessage`, so the correlation extension is also moved. + +```diff +- using Microsoft.Azure.ServiceBus; ++ using Azure.Messaging.ServiceBus; + +- Message message = ... ++ ServiceBusReceivedMessage message = ... + +message.GetCorrelationInfo(); +``` + +## Moved message handler types to abstraction namespaces +All your Azure Service Bus message handlers implementations will probably give compile errors. This is caused by the breaking change that moved all the 'message handler'-related types towards abstractions namespaces ([#153](https://github.com/arcus-azure/arcus.messaging/issues/153)). +Practically, this means that these namespaces are renamed: + +* `Arcus.Messaging.Pumps.MessagingHandling` becomes `Arcus.Messaging.Abstractions.MessageHandling` +* `Arcus.Messaging.Pumps.ServiceBus.MessageHandling` becomes `Arcus.Messaging.Abstractions.MessageHandling.ServiceBus` + * Also the `IAzureServiceBusMessageHandler<>` is moved to this new namespace. + +This has effect on the `IAzureServiceBusMessageHandler<>`, `IAzureServiceBusFallbackMessageHandler` interfaces; and the `AzureServiceBusMessageHandler<>`, `AzureServiceBusFallbackMessageHandler<>` abstract types. +Following example shows how older versions has now uses non-existing namespaces in the. + +```diff +- using Arcus.Messaging.Pumps.ServiceBus; ++ using Arcus.Messaging.Abstractions.MessageHandling.ServiceBus; + +public class OrderMessageHandler : IAzureServiceBusMessageHandler +{ + ... +} +``` + +## Renamed fallback message handler operations +Any of your custom fallback message handler implementations that inherit from the `AzureServiceBusFallbackMessageHandler` abstract type will probably cause compile errors. This is caused by our change that renamed the Service Bus-specific operations on this abstract type ([#194](https://github.com/arcus-azure/arcus.messaging/issues/194)). + +```diff +public class OrderFallbackMessageHandler : AzureServiceBusFallbackMessageHandler +{ + public override Task ProcessMessageAsync(...) + { +- base.CompleteAsync(message); ++ base.CompleteMessageAsync(message); + } +} +``` + +## Fluent API discovery for message handling +It's possible that some of your Azure Service Bus message handler registrations give compile errors. This is caused because we have introduced a dedicated type as return type when registering an Azure Service Bus message pump ([#152](https://github.com/arcus-azure/arcus.messaging/issues/152)). This dedicated type helps with the discovery of the available message handler registration options. + +Following example shows how older versions could register the message handler directly on the `services`, while now they're only available after registering the message pump: + +```diff +- services.AddServiceBusQueueMessagePump(...); +- services.WithServiceBusMessageHandler(); + ++ services.AddServiceBusQueueMessagePump(...) ++ .WithServiceBusMessageHandler(); +``` diff --git a/docs/versioned_sidebars/version-v2.1.0-sidebars.json b/docs/versioned_sidebars/version-v2.1.0-sidebars.json new file mode 100644 index 00000000..caea0c03 --- /dev/null +++ b/docs/versioned_sidebars/version-v2.1.0-sidebars.json @@ -0,0 +1,8 @@ +{ + "tutorialSidebar": [ + { + "type": "autogenerated", + "dirName": "." + } + ] +} diff --git a/docs/versions.json b/docs/versions.json index f97c0c8a..269ed245 100644 --- a/docs/versions.json +++ b/docs/versions.json @@ -1,4 +1,5 @@ [ + "v2.1.0", "v2.0.0", "v1.4.0", "v1.3.0",