Skip to content

Commit ffe53a5

Browse files
committed
fix: queue gauge metric
1 parent bb61dd7 commit ffe53a5

File tree

16 files changed

+134
-127
lines changed

16 files changed

+134
-127
lines changed

Makefile

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,3 +5,6 @@ sample:
55

66
sample-clean:
77
@docker compose -f docker-compose.sample.yaml down
8+
9+
benchmark:
10+
dotnet run -c Release --project ./tests/benchmark/Farfetch.LoadShedding.BenchmarkTests/Farfetch.LoadShedding.BenchmarkTests.csproj

src/Farfetch.LoadShedding/Events/Args/ItemDequeuedEventArgs.cs

Lines changed: 3 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -6,29 +6,17 @@ namespace Farfetch.LoadShedding.Events.Args
66
/// <summary>
77
/// Event args for the task dequeued event.
88
/// </summary>
9-
public class ItemDequeuedEventArgs : ItemEventArgs
9+
public class ItemDequeuedEventArgs : TaskQueueEventArgs
1010
{
11-
internal ItemDequeuedEventArgs(Priority priority, TimeSpan queueTime, int queueLimit, int queueCount)
12-
: base(priority)
11+
internal ItemDequeuedEventArgs(Priority priority, TimeSpan queueTime, IReadOnlyCounter queueCounter)
12+
: base(priority, queueCounter)
1313
{
1414
this.QueueTime = queueTime;
15-
this.QueueLimit = queueLimit;
16-
this.QueueCount = queueCount;
1715
}
1816

1917
/// <summary>
2018
/// Gets the time waiting in the queue.
2119
/// </summary>
2220
public TimeSpan QueueTime { get; }
23-
24-
/// <summary>
25-
/// Gets the maximum number of items in the queue.
26-
/// </summary>
27-
public int QueueLimit { get; }
28-
29-
/// <summary>
30-
/// Gets the current number of items in the queue.
31-
/// </summary>
32-
public int QueueCount { get; }
3321
}
3422
}

src/Farfetch.LoadShedding/Events/Args/ItemEnqueuedEventArgs.cs

Lines changed: 3 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -5,23 +5,11 @@ namespace Farfetch.LoadShedding.Events.Args
55
/// <summary>
66
/// Event args for the task enqueued event.
77
/// </summary>
8-
public class ItemEnqueuedEventArgs : ItemEventArgs
8+
public class ItemEnqueuedEventArgs : TaskQueueEventArgs
99
{
10-
internal ItemEnqueuedEventArgs(Priority priority, int queueLimit, int queueCount)
11-
: base(priority)
10+
internal ItemEnqueuedEventArgs(Priority priority, IReadOnlyCounter queueCounter)
11+
: base(priority, queueCounter)
1212
{
13-
this.QueueLimit = queueLimit;
14-
this.QueueCount = queueCount;
1513
}
16-
17-
/// <summary>
18-
/// Gets the maximum number of items in the queue.
19-
/// </summary>
20-
public int QueueLimit { get; }
21-
22-
/// <summary>
23-
/// Gets the current number of items in the queue.
24-
/// </summary>
25-
public int QueueCount { get; }
2614
}
2715
}
Lines changed: 4 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,34 +1,22 @@
1-
using System;
1+
using System;
22
using Farfetch.LoadShedding.Tasks;
33

44
namespace Farfetch.LoadShedding.Events.Args
55
{
66
/// <summary>
77
/// Event args for task processed event.
88
/// </summary>
9-
public class ItemProcessedEventArgs : ItemEventArgs
9+
public class ItemProcessedEventArgs : TaskItemEventArgs
1010
{
11-
internal ItemProcessedEventArgs(Priority priority, TimeSpan processingTime, int concurrencyLimit, int concurrencyCount)
12-
: base(priority)
11+
internal ItemProcessedEventArgs(Priority priority, TimeSpan processingTime, IReadOnlyCounter concurrencyCounter)
12+
: base(priority, concurrencyCounter)
1313
{
1414
this.ProcessingTime = processingTime;
15-
this.ConcurrencyLimit = concurrencyLimit;
16-
this.ConcurrencyCount = concurrencyCount;
1715
}
1816

1917
/// <summary>
2018
/// Gets time spent to process the task.
2119
/// </summary>
2220
public TimeSpan ProcessingTime { get; }
23-
24-
/// <summary>
25-
/// Gets the current concurrency limit.
26-
/// </summary>
27-
public int ConcurrencyLimit { get; }
28-
29-
/// <summary>
30-
/// Gets the current concurrency items count.
31-
/// </summary>
32-
public int ConcurrencyCount { get; }
3321
}
3422
}
Lines changed: 5 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1,27 +1,15 @@
1-
using Farfetch.LoadShedding.Tasks;
1+
using Farfetch.LoadShedding.Tasks;
22

33
namespace Farfetch.LoadShedding.Events.Args
44
{
55
/// <summary>
6-
/// Event args for task procssing event.
6+
/// Event args for task processing event.
77
/// </summary>
8-
public class ItemProcessingEventArgs : ItemEventArgs
8+
public class ItemProcessingEventArgs : TaskItemEventArgs
99
{
10-
internal ItemProcessingEventArgs(Priority priority, int concurrencyLimit, int concurrencyCount)
11-
: base(priority)
10+
internal ItemProcessingEventArgs(Priority priority, IReadOnlyCounter concurrencyCounter)
11+
: base(priority, concurrencyCounter)
1212
{
13-
this.ConcurrencyLimit = concurrencyLimit;
14-
this.ConcurrencyCount = concurrencyCount;
1513
}
16-
17-
/// <summary>
18-
/// Gets the current concurrency limit.
19-
/// </summary>
20-
public int ConcurrencyLimit { get; }
21-
22-
/// <summary>
23-
/// Gets the current concurrency items count.
24-
/// </summary>
25-
public int ConcurrencyCount { get; }
2614
}
2715
}
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
using Farfetch.LoadShedding.Tasks;
2+
3+
namespace Farfetch.LoadShedding.Events.Args
4+
{
5+
/// <summary>
6+
/// Event args for task item event.
7+
/// </summary>
8+
public class TaskItemEventArgs : ItemEventArgs
9+
{
10+
private readonly IReadOnlyCounter _concurrencyCounter;
11+
12+
internal TaskItemEventArgs(Priority priority, IReadOnlyCounter concurrencyCounter)
13+
: base(priority)
14+
{
15+
_concurrencyCounter = concurrencyCounter;
16+
}
17+
18+
/// <summary>
19+
/// Gets the current concurrency limit.
20+
/// </summary>
21+
public int ConcurrencyLimit => _concurrencyCounter.Limit;
22+
23+
/// <summary>
24+
/// Gets the current concurrency items count.
25+
/// </summary>
26+
public int ConcurrencyCount => _concurrencyCounter.Count;
27+
}
28+
}
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
using Farfetch.LoadShedding.Tasks;
2+
3+
namespace Farfetch.LoadShedding.Events.Args
4+
{
5+
/// <summary>
6+
/// Event args for the task queue event.
7+
/// </summary>
8+
public class TaskQueueEventArgs : ItemEventArgs
9+
{
10+
private readonly IReadOnlyCounter _queueCounter;
11+
12+
internal TaskQueueEventArgs(Priority priority, IReadOnlyCounter queueCounter)
13+
: base(priority)
14+
{
15+
this._queueCounter = queueCounter;
16+
}
17+
18+
/// <summary>
19+
/// Gets the maximum number of items in the queue.
20+
/// </summary>
21+
public int QueueLimit => _queueCounter.Limit;
22+
23+
/// <summary>
24+
/// Gets the current number of items in the queue.
25+
/// </summary>
26+
public int QueueCount => _queueCounter.Count;
27+
}
28+
}

src/Farfetch.LoadShedding/Tasks/ConcurrentCounter.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
11
namespace Farfetch.LoadShedding.Tasks
22
{
3-
internal class ConcurrentCounter
3+
internal class ConcurrentCounter : IReadOnlyCounter
44
{
5-
private readonly object _locker = new object();
5+
private readonly object _locker = new();
66

77
private int _count = 0;
88

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
namespace Farfetch.LoadShedding.Tasks
2+
{
3+
internal interface IReadOnlyCounter
4+
{
5+
int Count { get; }
6+
7+
int Limit { get; }
8+
}
9+
}

src/Farfetch.LoadShedding/Tasks/Priority.cs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,17 +6,17 @@ namespace Farfetch.LoadShedding.Tasks
66
public enum Priority
77
{
88
/// <summary>
9-
/// Priority as critical.
9+
/// Priority as Critical.
1010
/// </summary>
1111
Critical = 0,
1212

1313
/// <summary>
14-
/// Priority as normal.
14+
/// Priority as Normal.
1515
/// </summary>
1616
Normal = 1,
1717

1818
/// <summary>
19-
/// Priority as critical.
19+
/// Priority as Non Critical.
2020
/// </summary>
2121
NonCritical = 2,
2222
}

src/Farfetch.LoadShedding/Tasks/TaskManager.cs

Lines changed: 17 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
using System;
1+
using System;
22
using System.Threading;
33
using System.Threading.Tasks;
44
using Farfetch.LoadShedding.Constants;
@@ -12,7 +12,7 @@ internal class TaskManager : ITaskManager
1212
{
1313
private readonly TaskQueue _taskQueue;
1414

15-
private readonly ConcurrentCounter _counter = new ConcurrentCounter();
15+
private readonly ConcurrentCounter _counter = new();
1616

1717
private readonly int _queueTimeout;
1818
private readonly ILoadSheddingEvents _events;
@@ -30,8 +30,8 @@ public TaskManager(
3030

3131
this._taskQueue = new TaskQueue(maxQueueSize)
3232
{
33-
OnItemEnqueued = (count, item) => this.NotifyItemEnqueued(count, item),
34-
OnItemDequeued = (count, item) => this.NotifyItemDequeued(count, item),
33+
OnItemEnqueued = this.NotifyItemEnqueued,
34+
OnItemDequeued = this.NotifyItemDequeued,
3535
};
3636

3737
this._events?.ConcurrencyLimitChanged?.Raise(new LimitChangedEventArgs(this._counter.Limit));
@@ -86,11 +86,11 @@ public async Task<TaskItem> AcquireAsync(Priority priority, CancellationToken ca
8686
{
8787
var item = this.CreateTask(priority);
8888

89-
if (this._counter.TryIncrement(out var currentCount))
89+
if (this._counter.TryIncrement(out var _))
9090
{
9191
item.Process();
9292

93-
this.NotifyItemProcessing(item, currentCount);
93+
this.NotifyItemProcessing(item);
9494

9595
return item;
9696
}
@@ -124,7 +124,8 @@ await item
124124
break;
125125
}
126126

127-
this.NotifyItemProcessing(item, this._counter.Increment());
127+
this._counter.Increment();
128+
this.NotifyItemProcessing(item);
128129

129130
return item;
130131
}
@@ -136,10 +137,9 @@ private TaskItem CreateTask(Priority priority)
136137
item.OnCompleted = () =>
137138
{
138139
var count = this._counter.Decrement();
139-
140140
var processNext = count < this._counter.Limit;
141141

142-
this.NotifyItemProcessed(item, count);
142+
this.NotifyItemProcessed(item);
143143

144144
if (processNext)
145145
{
@@ -155,32 +155,28 @@ private void NotifyItemRejected(TaskItem item, string reason)
155155
item.Priority,
156156
reason));
157157

158-
private void NotifyItemProcessed(TaskItem item, int count)
158+
private void NotifyItemProcessed(TaskItem item)
159159
=> this._events?.ItemProcessed?.Raise(new ItemProcessedEventArgs(
160160
item.Priority,
161161
item.ProcessingTime,
162-
this.ConcurrencyLimit,
163-
count));
162+
this._counter));
164163

165-
private void NotifyItemProcessing(TaskItem item, int count)
164+
private void NotifyItemProcessing(TaskItem item)
166165
=> this._events?.ItemProcessing?.Raise(new ItemProcessingEventArgs(
167166
item.Priority,
168-
this.ConcurrencyLimit,
169-
count));
167+
this._counter));
170168

171169
private void NotifyConcurrencyLimitChanged()
172170
=> this._events?.ConcurrencyLimitChanged?.Raise(new LimitChangedEventArgs(this._counter.Limit));
173171

174-
private void NotifyItemDequeued(int count, TaskItem item) => this._events?.ItemDequeued?.Raise(new ItemDequeuedEventArgs(
172+
private void NotifyItemDequeued(TaskItem item) => this._events?.ItemDequeued?.Raise(new ItemDequeuedEventArgs(
175173
item.Priority,
176174
item.WaitingTime,
177-
this._taskQueue.Limit,
178-
count));
175+
this._taskQueue));
179176

180-
private void NotifyItemEnqueued(int count, TaskItem item) => this._events?.ItemEnqueued?.Raise(new ItemEnqueuedEventArgs(
177+
private void NotifyItemEnqueued(TaskItem item) => this._events?.ItemEnqueued?.Raise(new ItemEnqueuedEventArgs(
181178
item.Priority,
182-
this._taskQueue.Limit,
183-
count));
179+
this._taskQueue));
184180

185181
private void ProcessPendingTasks()
186182
{

0 commit comments

Comments
 (0)