Skip to content

Commit

Permalink
add option to ignore duplicates
Browse files Browse the repository at this point in the history
  • Loading branch information
et1975 committed Feb 28, 2023
1 parent e59b4be commit 9b62ed7
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 22 deletions.
21 changes: 15 additions & 6 deletions src/DatatypeChannels.ASB/Consumer.fs
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,11 @@ type internal MessageContext =
CancellationTokenSource = new CancellationTokenSource()
Reciever = reciever }

type internal Options() =
inherit ServiceBusReceiverOptions()
member val IgnoreDuplicates = true with get,set

let mkNew (options:ServiceBusReceiverOptions)
let mkNew (options: Options)
(startRenewal: MessageContext -> Task)
(OfReceived ofReceived)
(withClient: (ServiceBusClient -> _) -> _)
Expand Down Expand Up @@ -70,12 +73,18 @@ let mkNew (options:ServiceBusReceiverOptions)
activity.Dispose()
return None
}
return msg |> Option.map (fun msg ->
return msg |> Option.bind (fun msg ->
let msgCtx = MessageContext.From received activity receiver
if receiver.ReceiveMode = ServiceBusReceiveMode.PeekLock then
if msgCtxs.TryAdd(received.MessageId, msgCtx) then startRenewal msgCtx |> ignore
else failwith "Unable to add the message"
{ Msg = msg; Id = received.MessageId })
match receiver.ReceiveMode with
| ServiceBusReceiveMode.ReceiveAndDelete ->
Some { Msg = msg; Id = received.MessageId }
| ServiceBusReceiveMode.PeekLock when msgCtxs.TryAdd(received.MessageId, msgCtx) ->
startRenewal msgCtx |> ignore
Some { Msg = msg; Id = received.MessageId }
| ServiceBusReceiveMode.PeekLock when not options.IgnoreDuplicates ->
failwith $"Unable to add the message, duplicate id: {received.MessageId}"
| _ -> None
)
}

member __.Ack receivedId =
Expand Down
41 changes: 25 additions & 16 deletions src/DatatypeChannels.ASB/DatatypeChannels.fs
Original file line number Diff line number Diff line change
Expand Up @@ -5,20 +5,27 @@ open System.Threading.Tasks
open Azure.Messaging.ServiceBus
open Azure.Messaging.ServiceBus.Administration

type ChannelOptions =
{ Prefetch: uint16 option // optional prefetch limit
IgnoreDuplicates: bool
TempIdle: TimeSpan } // temporary queue idle lifetime
static member Default =
{ Prefetch = None
TempIdle = TimeSpan.FromMinutes 5.
IgnoreDuplicates = true }

[<RequireQualifiedAccess>]
[<CompilationRepresentation(CompilationRepresentationFlags.ModuleSuffix)>]
module Channels =
/// constructs event-stream publishers and consumers.
/// mkClient: function to construct the client.
/// mkAdminClient: function to construct the admin client.
/// log: function for diagnostics logging.
/// prefetch: optional prefetch limit.
/// tempIdle: temporary queue idle lifetime.
/// options: channel options.
let mkNew (mkClient: unit -> ServiceBusClient)
(mkAdminClient: unit -> ServiceBusAdministrationClient)
(log: Log)
(prefetch: uint16 option)
(tempIdle: TimeSpan) =
(options: ChannelOptions) =
let client = lazy mkClient()
let adminClient = lazy mkAdminClient()
let withClient cont = cont client.Value
Expand All @@ -32,23 +39,23 @@ module Channels =
let renew = Consumer.Renewable.mkNew binding.Subscription.LockDuration
binding.Subscription.LockDuration <- min binding.Subscription.LockDuration Consumer.Renewable.maxLockDuration
Subscription.withBinding log withAdminClient binding,
ServiceBusReceiverOptions(ReceiveMode = ServiceBusReceiveMode.PeekLock),
Consumer.Options(ReceiveMode = ServiceBusReceiveMode.PeekLock),
renew
| Persistent (queueOptions, bindings) ->
let renew = Consumer.Renewable.mkNew queueOptions.LockDuration
queueOptions.LockDuration <- min queueOptions.LockDuration Consumer.Renewable.maxLockDuration
Queue.withBindings log withAdminClient queueOptions bindings,
ServiceBusReceiverOptions(ReceiveMode = ServiceBusReceiveMode.PeekLock),
Consumer.Options(ReceiveMode = ServiceBusReceiveMode.PeekLock),
renew
| Temporary bindings ->
Queue.withBindings log withAdminClient (CreateQueueOptions(Guid.NewGuid().ToString(), AutoDeleteOnIdle = tempIdle)) bindings,
ServiceBusReceiverOptions(ReceiveMode = ServiceBusReceiveMode.ReceiveAndDelete),
Queue.withBindings log withAdminClient (CreateQueueOptions(Guid.NewGuid().ToString(), AutoDeleteOnIdle = options.TempIdle)) bindings,
Consumer.Options(ReceiveMode = ServiceBusReceiveMode.ReceiveAndDelete),
Consumer.Renewable.noop
| DeadLetter path ->
(fun cont -> Task.FromResult path |> cont),
ServiceBusReceiverOptions(ReceiveMode = ServiceBusReceiveMode.PeekLock, SubQueue = SubQueue.DeadLetter),
Consumer.Options(ReceiveMode = ServiceBusReceiveMode.PeekLock, SubQueue = SubQueue.DeadLetter),
Consumer.Renewable.noop
prefetch |> Option.iter (fun v -> receiveOptions.PrefetchCount <- int v)
options.Prefetch |> Option.iter (fun v -> receiveOptions.PrefetchCount <- int v)
Consumer.mkNew receiveOptions renew ofRecevied withClient withBindings

member __.GetPublisher<'msg> toSend (Topic topic) : Publisher<'msg> =
Expand All @@ -72,12 +79,14 @@ module Channels =

/// Build an instance using FQDN of the namespace and Azure TokenCredential
let fromFqdn (fqNamespace: string) (credential: Azure.Core.TokenCredential) (log: Log) =
mkNew (fun _ -> ServiceBusClient(fqNamespace, credential))
(fun _ -> ServiceBusAdministrationClient(fqNamespace, credential))
log None (TimeSpan.FromMinutes 5.)
ChannelOptions.Default
|> mkNew (fun _ -> ServiceBusClient(fqNamespace, credential))
(fun _ -> ServiceBusAdministrationClient(fqNamespace, credential))
log

/// Build an instance using connection string
let fromConnectionString (connString: string) (log: Log) =
mkNew (fun _ -> ServiceBusClient connString)
(fun _ -> ServiceBusAdministrationClient connString)
log None (TimeSpan.FromMinutes 5.)
ChannelOptions.Default
|> mkNew (fun _ -> ServiceBusClient connString)
(fun _ -> ServiceBusAdministrationClient connString)
log

0 comments on commit 9b62ed7

Please sign in to comment.