Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bug/service logs success while event wasnt success #23

Merged
merged 22 commits into from
Jun 27, 2022

Conversation

aspirio187
Copy link
Collaborator

Bug Fix #13

Description

This bug fix will be simple because the bug itself is simple.

If an event couldn't be treated properly, it is now referred as a Error and it displays the right message. The code is ready for a fail event logic (DLQ)

image

@aspirio187 aspirio187 added the bug Something isn't working label Jun 20, 2022
@aspirio187 aspirio187 requested a review from nbittich June 20, 2022 13:04
Copy link
Contributor

@nbittich nbittich left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As you consumed the event, you lost it in this case & you no are no longer in sync with the backend.
this is not enough to fix the issue. Maybe check the doc on how to deal with things like that (DLQ, ACK.NOK etc)

@aspirio187
Copy link
Collaborator Author

aspirio187 commented Jun 24, 2022

Update

Description

Failed event redirection

There is now a redirection to a failed queue. If an event failed 5 times or more, it is sent to the DLQ.

The old way using a delegate is obsolete. If I tried to send the failed event to the new queue through de Message_Received() method, the app blocked and couldn't continue its task. So now 2 new tasks are started in the Start() method, one handling the backend-event messages and one handling the dropbox-sync-failed queue messages.

The new BrokerEventListerner.Start() method

public void Start()
        {
            if (AmqpConnection is null) throw new NullReferenceException(nameof(AmqpConnection));

            try
            {
                _session = ((IConnection)AmqpConnection).CreateSession();
                ReceiverLink receiverLink = new ReceiverLink(_session as Session, "", _amqpCredentials.AmqpQueue);

                ReceiverTaskCancellationTokenSource = new CancellationTokenSource();
                CancellationToken token = ReceiverTaskCancellationTokenSource.Token;

                Task.Factory.StartNew(
                    () => ReceiveMessage(receiverLink, token), token);

                Task.Factory.StartNew(async () => await FailedQueueMonitoringAsync());

                // receiverLink.Start(200, Message_Received);
                _logger.LogInformation("{date} | Listening on AMQP", DateTime.Now);
            }
            catch (AmqpException e)
            {
                _logger.LogError("{date} | Couldn't create AMQP Session : {ex}", DateTime.Now, e.Message);
            }
            catch (Exception e)
            {
                _logger.LogError("{date} | An error occured while trying to create a session to the broker : {ex}",
                    DateTime.Now, e.Message);
            }
        }

BrokerEventListenenr.FailedQueueMonitoringAsync

        /// <summary>
        /// Execute a timer that will trigger every 5 minutes and redirect to <see cref="CheckFailedQueue"/> method
        /// </summary>
        public async Task FailedQueueMonitoringAsync()
        {
            using var timer = new PeriodicTimer(TimeSpan.FromMinutes(10));

            while (await timer.WaitForNextTickAsync())
            {
                _logger.LogInformation("{date} | Checking the failed queue", DateTime.Now);
                CheckFailedQueue();
            }
        }

Every 10 minute, the service will check the dropbox-sync-failed queue, process its messages and once its done, wait for the second timer call

BrokerEventListener.CheckFailedQueue()

        /// <summary>
        /// Monitor the <c>dropbox-sync-failed</c> queue and treat every messages. If no message is received after
        /// 30 seconds, the listening process stop and it starts to process received messages. The listener is closed
        /// at the end
        /// </summary>
        public void CheckFailedQueue()
        {
            if (_session is null) throw new NullReferenceException(nameof(_session));

            _logger.LogInformation("{date} | Fail queue check started", DateTime.Now);

            List<Message> messages = new List<Message>();

            ReceiverLink? receiverLink = _session.CreateReceiver("receiver" + Guid.NewGuid(), new Source()
            {
                Address = "dropbox-sync-failed",
                Capabilities = new[]
                {
                    new Symbol("queue")
                },
                Durable = 1
            }) as ReceiverLink;

            if (receiverLink is null) throw new NullValueException(nameof(receiverLink));

            while (true)
            {
                Message? message = receiverLink.Receive(TimeSpan.FromSeconds(15));
                if (message is null)
                {
                    _logger.LogWarning("{date} | There is no more messages in failed queue!",
                        DateTime.Now);
                    break;
                }

                _logger.LogInformation("{date} | Message received from failed queue : {msg}",
                    DateTime.Now, Encoding.UTF8.GetString((byte[])message.Body));

                messages.Add(message);
            }

            foreach (Message message in messages)
            {
                if (message is null || message.Body is null)
                {
                    receiverLink.Reject(message);
                    continue;
                }

                FailedEventModel? failedEvent =
                    JsonConvert.DeserializeObject<FailedEventModel>(Encoding.UTF8.GetString((byte[])message.Body));

                if (failedEvent is null) continue;
                if (string.IsNullOrEmpty(failedEvent.MessageJson)) continue;

                EventModel? eventModel = JsonConvert.DeserializeObject<EventModel>(failedEvent.MessageJson);

                if (eventModel is null)
                {
                    receiverLink.Reject(message);
                    continue;
                }

                if (!Enum.TryParse(typeof(BrokerEvent), eventModel.EventName, out object? brokerEventParseResult))
                {
                    receiverLink.Reject(message);
                    continue;
                }

                if (brokerEventParseResult is null)
                {
                    receiverLink.Reject(message);
                    continue;
                }

                BrokerEvent eventType = (BrokerEvent)brokerEventParseResult;

                // bool redirectionResult = Task.Run<bool>(() => EventRedirection(eventType, failedEvent.MessageJson)).Result;
                bool redirectionResult = Task.Run<bool>(() =>
                    _eventManagerLocator.RedirectToManager(failedEvent.MessageJson)).Result;

                if (redirectionResult)
                {
                    _logger.LogInformation("{date} | Failed message successfully treated : \"{msg}\"",
                        DateTime.Now, failedEvent.MessageJson);

                    receiverLink.Accept(message);
                }
                else
                {
                    if (failedEvent.Attempt >= 5)
                    {
                        _logger.LogInformation("{date} | Failed message reached 5 attempts and is sent to DLQ : {msg}",
                            DateTime.Now, failedEvent.MessageJson);
                        receiverLink.Reject(message);
                    }
                    else
                    {
                        failedEvent.Attempt++;
                        Message newMessage = new Message()
                        {
                            BodySection = new Data()
                            {
                                Binary = Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(failedEvent))
                            }
                        };

                        if (_session is null) throw new NullReferenceException(nameof(_session));

                        ISenderLink sender = _session.CreateSender("sender" + Guid.NewGuid(), new Target
                        {
                            Address = "dropbox-sync-failed",
                            Capabilities = new[]
                            {
                                new Symbol("queue")
                            },
                            Durable = 1
                        });

                        sender.Send(newMessage);

                        _logger.LogInformation("{date} | Message resent to failed queue : {msg}",
                            DateTime.Now, failedEvent.MessageJson);

                        sender.Close();

                        receiverLink.Accept(message);
                    }
                }
            }

            receiverLink.Close();
        }

Event's manager locator

To clean the code a little bit, a locator that locates the right manager and the right method has been implemented. This last part doesn't log if any error happens because they depends on the programmer and not the Broker. If an error occur in the code, it is the developer's fault and he must fix the bug quicly

public bool RedirectToManager(string eventJson)
        {
            if (string.IsNullOrEmpty(eventJson)) throw new ArgumentNullException(nameof(eventJson));

            List<string> managers = new List<string>();

            foreach (Type type in Assembly.GetExecutingAssembly().GetTypes())
            {
                if (!string.IsNullOrEmpty(type.Namespace))
                {
                    if (type.Namespace.EndsWith("Managers") &&
                        type.IsClass &&
                        type.Name.Contains("Manager"))
                    {
                        managers.Add(type.Name.Substring(0, type.Name.Length - "Manager".Length));
                    }
                }
            }

            EventModel? eventModel = JsonConvert.DeserializeObject<EventModel>(eventJson);

            if (eventModel is null) throw new NullValueException(nameof(eventModel));

            Type? eventManagerType = null;

            foreach (string manager in managers)
            {
                eventManagerType = Assembly.GetExecutingAssembly().GetTypes()
                    .SingleOrDefault(t => t.IsInterface && t.Name.Contains(manager + "Manager"));

                if (eventManagerType is null) continue;

                object? managerService = Program.Host?.Services.GetRequiredService(eventManagerType);

                if (managerService is null) throw new NullValueException(nameof(managerService));

                foreach (MemberInfo method in eventManagerType.GetMembers())
                {
                    MethodEventAttribute? attribute = (MethodEventAttribute?)method.GetCustomAttribute(typeof(MethodEventAttribute));

                    if (attribute is null) continue;

                    if (attribute.EventName.Equals(eventModel.EventName))
                    {
                        Type obj = attribute.EventType;

                        string methodName = method.Name;
                        MethodInfo? methodInfo = eventManagerType.GetMethod(methodName);

                        if (methodInfo is null) throw new NullValueException(nameof(methodInfo));

                        object? deserializedObject = JsonConvert.DeserializeObject(eventJson, attribute.EventType);

                        if (deserializedObject is null) throw new NullValueException(nameof(deserializedObject));

                        if (methodInfo.IsGenericMethod)
                        {
                            MethodInfo? genericMethodInfo = methodInfo.MakeGenericMethod(new[] { deserializedObject.GetType() });

                            if (genericMethodInfo is null) throw new NullValueException(nameof(genericMethodInfo));

                            bool? result = (bool?)genericMethodInfo.Invoke(managerService, new[] { deserializedObject });

                            if (result is null) throw new NullValueException(nameof(result));

                            return (bool)result;
                        }
                        else
                        {
                            bool? result = (bool?)methodInfo.Invoke(managerService, new[] { deserializedObject });

                            if (result is null) throw new NullValueException(nameof(result));

                            return (bool)result;
                        }
                    }
                }
            }

If a new event is created for Dossier for example, the new method in IDossierManager interface must precede with a custom attribute

MethodEventAttribute class

public class MethodEventAttribute : Attribute
    {
        public Type EventType { get; private set; }
        public string EventName { get; private set; }

        public MethodEventAttribute(Type eventType, string eventName)
        {
            if (string.IsNullOrEmpty(eventName)) throw new ArgumentNullException(nameof(eventName));

            EventType = eventType ??
                throw new ArgumentNullException(nameof(eventType));
            EventName = eventName;
        }
    }

The new IDossierManager

public interface IDossierManager : IEventManager
    {
        [MethodEvent(typeof(DossierCreateModel), nameof(BrokerEvent.DossierCreated))]

        new bool Create<T>(T entity) where T : DossierCreateModel;

        [MethodEvent(typeof(DossierCloseModel), nameof(BrokerEvent.DossierClosed))]
        bool CloseDossier(DossierCloseModel model);

        [MethodEvent(typeof(DossierDeleteModel), nameof(BrokerEvent.DossierDeleted))]
        new bool Delete<T>(T entity) where T : DossierDeleteModel;

        [MethodEvent(typeof(DossierExpensesAddedModel), nameof(BrokerEvent.ExpensesAddedToDossier))]
        bool AddExpense(DossierExpensesAddedModel model);

        [MethodEvent(typeof(DossierExpenseRemovedModel), nameof(BrokerEvent.ExpenseRemovedFromDossier))]
        bool RemoveExpense(DossierExpenseRemovedModel model);

        [MethodEvent(typeof(DossierInvoiceAddedModel), nameof(BrokerEvent.InvoiceAddedToDossier))]
        bool AddInvoice(DossierInvoiceAddedModel model);

        [MethodEvent(typeof(DossierInvoiceRemovedModel), nameof(BrokerEvent.InvoiceRemovedFromDossier))]
        bool RemoveInvoice(DossierInvoiceRemovedModel model);

        [MethodEvent(typeof(DossierRecallForModificationModel), nameof(BrokerEvent.DossierRecallForModification))]
        bool Recall(DossierRecallForModificationModel model);

        [MethodEvent(typeof(DossierUpdateModel), nameof(BrokerEvent.DossierUpdated))]
        new bool Update<T>(T entity) where T : DossierUpdateModel;
    }

@@ -229,7 +249,7 @@ private void ReceiveMessage(ReceiverLink receiver, CancellationToken cancellatio
/// </summary>
public async Task FailedQueueMonitoringAsync()
{
using var timer = new PeriodicTimer(TimeSpan.FromSeconds(15));
using var timer = new PeriodicTimer(TimeSpan.FromMinutes(10));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can be an env variable e.g FAILED_QUEUE_MONITORING_TIMER

@@ -9,14 +13,32 @@ namespace DropboxSync.UIL.Managers
{
public interface IDossierManager : IEventManager
{
[MethodEvent(typeof(DossierCreateModel), nameof(BrokerEvent.DossierCreated))]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

looks cleaner indeed!

@aspirio187 aspirio187 merged commit 8c74b3a into develop Jun 27, 2022
@aspirio187 aspirio187 deleted the bug/service-logs-success-while-event-wasnt-success branch June 27, 2022 06:22
nbittich pushed a commit that referenced this pull request Jul 18, 2022
* return logger error if event couldn't be treated

* Added event acknowledgement and event redirection to DLQ

* Add Error handling comments

* Create mail helper class. Extracted broker reconnection in class

* added thread sleep (BrokerEvent) added default values

* add default values

* add equals and hash methods

* add equals and hashcode methods

* Added equals and hashcode method

* added equals and hashcode methods

* added equals and hashcode methods

* Created Iterator and model EventAttempt

* Send failed events to a new queue

* fixed using error

* Renamed connection_closed method

* Implemented tasks for backend-event and failed queues

* Implemented Event's Manager and method locator

* cleaned the code

* added MethodEvent attribute to each method of managers

* cleaned code

* created exception and retrieve timer's minutes from env
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
bug Something isn't working
Projects
None yet
Development

Successfully merging this pull request may close these issues.

the service logs success while event wasn't processed successfully
2 participants