Skip to content

Commit

Permalink
fix: queue gauge metric
Browse files Browse the repository at this point in the history
  • Loading branch information
tiagodaraujo committed Mar 27, 2024
1 parent bb61dd7 commit c624a11
Show file tree
Hide file tree
Showing 16 changed files with 134 additions and 127 deletions.
3 changes: 3 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,6 @@ sample:

sample-clean:
@docker compose -f docker-compose.sample.yaml down

benchmark:
dotnet run -c Release --project ./tests/benchmark/Farfetch.LoadShedding.BenchmarkTests/Farfetch.LoadShedding.BenchmarkTests.csproj
18 changes: 3 additions & 15 deletions src/Farfetch.LoadShedding/Events/Args/ItemDequeuedEventArgs.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,29 +6,17 @@ namespace Farfetch.LoadShedding.Events.Args
/// <summary>
/// Event args for the task dequeued event.
/// </summary>
public class ItemDequeuedEventArgs : ItemEventArgs
public class ItemDequeuedEventArgs : TaskQueueEventArgs
{
internal ItemDequeuedEventArgs(Priority priority, TimeSpan queueTime, int queueLimit, int queueCount)
: base(priority)
internal ItemDequeuedEventArgs(Priority priority, TimeSpan queueTime, IReadOnlyCounter queueCounter)
: base(priority, queueCounter)
{
this.QueueTime = queueTime;
this.QueueLimit = queueLimit;
this.QueueCount = queueCount;
}

/// <summary>
/// Gets the time waiting in the queue.
/// </summary>
public TimeSpan QueueTime { get; }

/// <summary>
/// Gets the maximum number of items in the queue.
/// </summary>
public int QueueLimit { get; }

/// <summary>
/// Gets the current number of items in the queue.
/// </summary>
public int QueueCount { get; }
}
}
18 changes: 3 additions & 15 deletions src/Farfetch.LoadShedding/Events/Args/ItemEnqueuedEventArgs.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,23 +5,11 @@ namespace Farfetch.LoadShedding.Events.Args
/// <summary>
/// Event args for the task enqueued event.
/// </summary>
public class ItemEnqueuedEventArgs : ItemEventArgs
public class ItemEnqueuedEventArgs : TaskQueueEventArgs
{
internal ItemEnqueuedEventArgs(Priority priority, int queueLimit, int queueCount)
: base(priority)
internal ItemEnqueuedEventArgs(Priority priority, IReadOnlyCounter queueCounter)
: base(priority, queueCounter)
{
this.QueueLimit = queueLimit;
this.QueueCount = queueCount;
}

/// <summary>
/// Gets the maximum number of items in the queue.
/// </summary>
public int QueueLimit { get; }

/// <summary>
/// Gets the current number of items in the queue.
/// </summary>
public int QueueCount { get; }
}
}
20 changes: 4 additions & 16 deletions src/Farfetch.LoadShedding/Events/Args/ItemProcessedEventArgs.cs
Original file line number Diff line number Diff line change
@@ -1,34 +1,22 @@
using System;
using System;
using Farfetch.LoadShedding.Tasks;

namespace Farfetch.LoadShedding.Events.Args
{
/// <summary>
/// Event args for task processed event.
/// </summary>
public class ItemProcessedEventArgs : ItemEventArgs
public class ItemProcessedEventArgs : TaskItemEventArgs
{
internal ItemProcessedEventArgs(Priority priority, TimeSpan processingTime, int concurrencyLimit, int concurrencyCount)
: base(priority)
internal ItemProcessedEventArgs(Priority priority, TimeSpan processingTime, IReadOnlyCounter concorrencyCounter)
: base(priority, concorrencyCounter)
{
this.ProcessingTime = processingTime;
this.ConcurrencyLimit = concurrencyLimit;
this.ConcurrencyCount = concurrencyCount;
}

/// <summary>
/// Gets time spent to process the task.
/// </summary>
public TimeSpan ProcessingTime { get; }

/// <summary>
/// Gets the current concurrency limit.
/// </summary>
public int ConcurrencyLimit { get; }

/// <summary>
/// Gets the current concurrency items count.
/// </summary>
public int ConcurrencyCount { get; }
}
}
22 changes: 5 additions & 17 deletions src/Farfetch.LoadShedding/Events/Args/ItemProcessingEventArgs.cs
Original file line number Diff line number Diff line change
@@ -1,27 +1,15 @@
using Farfetch.LoadShedding.Tasks;
using Farfetch.LoadShedding.Tasks;

namespace Farfetch.LoadShedding.Events.Args
{
/// <summary>
/// Event args for task procssing event.
/// Event args for task processing event.
/// </summary>
public class ItemProcessingEventArgs : ItemEventArgs
public class ItemProcessingEventArgs : TaskItemEventArgs
{
internal ItemProcessingEventArgs(Priority priority, int concurrencyLimit, int concurrencyCount)
: base(priority)
internal ItemProcessingEventArgs(Priority priority, IReadOnlyCounter concorrencyCounter)
: base(priority, concorrencyCounter)
{
this.ConcurrencyLimit = concurrencyLimit;
this.ConcurrencyCount = concurrencyCount;
}

/// <summary>
/// Gets the current concurrency limit.
/// </summary>
public int ConcurrencyLimit { get; }

/// <summary>
/// Gets the current concurrency items count.
/// </summary>
public int ConcurrencyCount { get; }
}
}
28 changes: 28 additions & 0 deletions src/Farfetch.LoadShedding/Events/Args/TaskItemEventArgs.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
using Farfetch.LoadShedding.Tasks;

namespace Farfetch.LoadShedding.Events.Args
{
/// <summary>
/// Event args for task item event.
/// </summary>
public class TaskItemEventArgs : ItemEventArgs
{
private readonly IReadOnlyCounter _concurrencyCounter;

internal TaskItemEventArgs(Priority priority, IReadOnlyCounter concurrencyCounter)
: base(priority)
{
_concurrencyCounter = concurrencyCounter;
}

/// <summary>
/// Gets the current concurrency limit.
/// </summary>
public int ConcurrencyLimit => _concurrencyCounter.Limit;

/// <summary>
/// Gets the current concurrency items count.
/// </summary>
public int ConcurrencyCount => _concurrencyCounter.Count;
}
}
28 changes: 28 additions & 0 deletions src/Farfetch.LoadShedding/Events/Args/TaskQueueEventArgs.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
using Farfetch.LoadShedding.Tasks;

namespace Farfetch.LoadShedding.Events.Args
{
/// <summary>
/// Event args for the task queue event.
/// </summary>
public class TaskQueueEventArgs : ItemEventArgs
{
private readonly IReadOnlyCounter _queueCounter;

internal TaskQueueEventArgs(Priority priority, IReadOnlyCounter queueCounter)
: base(priority)
{
this._queueCounter = queueCounter;
}

/// <summary>
/// Gets the maximum number of items in the queue.
/// </summary>
public int QueueLimit => _queueCounter.Limit;

/// <summary>
/// Gets the current number of items in the queue.
/// </summary>
public int QueueCount => _queueCounter.Count;
}
}
4 changes: 2 additions & 2 deletions src/Farfetch.LoadShedding/Tasks/ConcurrentCounter.cs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
namespace Farfetch.LoadShedding.Tasks
{
internal class ConcurrentCounter
internal class ConcurrentCounter : IReadOnlyCounter
{
private readonly object _locker = new object();
private readonly object _locker = new();

private int _count = 0;

Expand Down
9 changes: 9 additions & 0 deletions src/Farfetch.LoadShedding/Tasks/IReadOnlyCounter.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
namespace Farfetch.LoadShedding.Tasks
{
internal interface IReadOnlyCounter
{
int Count { get; }

int Limit { get; }
}
}
6 changes: 3 additions & 3 deletions src/Farfetch.LoadShedding/Tasks/Priority.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,17 +6,17 @@ namespace Farfetch.LoadShedding.Tasks
public enum Priority
{
/// <summary>
/// Priority as critical.
/// Priority as Critical.
/// </summary>
Critical = 0,

/// <summary>
/// Priority as normal.
/// Priority as Normal.
/// </summary>
Normal = 1,

/// <summary>
/// Priority as critical.
/// Priority as Non Critical.
/// </summary>
NonCritical = 2,
}
Expand Down
38 changes: 17 additions & 21 deletions src/Farfetch.LoadShedding/Tasks/TaskManager.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
using System;
using System;
using System.Threading;
using System.Threading.Tasks;
using Farfetch.LoadShedding.Constants;
Expand All @@ -12,7 +12,7 @@ internal class TaskManager : ITaskManager
{
private readonly TaskQueue _taskQueue;

private readonly ConcurrentCounter _counter = new ConcurrentCounter();
private readonly ConcurrentCounter _counter = new();

private readonly int _queueTimeout;
private readonly ILoadSheddingEvents _events;
Expand All @@ -30,8 +30,8 @@ public TaskManager(

this._taskQueue = new TaskQueue(maxQueueSize)
{
OnItemEnqueued = (count, item) => this.NotifyItemEnqueued(count, item),
OnItemDequeued = (count, item) => this.NotifyItemDequeued(count, item),
OnItemEnqueued = this.NotifyItemEnqueued,
OnItemDequeued = this.NotifyItemDequeued,
};

this._events?.ConcurrencyLimitChanged?.Raise(new LimitChangedEventArgs(this._counter.Limit));
Expand Down Expand Up @@ -86,11 +86,11 @@ public async Task<TaskItem> AcquireAsync(Priority priority, CancellationToken ca
{
var item = this.CreateTask(priority);

if (this._counter.TryIncrement(out var currentCount))
if (this._counter.TryIncrement(out var _))
{
item.Process();

this.NotifyItemProcessing(item, currentCount);
this.NotifyItemProcessing(item);

return item;
}
Expand Down Expand Up @@ -124,7 +124,8 @@ await item
break;
}

this.NotifyItemProcessing(item, this._counter.Increment());
this._counter.Increment();
this.NotifyItemProcessing(item);

return item;
}
Expand All @@ -136,10 +137,9 @@ private TaskItem CreateTask(Priority priority)
item.OnCompleted = () =>
{
var count = this._counter.Decrement();

var processNext = count < this._counter.Limit;

this.NotifyItemProcessed(item, count);
this.NotifyItemProcessed(item);

if (processNext)
{
Expand All @@ -155,32 +155,28 @@ private void NotifyItemRejected(TaskItem item, string reason)
item.Priority,
reason));

private void NotifyItemProcessed(TaskItem item, int count)
private void NotifyItemProcessed(TaskItem item)
=> this._events?.ItemProcessed?.Raise(new ItemProcessedEventArgs(
item.Priority,
item.ProcessingTime,
this.ConcurrencyLimit,
count));
this._counter));

private void NotifyItemProcessing(TaskItem item, int count)
private void NotifyItemProcessing(TaskItem item)
=> this._events?.ItemProcessing?.Raise(new ItemProcessingEventArgs(
item.Priority,
this.ConcurrencyLimit,
count));
this._counter));

private void NotifyConcurrencyLimitChanged()
=> this._events?.ConcurrencyLimitChanged?.Raise(new LimitChangedEventArgs(this._counter.Limit));

private void NotifyItemDequeued(int count, TaskItem item) => this._events?.ItemDequeued?.Raise(new ItemDequeuedEventArgs(
private void NotifyItemDequeued(TaskItem item) => this._events?.ItemDequeued?.Raise(new ItemDequeuedEventArgs(
item.Priority,
item.WaitingTime,
this._taskQueue.Limit,
count));
this._taskQueue));

private void NotifyItemEnqueued(int count, TaskItem item) => this._events?.ItemEnqueued?.Raise(new ItemEnqueuedEventArgs(
private void NotifyItemEnqueued(TaskItem item) => this._events?.ItemEnqueued?.Raise(new ItemEnqueuedEventArgs(
item.Priority,
this._taskQueue.Limit,
count));
this._taskQueue));

private void ProcessPendingTasks()
{
Expand Down
Loading

0 comments on commit c624a11

Please sign in to comment.