File tree Expand file tree Collapse file tree 2 files changed +11
-7
lines changed
tests/DatatypeChannels.ASB.Tests Expand file tree Collapse file tree 2 files changed +11
-7
lines changed Original file line number Diff line number Diff line change @@ -37,7 +37,9 @@ let mkNew (options:ServiceBusReceiverOptions)
37
37
match ctxRef.Value with
38
38
| Some (( receiver,_,_) as ctx) when not receiver.IsClosed -> Task.FromResult ctx
39
39
| _ -> fun ( name :Task < _ >) -> backgroundTask {
40
+ use activity = activitySource |> Diagnostics.startActivity " new" ActivityKind.Consumer
40
41
let! name = name
42
+ activity.AddTag( " binding" , $" {name}/{options.SubQueue}" ) |> ignore
41
43
let startActivity _ = activitySource |> Diagnostics.startActivity name ActivityKind.Consumer
42
44
let receiver = withClient ( fun client -> client.CreateReceiver( name, options))
43
45
ctxRef.Value <- Some ( receiver, startActivity, ConcurrentDictionary())
@@ -71,9 +73,9 @@ let mkNew (options:ServiceBusReceiverOptions)
71
73
return msg |> Option.map ( fun msg ->
72
74
let msgCtx = MessageContext.From received activity receiver
73
75
if receiver.ReceiveMode = ServiceBusReceiveMode.PeekLock then
74
- if msgCtxs.TryAdd( received.LockToken , msgCtx) then startRenewal msgCtx |> ignore
76
+ if msgCtxs.TryAdd( received.MessageId , msgCtx) then startRenewal msgCtx |> ignore
75
77
else failwith " Unable to add the message"
76
- { Msg = msg; Id = received.LockToken })
78
+ { Msg = msg; Id = received.MessageId })
77
79
}
78
80
79
81
member __.Ack receivedId =
Original file line number Diff line number Diff line change @@ -153,13 +153,15 @@ let tests =
153
153
let! dlc = DeadLetter " renews-queue" |> channels.GetConsumer PlainText.ofReceived
154
154
let publisher = channels.GetPublisher ( PlainText.toSend testId) topic
155
155
do ! Task.Delay 5_000 // majic number - this is how long it takes the backend to start routing messages to this subscription!
156
- let n = 20
156
+ let n = 200
157
157
for i in 1 .. n do
158
158
do ! publisher |> Publisher.publish $" test-payload-{i}"
159
- let xs = ResizeArray()
160
- for i in 1 .. n do
161
- let! received = TimeSpan.FromSeconds 1. |> consumer.Get
162
- received |> Option.iter xs.Add
159
+ let! xs = // also test parallel consumption
160
+ seq { for _ in 1 .. 2 -> seq { for _ in 1 .. n/ 2 -> TimeSpan.FromSeconds 1. |> consumer.Get}}
161
+ |> Seq.map Task.WhenAll
162
+ |> Task.WhenAll
163
+ |> Task.map ( Array.collect id)
164
+ |> Task.map ( Array.choose id)
163
165
xs |> Seq.distinct |> Seq.length =! n
164
166
do ! Task.Delay ( TimeSpan.FromMinutes 7. )
165
167
for received in xs do
You can’t perform that action at this time.
0 commit comments