diff --git a/src/DatatypeChannels.ASB/Consumer.fs b/src/DatatypeChannels.ASB/Consumer.fs index a9055d6..08241f2 100644 --- a/src/DatatypeChannels.ASB/Consumer.fs +++ b/src/DatatypeChannels.ASB/Consumer.fs @@ -25,8 +25,11 @@ type internal MessageContext = CancellationTokenSource = new CancellationTokenSource() Reciever = reciever } +type internal Options() = + inherit ServiceBusReceiverOptions() + member val IgnoreDuplicates = true with get,set -let mkNew (options:ServiceBusReceiverOptions) +let mkNew (options: Options) (startRenewal: MessageContext -> Task) (OfReceived ofReceived) (withClient: (ServiceBusClient -> _) -> _) @@ -70,12 +73,18 @@ let mkNew (options:ServiceBusReceiverOptions) activity.Dispose() return None } - return msg |> Option.map (fun msg -> + return msg |> Option.bind (fun msg -> let msgCtx = MessageContext.From received activity receiver - if receiver.ReceiveMode = ServiceBusReceiveMode.PeekLock then - if msgCtxs.TryAdd(received.MessageId, msgCtx) then startRenewal msgCtx |> ignore - else failwith "Unable to add the message" - { Msg = msg; Id = received.MessageId }) + match receiver.ReceiveMode with + | ServiceBusReceiveMode.ReceiveAndDelete -> + Some { Msg = msg; Id = received.MessageId } + | ServiceBusReceiveMode.PeekLock when msgCtxs.TryAdd(received.MessageId, msgCtx) -> + startRenewal msgCtx |> ignore + Some { Msg = msg; Id = received.MessageId } + | ServiceBusReceiveMode.PeekLock when not options.IgnoreDuplicates -> + failwith $"Unable to add the message, duplicate id: {received.MessageId}" + | _ -> None + ) } member __.Ack receivedId = diff --git a/src/DatatypeChannels.ASB/DatatypeChannels.fs b/src/DatatypeChannels.ASB/DatatypeChannels.fs index ddbf91a..5891593 100644 --- a/src/DatatypeChannels.ASB/DatatypeChannels.fs +++ b/src/DatatypeChannels.ASB/DatatypeChannels.fs @@ -5,6 +5,15 @@ open System.Threading.Tasks open Azure.Messaging.ServiceBus open Azure.Messaging.ServiceBus.Administration +type ChannelOptions = + { Prefetch: uint16 option // optional prefetch limit + IgnoreDuplicates: bool + TempIdle: TimeSpan } // temporary queue idle lifetime + static member Default = + { Prefetch = None + TempIdle = TimeSpan.FromMinutes 5. + IgnoreDuplicates = true } + [] [] module Channels = @@ -12,13 +21,11 @@ module Channels = /// mkClient: function to construct the client. /// mkAdminClient: function to construct the admin client. /// log: function for diagnostics logging. - /// prefetch: optional prefetch limit. - /// tempIdle: temporary queue idle lifetime. + /// options: channel options. let mkNew (mkClient: unit -> ServiceBusClient) (mkAdminClient: unit -> ServiceBusAdministrationClient) (log: Log) - (prefetch: uint16 option) - (tempIdle: TimeSpan) = + (options: ChannelOptions) = let client = lazy mkClient() let adminClient = lazy mkAdminClient() let withClient cont = cont client.Value @@ -32,23 +39,23 @@ module Channels = let renew = Consumer.Renewable.mkNew binding.Subscription.LockDuration binding.Subscription.LockDuration <- min binding.Subscription.LockDuration Consumer.Renewable.maxLockDuration Subscription.withBinding log withAdminClient binding, - ServiceBusReceiverOptions(ReceiveMode = ServiceBusReceiveMode.PeekLock), + Consumer.Options(ReceiveMode = ServiceBusReceiveMode.PeekLock), renew | Persistent (queueOptions, bindings) -> let renew = Consumer.Renewable.mkNew queueOptions.LockDuration queueOptions.LockDuration <- min queueOptions.LockDuration Consumer.Renewable.maxLockDuration Queue.withBindings log withAdminClient queueOptions bindings, - ServiceBusReceiverOptions(ReceiveMode = ServiceBusReceiveMode.PeekLock), + Consumer.Options(ReceiveMode = ServiceBusReceiveMode.PeekLock), renew | Temporary bindings -> - Queue.withBindings log withAdminClient (CreateQueueOptions(Guid.NewGuid().ToString(), AutoDeleteOnIdle = tempIdle)) bindings, - ServiceBusReceiverOptions(ReceiveMode = ServiceBusReceiveMode.ReceiveAndDelete), + Queue.withBindings log withAdminClient (CreateQueueOptions(Guid.NewGuid().ToString(), AutoDeleteOnIdle = options.TempIdle)) bindings, + Consumer.Options(ReceiveMode = ServiceBusReceiveMode.ReceiveAndDelete), Consumer.Renewable.noop | DeadLetter path -> (fun cont -> Task.FromResult path |> cont), - ServiceBusReceiverOptions(ReceiveMode = ServiceBusReceiveMode.PeekLock, SubQueue = SubQueue.DeadLetter), + Consumer.Options(ReceiveMode = ServiceBusReceiveMode.PeekLock, SubQueue = SubQueue.DeadLetter), Consumer.Renewable.noop - prefetch |> Option.iter (fun v -> receiveOptions.PrefetchCount <- int v) + options.Prefetch |> Option.iter (fun v -> receiveOptions.PrefetchCount <- int v) Consumer.mkNew receiveOptions renew ofRecevied withClient withBindings member __.GetPublisher<'msg> toSend (Topic topic) : Publisher<'msg> = @@ -72,12 +79,14 @@ module Channels = /// Build an instance using FQDN of the namespace and Azure TokenCredential let fromFqdn (fqNamespace: string) (credential: Azure.Core.TokenCredential) (log: Log) = - mkNew (fun _ -> ServiceBusClient(fqNamespace, credential)) - (fun _ -> ServiceBusAdministrationClient(fqNamespace, credential)) - log None (TimeSpan.FromMinutes 5.) + ChannelOptions.Default + |> mkNew (fun _ -> ServiceBusClient(fqNamespace, credential)) + (fun _ -> ServiceBusAdministrationClient(fqNamespace, credential)) + log /// Build an instance using connection string let fromConnectionString (connString: string) (log: Log) = - mkNew (fun _ -> ServiceBusClient connString) - (fun _ -> ServiceBusAdministrationClient connString) - log None (TimeSpan.FromMinutes 5.) + ChannelOptions.Default + |> mkNew (fun _ -> ServiceBusClient connString) + (fun _ -> ServiceBusAdministrationClient connString) + log