Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: use message receiver io processor to correctly stop message pump during circuit breaker #444

Merged
merged 49 commits into from
Dec 13, 2024

Conversation

stijnmoreels
Copy link
Member

@stijnmoreels stijnmoreels commented Jun 27, 2024

The ServiceBusProcessor was not able to be stopped within its message processing - this PR uses the ServiceBusReceiver fully with a more recent Service Bus Messaging package.

@stijnmoreels stijnmoreels requested a review from fgheysels as a code owner June 27, 2024 07:05
Copy link

netlify bot commented Jun 27, 2024

Deploy Preview for arcus-messaging canceled.

Name Link
🔨 Latest commit d5cfabf
🔍 Latest deploy log https://app.netlify.com/sites/arcus-messaging/deploys/675a78670f756d0008e38562

@stijnmoreels stijnmoreels marked this pull request as draft July 18, 2024 06:42
@stijnmoreels stijnmoreels marked this pull request as ready for review August 27, 2024 05:28
@fgheysels
Copy link
Member

fgheysels commented Nov 6, 2024

I'm testing this a bit further and I'm getting good results; however, I now run into an issue where an exception was thrown for some reason (no idea what happened).
The exception was thrown, and after that exception, some other traces were logged saying that the received message was 'null'.
And afterwards, no new messages were read from ServiceBus by the MessagePump.

[23:02:26 ERR] Azure Service Bus Queue message pump 'a885a71c-74e8-4325-a681-e9fccbf1a9f6' on entity path 'orders' in namespace 'fg-sb-arcusdemo.servicebus.windows.net' failed to process single message during half-open circuit, retrying after circuit delay
Azure.Messaging.ServiceBus.ServiceBusException: The lock supplied is invalid. Either the lock expired, or the message has already been removed from the queue. For more information please see https://aka.ms/ServiceBusExceptions . Reference:4cc34e78-a2da-47ab-9963-c4e1e30659e8, TrackingId:f6a8ee520000004d01ba96c6672be710_G7_B18, SystemTracker:gi::G7:720044332:amqps://fg-sb-arcusdemo.servicebus.windows.net/-e1a11b51;0:5:6:source(address:/orders,filter:[]), bi::in-connection50454(G7-38509)::session50457::link29005510, Timestamp:2024-11-06T22:02:26 (MessageLockLost). For troubleshooting information, see https://aka.ms/azsdk/net/servicebus/exceptions/troubleshoot.
   at Azure.Messaging.ServiceBus.Amqp.AmqpReceiver.DisposeMessageAsync(Guid lockToken, Outcome outcome, DispositionStatus disposition, TimeSpan timeout, IDictionary`2 propertiesToModify, String deadLetterReason, String deadLetterDescription)
   at Azure.Messaging.ServiceBus.Amqp.AmqpReceiver.CompleteInternalAsync(Guid lockToken, TimeSpan timeout)
   at Azure.Messaging.ServiceBus.Amqp.AmqpReceiver.<>c.<<CompleteAsync>b__47_0>d.MoveNext()
--- End of stack trace from previous location ---
   at Azure.Messaging.ServiceBus.ServiceBusRetryPolicy.<>c__22`1.<<RunOperation>b__22_0>d.MoveNext()
--- End of stack trace from previous location ---
   at Azure.Messaging.ServiceBus.ServiceBusRetryPolicy.RunOperation[T1,TResult](Func`4 operation, T1 t1, TransportConnectionScope scope, CancellationToken cancellationToken, Boolean logTimeoutRetriesAsVerbose)
   at Azure.Messaging.ServiceBus.ServiceBusRetryPolicy.RunOperation[T1,TResult](Func`4 operation, T1 t1, TransportConnectionScope scope, CancellationToken cancellationToken, Boolean logTimeoutRetriesAsVerbose)
   at Azure.Messaging.ServiceBus.ServiceBusRetryPolicy.RunOperation[T1](Func`4 operation, T1 t1, TransportConnectionScope scope, CancellationToken cancellationToken)
   at Azure.Messaging.ServiceBus.Amqp.AmqpReceiver.CompleteAsync(Guid lockToken, CancellationToken cancellationToken)
   at Azure.Messaging.ServiceBus.ServiceBusReceiver.CompleteMessageAsync(ServiceBusReceivedMessage message, CancellationToken cancellationToken)
   at Arcus.Messaging.Pumps.ServiceBus.AzureServiceBusMessagePump.ProcessMessageAsync(ServiceBusReceivedMessage message, CancellationToken cancellationToken)
   at Arcus.Messaging.Pumps.ServiceBus.AzureServiceBusMessagePump.TryProcessProcessSingleMessageAsync(MessagePumpCircuitBreakerOptions options)

After this exception, I can find these traces in the logs:

[23:33:20 WRN] Received message on Azure Service Bus Queue message pump '079f0fc1-5eab-487c-81c1-20e0f31b3f8b' was null, skipping
[23:33:25 WRN] Received message on Azure Service Bus Queue message pump '079f0fc1-5eab-487c-81c1-20e0f31b3f8b' was null, skipping
[23:33:26 WRN] Received message on Azure Service Bus Queue message pump '079f0fc1-5eab-487c-81c1-20e0f31b3f8b' was null, skipping
[23:33:26 WRN] Received message on Azure Service Bus Queue message pump '079f0fc1-5eab-487c-81c1-20e0f31b3f8b' was null, skipping

When I inspect the ServiceBus queue that I'm consuming, there are no messages left in the queue.
When I add a few more messages, those messages are not being retrieved anymore. I have to manually restart my application before the MessagePump starts reading messages again.

And I can like keep on reproducing that; once the last message has been processed, this exception saying that the messge lock is expired is thrown.

On another attempt, when all messages were processed and new messages were sent to the queue, the MessagePump retrieved one message.
The first message was retrieved from the queue and was processed; processing went fine and an attempt was made to resume normal processing, however, nothing happened:

these are my logs:

[23:37:27 INF] Process Order eabb888e-afde-49ed-8ac9-f389f8e19fe9
[23:37:27 INF] Start processing HTTP request GET http://localhost:4068/api/product/2
[23:37:27 INF] Sending HTTP request GET http://localhost:4068/api/product/2
[23:37:27 INF] Received HTTP response headers after 19.6146ms - 200
[23:37:27 INF] End processing HTTP request after 37.297ms - 200
[23:37:27 INF] Ordering 9 items of Product B
[23:37:27 WRN] Attempting to resume message-pump
[23:37:27 WRN] {"EventName": "MessagePump.Resumed", "Context": {"TelemetryType": "Events"}, "$type": "EventLogEntry"}
[23:37:27 WRN] {"RequestMethod": "<not-applicable>", "RequestHost": "<not-applicable>", "RequestUri": "<not-applicable>", "ResponseStatusCode": 200, "RequestDuration": "00:00:00.0732313", "RequestTime": "2024-11-06T22:37:27.7335884 +00:00", "SourceSystem": "AzureServiceBus", "CustomRequestSource": null, "OperationName": "Process", "Context": {"ServiceBus-Endpoint": "fg-sb-arcusdemo.servicebus.windows.net", "ServiceBus-Entity": "orders", "ServiceBus-EntityType": "Queue", "TelemetryType": "Request"}, "$type": "RequestLogEntry"}

But afterwards, nothing happened and the messages that were in the queue , remained there. They were not being retrieved.
(The 'Attempting to result message-pump' trace and the custom event 'MessagePump.Resumed' are custom traces that I'm writing via the code below)

_logger.LogWarning("Attempting to resume message-pump");
await _messagePumpCircuitBreaker.ResumeMessageProcessingAsync(messagePumpJobId);
_logger.LogCustomEvent("MessagePump.Resumed");

fgheysels and others added 8 commits November 25, 2024 11:16
* processing spike

* code cleanup

* pr-sug: use message processing result io boolean

* pr-sug: promote circuit breaker state enum to class

* pr-fix: throw-if-null is not available in net-standard

* pr-fix: correct usings in az service bus message pump

* pr-sug: add message id context to the processing result

* pr-fix: correct time-out for resiliency tests

* pr-fix: remove useless dev-test

* pr-fix: correct recieved message creation in unit tests

* pr-fix: more stable post-assertion resilence

* pr-fix: use back the message id for the message processing result

* pr-sug: finishing touches on circuit breaker state transitioning

* pr-fix: streamline equalization in circuit breaker state

* pr-fix: let router abbandon message io circuit breaker handler

* pr-sug: rename wait interval method + fix wait recovery period log

* pr-fix: complete renaming in message pump

* pr-sug: use transition method for open state

* pr-sug: add half-open state boolean flag

* pr-fix: limit processing of single message on queue

* Update src/Arcus.Messaging.Pumps.Abstractions/Resiliency/IMessagePumpCircuitBreaker.cs

Co-authored-by: Frederik Gheysels <[email protected]>

* pr-sug: reframe summary and remarks wording in circuit breaker states

* Update src/Arcus.Messaging.Pumps.Abstractions/Resiliency/IMessagePumpCircuitBreaker.cs

Co-authored-by: Frederik Gheysels <[email protected]>

* Update src/Arcus.Messaging.Pumps.Abstractions/Resiliency/IMessagePumpCircuitBreaker.cs

Co-authored-by: Frederik Gheysels <[email protected]>

* Update src/Arcus.Messaging.Pumps.Abstractions/Resiliency/IMessagePumpCircuitBreaker.cs

---------

Co-authored-by: stijnmoreels <[email protected]>
Copy link
Member

@fgheysels fgheysels left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It was a long journey, and a lot of work has been done to make this work.
Thanks for your perseverance, I think we can now merge these changes :)

@fgheysels fgheysels merged commit 0c93d3c into main Dec 13, 2024
14 checks passed
@fgheysels fgheysels deleted the frgh/fix/433_unable_to_stop_processor branch December 13, 2024 20:00
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants