Skip to content

Commit 5eda310

Browse files
committed
Add cancellation token support
1 parent 8244da4 commit 5eda310

10 files changed

+244
-119
lines changed

Gofer.NET.Tests/GivenARedisTaskQueue.cs

Lines changed: 40 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,11 @@
55
using System.Linq.Expressions;
66
using System.Threading;
77
using System.Threading.Tasks;
8+
89
using FluentAssertions;
10+
911
using Gofer.NET.Utils;
12+
1013
using Xunit;
1114

1215
namespace Gofer.NET.Tests
@@ -22,8 +25,8 @@ public override string ToString()
2225
return Value;
2326
}
2427
}
25-
26-
private class CustomException : Exception {}
28+
29+
private class CustomException : Exception { }
2730

2831
[Fact]
2932
public async Task ItCapturesArgumentsPassedToEnqueuedDelegate()
@@ -40,13 +43,16 @@ public async Task ItCapturesArgumentsPassedToEnqueuedDelegate()
4043
Tuple.Create<Expression<Action>, string>(
4144
actionExp,
4245
str);
43-
46+
4447
// Action to expected result
4548
var delgates = new Tuple<Expression<Action>, string>[]
4649
{
4750
// Exception Argument
4851
TC(() => ExceptionFunc(new Exception(), semaphoreFile), new Exception().ToString()),
4952
TC(() => ExceptionFunc(new CustomException(), semaphoreFile), new CustomException().ToString()),
53+
54+
// Cancelation Argument
55+
TC(() => CancellationFunc(default, semaphoreFile), new CancellationToken().ToString()),
5056

5157
// Integer Arguments
5258
TC(() => IntFunc(int.MaxValue, semaphoreFile), int.MaxValue.ToString()),
@@ -99,29 +105,29 @@ public async Task ItCapturesArgumentsPassedToEnqueuedDelegate()
99105
TC(() => AsyncFunc(semaphoreFile).T(), "async"),
100106
TC(() => AsyncFuncThatReturnsString(semaphoreFile).T(), "async")
101107
};
102-
108+
103109

104110
foreach (var tup in delgates)
105111
{
106112
var actionExpr = tup.Item1;
107113
var expectedString = tup.Item2;
108-
114+
109115
File.Delete(semaphoreFile);
110-
111-
await testFixture.TaskQueue.Enqueue(actionExpr);
116+
117+
await testFixture.TaskQueue.Enqueue(actionExpr);
112118
await testFixture.TaskQueue.ExecuteNext();
113119

114120
File.ReadAllText(semaphoreFile).Should().Be(expectedString);
115121
}
116-
122+
117123
File.Delete(semaphoreFile);
118124
}
119-
125+
120126
[Fact]
121127
public async Task ItEnqueuesAndReceivesDelegatesThatAreRunnable()
122128
{
123129
var testFixture = new TaskQueueTestFixture(nameof(ItEnqueuesAndReceivesDelegatesThatAreRunnable));
124-
130+
125131
testFixture.EnsureSemaphoreDoesntExist();
126132
await testFixture.PushPopExecuteWriteSemaphore();
127133
testFixture.EnsureSemaphore();
@@ -133,18 +139,18 @@ public async Task ItsTasksAreConsumedOnlyOnceByMultipleConsumers()
133139
// Higher numbers here increase confidence
134140
var numberOfJobs = 16;
135141
var numberOfConsumers = 4;
136-
142+
137143
var sharedTaskQueueName = nameof(ItsTasksAreConsumedOnlyOnceByMultipleConsumers);
138144
var consumers = Enumerable.Range(0, numberOfConsumers)
139145
.Select(_ => new TaskQueueTestFixture(sharedTaskQueueName)).ToList();
140146

141147
var semaphoreFiles = new List<string>();
142-
for(int i=0;i < numberOfJobs;++i)
148+
for (int i = 0; i < numberOfJobs; ++i)
143149
{
144150
var path = Path.GetTempFileName();
145151
File.Delete(path);
146152
semaphoreFiles.Add(path);
147-
153+
148154
var sharedTaskQueue = consumers[0].TaskQueue;
149155
await sharedTaskQueue.Enqueue(() => TaskQueueTestFixture.WriteSemaphore(path));
150156
}
@@ -175,25 +181,25 @@ public async Task AsyncFunc(string semaphoreFile)
175181
{
176182
// Wait to ensure async waiting is happening.
177183
await Task.Delay(1000);
178-
184+
179185
TaskQueueTestFixture.WriteSemaphoreValue(semaphoreFile, "async");
180186
}
181-
187+
182188
public async Task<string> AsyncFuncThatReturnsString(string semaphoreFile)
183189
{
184190
// Wait to ensure async waiting is happening.
185191
await Task.Delay(1000);
186-
192+
187193
TaskQueueTestFixture.WriteSemaphoreValue(semaphoreFile, "async");
188194

189195
return "async";
190196
}
191-
197+
192198
public void NullableTypeFunc(DateTime? dateTime, string semaphoreFile)
193199
{
194200
TaskQueueTestFixture.WriteSemaphoreValue(semaphoreFile, dateTime);
195201
}
196-
202+
197203
public void DateTimeFunc(DateTime dateTime, string semaphoreFile)
198204
{
199205
TaskQueueTestFixture.WriteSemaphoreValue(semaphoreFile, dateTime);
@@ -203,37 +209,37 @@ public void IntFunc(int num, string semaphoreFile)
203209
{
204210
TaskQueueTestFixture.WriteSemaphoreValue(semaphoreFile, num);
205211
}
206-
212+
207213
public void NullableIntFunc(int? num, string semaphoreFile)
208214
{
209215
TaskQueueTestFixture.WriteSemaphoreValue(semaphoreFile, num ?? -1);
210216
}
211-
217+
212218
public void LongFunc(long num, string semaphoreFile)
213219
{
214220
TaskQueueTestFixture.WriteSemaphoreValue(semaphoreFile, num);
215221
}
216-
222+
217223
public void FloatFunc(float num, string semaphoreFile)
218224
{
219225
TaskQueueTestFixture.WriteSemaphoreValue(semaphoreFile, num);
220226
}
221-
227+
222228
public void BoolFunc(bool num, string semaphoreFile)
223229
{
224230
TaskQueueTestFixture.WriteSemaphoreValue(semaphoreFile, num);
225231
}
226-
232+
227233
public void DoubleFunc(double num, string semaphoreFile)
228234
{
229235
TaskQueueTestFixture.WriteSemaphoreValue(semaphoreFile, num);
230236
}
231-
237+
232238
public void StringFunc(string num, string semaphoreFile)
233239
{
234240
TaskQueueTestFixture.WriteSemaphoreValue(semaphoreFile, num);
235241
}
236-
242+
237243
public void ObjectFunc(object num, string semaphoreFile)
238244
{
239245
TaskQueueTestFixture.WriteSemaphoreValue(semaphoreFile, num);
@@ -249,21 +255,26 @@ public void ExceptionFunc(Exception exc, string semaphoreFile)
249255
TaskQueueTestFixture.WriteSemaphoreValue(semaphoreFile, exc);
250256
}
251257

258+
public void CancellationFunc(CancellationToken ct, string semaphoreFile)
259+
{
260+
TaskQueueTestFixture.WriteSemaphoreValue(semaphoreFile, ct);
261+
}
262+
252263
public void TypeFunc(Type typeArg, string semaphoreFile)
253264
{
254265
TaskQueueTestFixture.WriteSemaphoreValue(semaphoreFile, typeArg?.ToString() ?? "null");
255266
}
256-
267+
257268
public void ArrayFunc1(string[] nums, string semaphoreFile)
258269
{
259270
TaskQueueTestFixture.WriteSemaphoreValue(semaphoreFile, string.Join(",", nums));
260271
}
261-
272+
262273
public void ArrayFunc2(int[] nums, string semaphoreFile)
263274
{
264275
TaskQueueTestFixture.WriteSemaphoreValue(semaphoreFile, string.Join(",", nums));
265276
}
266-
277+
267278
public void ArrayFunc3(int?[] nums, string semaphoreFile)
268279
{
269280
var str = "";
@@ -274,7 +285,7 @@ public void ArrayFunc3(int?[] nums, string semaphoreFile)
274285
str += num?.ToString() ?? "null";
275286
first = false;
276287
}
277-
288+
278289
TaskQueueTestFixture.WriteSemaphoreValue(semaphoreFile, str);
279290
}
280291
}

Gofer.NET.Tests/GivenATaskClient.cs

Lines changed: 38 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,9 @@
33
using System.Linq;
44
using System.Threading;
55
using System.Threading.Tasks;
6+
67
using FluentAssertions;
7-
using Gofer.NET.Utils;
8+
89
using Xunit;
910

1011
namespace Gofer.NET.Tests
@@ -15,11 +16,11 @@ public class GivenATaskClient
1516
public async Task ItContinuesListeningWhenATaskThrowsAnException()
1617
{
1718
var waitTime = 5000;
18-
19+
1920
var taskQueue = TaskQueueTestFixture.UniqueRedisTaskQueue();
2021
var taskClient = new TaskClient(taskQueue);
2122
var semaphoreFile = Path.GetTempFileName();
22-
23+
2324
await taskClient.TaskQueue.Enqueue(() => Throw());
2425
await taskClient.TaskQueue.Enqueue(() => TaskQueueTestFixture.WriteSemaphore(semaphoreFile));
2526

@@ -28,37 +29,59 @@ public async Task ItContinuesListeningWhenATaskThrowsAnException()
2829

2930
taskClient.CancelListen();
3031
await task;
31-
32+
33+
TaskQueueTestFixture.EnsureSemaphore(semaphoreFile);
34+
}
35+
36+
[Fact]
37+
public async Task ItStopsOnCancellation()
38+
{
39+
var semaphoreFile = Path.GetTempFileName();
40+
41+
var waitTime = 2000;
42+
43+
var taskQueue = TaskQueueTestFixture.UniqueRedisTaskQueue();
44+
var taskClient = new TaskClient(taskQueue);
45+
var cancellation = new CancellationTokenSource();
46+
47+
await taskClient.TaskQueue.Enqueue(() => TaskQueueTestFixture.WaitForCancellationAndWriteSemaphore(semaphoreFile, default));
48+
49+
var task = Task.Run(async () => await taskClient.Listen(cancellation.Token), CancellationToken.None);
50+
await Task.Delay(waitTime, CancellationToken.None);
51+
cancellation.Cancel();
52+
await Task.Delay(waitTime, CancellationToken.None);
53+
await task;
54+
3255
TaskQueueTestFixture.EnsureSemaphore(semaphoreFile);
3356
}
3457

3558
[Fact]
3659
public async Task ItDoesNotDelayScheduledTaskPromotionWhenRunningLongTasks()
3760
{
3861
var waitTime = 4000;
39-
62+
4063
var taskQueue = TaskQueueTestFixture.UniqueRedisTaskQueue();
4164
var taskClient = new TaskClient(taskQueue);
4265

4366
var semaphoreFile = Path.GetTempFileName();
4467
File.Delete(semaphoreFile);
4568
File.Exists(semaphoreFile).Should().BeFalse();
46-
69+
4770
await taskClient.TaskQueue.Enqueue(() => Wait(waitTime));
4871

4972
await taskClient.TaskScheduler.AddScheduledTask(
5073
() => TaskQueueTestFixture.WriteSemaphore(semaphoreFile),
5174
TimeSpan.FromMilliseconds(waitTime / 4));
5275

5376
var task = Task.Run(async () => await taskClient.Listen());
54-
77+
5578
await Task.Delay(waitTime / 2);
5679

5780
// Ensure we did not run the scheduled task
5881
File.Exists(semaphoreFile).Should().BeFalse();
5982

6083
var dequeuedScheduledTask = await taskQueue.Dequeue();
61-
84+
6285
File.Exists(semaphoreFile).Should().BeFalse();
6386
dequeuedScheduledTask.Should().NotBeNull();
6487
dequeuedScheduledTask.MethodName.Should().Be(nameof(TaskQueueTestFixture.WriteSemaphore));
@@ -83,19 +106,19 @@ public async Task ItExecutesImmediateAndScheduledTasksInOrder()
83106
File.Delete(semaphoreFile);
84107
File.Exists(semaphoreFile).Should().BeFalse();
85108

86-
for (var i=0; i<immediateTasks; ++i)
109+
for (var i = 0; i < immediateTasks; ++i)
87110
{
88-
await taskClient.TaskQueue.Enqueue(() =>
89-
TaskQueueTestFixture.WriteSemaphoreValue(semaphoreFile, (i+1).ToString()));
111+
await taskClient.TaskQueue.Enqueue(() =>
112+
TaskQueueTestFixture.WriteSemaphoreValue(semaphoreFile, (i + 1).ToString()));
90113
}
91114

92-
for (var i=0; i<scheduledTasks; ++i)
115+
for (var i = 0; i < scheduledTasks; ++i)
93116
{
94117
await taskClient.TaskScheduler.AddScheduledTask(
95-
() => TaskQueueTestFixture.WriteSemaphoreValue(semaphoreFile, (immediateTasks+i+1).ToString()),
96-
TimeSpan.FromMilliseconds(scheduledTasksStart + (scheduledTasksIncrement*i)));
118+
() => TaskQueueTestFixture.WriteSemaphoreValue(semaphoreFile, (immediateTasks + i + 1).ToString()),
119+
TimeSpan.FromMilliseconds(scheduledTasksStart + (scheduledTasksIncrement * i)));
97120
}
98-
121+
99122
var task = Task.Run(async () => await taskClient.Listen());
100123
Thread.Sleep(scheduledTasks * scheduledTasksIncrement + 2000);
101124

0 commit comments

Comments
 (0)