Skip to content

Add cancellation token support #46

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 6 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 28 additions & 7 deletions Gofer.NET.Tests/GivenARedisTaskQueue.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
using System.IO;
using System.Linq;
using System.Linq.Expressions;
using System.Threading;
using System.Threading.Tasks;
using FluentAssertions;
using Gofer.NET.Utils;
Expand Down Expand Up @@ -35,14 +34,16 @@ public async Task ItCapturesArgumentsPassedToEnqueuedDelegate()
var semaphoreFile = Path.GetTempFileName();
var now = DateTime.Now;
var utcNow = DateTime.UtcNow;
var nowOffset = DateTimeOffset.Now;
var utcNowOffset = DateTimeOffset.UtcNow;

Func<Expression<Action>, string, Tuple<Expression<Action>, string>> TC = (actionExp, str) =>
Tuple.Create<Expression<Action>, string>(
actionExp,
str);

// Action to expected result
var delgates = new Tuple<Expression<Action>, string>[]
var delegates = new Tuple<Expression<Action>, string>[]
{
// Exception Argument
TC(() => ExceptionFunc(new Exception(), semaphoreFile), new Exception().ToString()),
Expand Down Expand Up @@ -73,15 +74,21 @@ public async Task ItCapturesArgumentsPassedToEnqueuedDelegate()
TC(() => StringFunc("astring", semaphoreFile), "astring"),
TC(() => StringFunc(variableToExtract, semaphoreFile), variableToExtract),

TC(() => TimeSpanFunc(TimeSpan.FromMinutes(1), semaphoreFile), TimeSpan.FromMinutes(1).ToString()),

// Object Arguments + Overloaded Version
TC(() => ObjectFunc(new TestDataHolder {Value = "astring"}, semaphoreFile), "astring"),
TC(() => ObjectFunc(null, new TestDataHolder {Value = "astring"}, semaphoreFile), "astring"),

TC(() => DateTimeFunc(now, semaphoreFile), now.ToString()),
TC(() => DateTimeFunc(utcNow, semaphoreFile), utcNow.ToString()),

TC(() => DateTimeOffsetFunc(nowOffset, semaphoreFile), nowOffset.ToString()),
TC(() => DateTimeOffsetFunc(utcNowOffset, semaphoreFile), utcNowOffset.ToString()),

TC(() => NullableTypeFunc(null, semaphoreFile), "null"),
TC(() => NullableTypeFunc(now, semaphoreFile), now.ToString()),
TC(() => NullableTypeFunc2(nowOffset, semaphoreFile), nowOffset.ToString()),
TC(() => ArrayFunc1(new[] {"this", "string", "is"}, semaphoreFile), "this,string,is"),
TC(() => ArrayFunc2(new[] {1, 2, 3, 4}, semaphoreFile), "1,2,3,4"),
TC(() => ArrayFunc3(new int?[] {1, 2, 3, null, 5}, semaphoreFile), "1,2,3,null,5"),
Expand All @@ -99,9 +106,8 @@ public async Task ItCapturesArgumentsPassedToEnqueuedDelegate()
TC(() => AsyncFunc(semaphoreFile).T(), "async"),
TC(() => AsyncFuncThatReturnsString(semaphoreFile).T(), "async")
};


foreach (var tup in delgates)
foreach (var tup in delegates)
{
var actionExpr = tup.Item1;
var expectedString = tup.Item2;
Expand Down Expand Up @@ -193,12 +199,22 @@ public void NullableTypeFunc(DateTime? dateTime, string semaphoreFile)
{
TaskQueueTestFixture.WriteSemaphoreValue(semaphoreFile, dateTime);
}


public void NullableTypeFunc2(DateTimeOffset? dateTime, string semaphoreFile)
{
TaskQueueTestFixture.WriteSemaphoreValue(semaphoreFile, dateTime);
}

public void DateTimeFunc(DateTime dateTime, string semaphoreFile)
{
TaskQueueTestFixture.WriteSemaphoreValue(semaphoreFile, dateTime);
}

public void DateTimeOffsetFunc(DateTimeOffset dateTime, string semaphoreFile)
{
TaskQueueTestFixture.WriteSemaphoreValue(semaphoreFile, dateTime);
}

public void IntFunc(int num, string semaphoreFile)
{
TaskQueueTestFixture.WriteSemaphoreValue(semaphoreFile, num);
Expand Down Expand Up @@ -233,7 +249,12 @@ public void StringFunc(string num, string semaphoreFile)
{
TaskQueueTestFixture.WriteSemaphoreValue(semaphoreFile, num);
}


public void TimeSpanFunc(TimeSpan num, string semaphoreFile)
{
TaskQueueTestFixture.WriteSemaphoreValue(semaphoreFile, num);
}

public void ObjectFunc(object num, string semaphoreFile)
{
TaskQueueTestFixture.WriteSemaphoreValue(semaphoreFile, num);
Expand Down
57 changes: 42 additions & 15 deletions Gofer.NET.Tests/GivenATaskClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,9 @@
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

using FluentAssertions;
using Gofer.NET.Utils;

using Xunit;

namespace Gofer.NET.Tests
Expand All @@ -15,11 +16,11 @@ public class GivenATaskClient
public async Task ItContinuesListeningWhenATaskThrowsAnException()
{
var waitTime = 5000;

var taskQueue = TaskQueueTestFixture.UniqueRedisTaskQueue();
var taskClient = new TaskClient(taskQueue);
var semaphoreFile = Path.GetTempFileName();

await taskClient.TaskQueue.Enqueue(() => Throw());
await taskClient.TaskQueue.Enqueue(() => TaskQueueTestFixture.WriteSemaphore(semaphoreFile));

Expand All @@ -28,37 +29,63 @@ public async Task ItContinuesListeningWhenATaskThrowsAnException()

taskClient.CancelListen();
await task;


TaskQueueTestFixture.EnsureSemaphore(semaphoreFile);
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks like a bug in the test, good catch!

}

[Fact]
public async Task ItStopsOnCancellation()
{
var semaphoreFile = Path.GetTempFileName();
var timeout = TimeSpan.FromMinutes(1);

var waitTime = TimeSpan.FromSeconds(2);

var taskQueue = TaskQueueTestFixture.UniqueRedisTaskQueue();
var taskClient = new TaskClient(taskQueue);
var cancellation = new CancellationTokenSource();

await taskClient.TaskQueue.Enqueue(() =>
TaskQueueTestFixture.WaitForTaskClientCancellationAndWriteSemaphore(
semaphoreFile,
timeout));

var task = Task.Run(async () => await taskClient.Listen(cancellation.Token), CancellationToken.None);
await Task.Delay(waitTime, CancellationToken.None);
cancellation.Cancel();
Copy link
Author

@ig-sinicyn ig-sinicyn Jul 19, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Enqueued task (WaitForTaskClientCancellationAndWriteSemaphore) will be canceled at this moment.

await Task.Delay(waitTime, CancellationToken.None);
await task;
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If a bug is introduced, and the task fails to exit on cancellation will this introduce a hanging test?


TaskQueueTestFixture.EnsureSemaphore(semaphoreFile);
}

[Fact]
public async Task ItDoesNotDelayScheduledTaskPromotionWhenRunningLongTasks()
{
var waitTime = 4000;

var taskQueue = TaskQueueTestFixture.UniqueRedisTaskQueue();
var taskClient = new TaskClient(taskQueue);

var semaphoreFile = Path.GetTempFileName();
File.Delete(semaphoreFile);
File.Exists(semaphoreFile).Should().BeFalse();

await taskClient.TaskQueue.Enqueue(() => Wait(waitTime));

await taskClient.TaskScheduler.AddScheduledTask(
() => TaskQueueTestFixture.WriteSemaphore(semaphoreFile),
TimeSpan.FromMilliseconds(waitTime / 4));

var task = Task.Run(async () => await taskClient.Listen());

await Task.Delay(waitTime / 2);

// Ensure we did not run the scheduled task
File.Exists(semaphoreFile).Should().BeFalse();

var dequeuedScheduledTask = await taskQueue.Dequeue();

File.Exists(semaphoreFile).Should().BeFalse();
dequeuedScheduledTask.Should().NotBeNull();
dequeuedScheduledTask.MethodName.Should().Be(nameof(TaskQueueTestFixture.WriteSemaphore));
Expand All @@ -83,19 +110,19 @@ public async Task ItExecutesImmediateAndScheduledTasksInOrder()
File.Delete(semaphoreFile);
File.Exists(semaphoreFile).Should().BeFalse();

for (var i=0; i<immediateTasks; ++i)
for (var i = 0; i < immediateTasks; ++i)
{
await taskClient.TaskQueue.Enqueue(() =>
TaskQueueTestFixture.WriteSemaphoreValue(semaphoreFile, (i+1).ToString()));
await taskClient.TaskQueue.Enqueue(() =>
TaskQueueTestFixture.WriteSemaphoreValue(semaphoreFile, (i + 1).ToString()));
}

for (var i=0; i<scheduledTasks; ++i)
for (var i = 0; i < scheduledTasks; ++i)
{
await taskClient.TaskScheduler.AddScheduledTask(
() => TaskQueueTestFixture.WriteSemaphoreValue(semaphoreFile, (immediateTasks+i+1).ToString()),
TimeSpan.FromMilliseconds(scheduledTasksStart + (scheduledTasksIncrement*i)));
() => TaskQueueTestFixture.WriteSemaphoreValue(semaphoreFile, (immediateTasks + i + 1).ToString()),
TimeSpan.FromMilliseconds(scheduledTasksStart + (scheduledTasksIncrement * i)));
}

var task = Task.Run(async () => await taskClient.Listen());
Thread.Sleep(scheduledTasks * scheduledTasksIncrement + 2000);

Expand Down
40 changes: 28 additions & 12 deletions Gofer.NET.Tests/TestQueueTestFixture.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,37 +2,38 @@
using System.IO;
using System.Threading;
using System.Threading.Tasks;

using FluentAssertions;

namespace Gofer.NET.Tests
{
public class TaskQueueTestFixture
{
private static readonly ReaderWriterLock Locker = new ReaderWriterLock();

public static string SemaphoreText => "completed";

public TaskQueue TaskQueue { get; }

public static string RedisConnectionString => "localhost:6379";

private readonly string _semaphoreFile;

public static TaskQueue UniqueRedisTaskQueue(string prefix=null)
public static TaskQueue UniqueRedisTaskQueue(string prefix = null)
{
var taskQueueName = $"{prefix ?? nameof(TaskQueueTestFixture)}::{Guid.NewGuid().ToString()}";
return TaskQueue.Redis(RedisConnectionString, taskQueueName);
}

public TaskQueueTestFixture(string uniqueId, TaskQueue taskQueue=null)
public TaskQueueTestFixture(string uniqueId, TaskQueue taskQueue = null)
{
_semaphoreFile = Path.Combine(AppContext.BaseDirectory, uniqueId, Path.GetTempFileName());

var testQueueName = uniqueId + "::TestQueue";
TaskQueue = taskQueue ?? TaskQueueTestFixture.UniqueRedisTaskQueue(uniqueId);

// Clear out the queue
while(TaskQueue.Dequeue().Result != null) { }
while (TaskQueue.Dequeue().Result != null) { }
}

public async Task PushPopExecuteWriteSemaphore()
Expand All @@ -52,12 +53,12 @@ public void EnsureSemaphore()
{
EnsureSemaphore(_semaphoreFile);
}

public static void EnsureSemaphore(string semaphoreFile)
{
try
{
Locker.AcquireReaderLock(30000);
Locker.AcquireReaderLock(30000);
File.ReadAllText(semaphoreFile).Should().Be(SemaphoreText);
}
finally
Expand All @@ -70,12 +71,27 @@ public static void WriteSemaphore(string semaphoreFile)
{
WriteSemaphoreValue(semaphoreFile, SemaphoreText);
}


public static async Task WaitForTaskClientCancellationAndWriteSemaphore(string semaphoreFile, TimeSpan timeout)
{
var token = TaskClient.GetListenCancellation();
if (!token.CanBeCanceled)
throw new InvalidOperationException("This method must be called from a task client callback");
try
{
await Task.Delay(timeout, token);
}
catch (OperationCanceledException)
{
}
WriteSemaphore(semaphoreFile);
}

public static void WriteSemaphoreValue(string semaphoreFile, object value)
{
try
{
Locker.AcquireWriterLock(30000);
Locker.AcquireWriterLock(30000);
File.AppendAllText(semaphoreFile, value?.ToString() ?? "null");
}
finally
Expand Down
3 changes: 2 additions & 1 deletion Gofer.NET.Utils/JsonTaskInfoSerializer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,8 @@ public static T Deserialize<T>(string jsonString) where T : class
var settings = new JsonSerializerSettings
{
TypeNameHandling = TypeNameHandling.All,
TypeNameAssemblyFormatHandling = TypeNameAssemblyFormatHandling.Full
TypeNameAssemblyFormatHandling = TypeNameAssemblyFormatHandling.Full,
DateParseHandling = DateParseHandling.DateTimeOffset
};
settings.Converters.Insert(0, new JsonPrimitiveConverter());
settings.Converters.Insert(1, new ExceptionConverter());
Expand Down
24 changes: 18 additions & 6 deletions Gofer.NET.Utils/TaskInfo.cs
Original file line number Diff line number Diff line change
Expand Up @@ -55,14 +55,15 @@ public bool IsEquivalent(TaskInfo otherTaskInfo)
&& ReturnType.Equals(otherTaskInfo.ReturnType);
}

public void ConvertTypeArgs()
public void ConvertTypeArgs()
{
for (var i=0;i<Args.Length;++i) {
if (Args[i] == null)
continue;

if (typeof(Type).IsAssignableFrom(Args[i].GetType())) {
Args[i] = new TypeWrapper {Type=(Type)Args[i]};
if (typeof(Type).IsInstanceOfType(Args[i]))
{
Args[i] = new TypeWrapper { Type = (Type)Args[i] };
}
}
}
Expand All @@ -72,9 +73,20 @@ public void UnconvertTypeArgs()
for (var i=0;i<Args.Length;++i) {
if (Args[i] == null)
continue;

if (typeof(TypeWrapper).IsAssignableFrom(Args[i].GetType())) {
Args[i] = ((TypeWrapper) Args[i]).Type;

var argType = Nullable.GetUnderlyingType(ArgTypes[i]) ?? ArgTypes[i];

if (typeof(TypeWrapper).IsInstanceOfType(Args[i]))
{
Args[i] = ((TypeWrapper)Args[i]).Type;
}
else if (typeof(TimeSpan).IsAssignableFrom(argType))
{
Args[i] = TimeSpan.Parse((string)Args[i]);
}
else if (typeof(DateTime).IsAssignableFrom(argType) && Args[i] is DateTimeOffset dateTimeOffset)
{
Args[i] = dateTimeOffset.DateTime;
}
}
}
Expand Down
Loading