diff --git a/Gofer.NET.Tests/GivenARedisTaskQueue.cs b/Gofer.NET.Tests/GivenARedisTaskQueue.cs index 29597c6..8b36732 100644 --- a/Gofer.NET.Tests/GivenARedisTaskQueue.cs +++ b/Gofer.NET.Tests/GivenARedisTaskQueue.cs @@ -3,7 +3,6 @@ using System.IO; using System.Linq; using System.Linq.Expressions; -using System.Threading; using System.Threading.Tasks; using FluentAssertions; using Gofer.NET.Utils; @@ -35,14 +34,16 @@ public async Task ItCapturesArgumentsPassedToEnqueuedDelegate() var semaphoreFile = Path.GetTempFileName(); var now = DateTime.Now; var utcNow = DateTime.UtcNow; + var nowOffset = DateTimeOffset.Now; + var utcNowOffset = DateTimeOffset.UtcNow; Func, string, Tuple, string>> TC = (actionExp, str) => Tuple.Create, string>( actionExp, str); - + // Action to expected result - var delgates = new Tuple, string>[] + var delegates = new Tuple, string>[] { // Exception Argument TC(() => ExceptionFunc(new Exception(), semaphoreFile), new Exception().ToString()), @@ -73,6 +74,8 @@ public async Task ItCapturesArgumentsPassedToEnqueuedDelegate() TC(() => StringFunc("astring", semaphoreFile), "astring"), TC(() => StringFunc(variableToExtract, semaphoreFile), variableToExtract), + TC(() => TimeSpanFunc(TimeSpan.FromMinutes(1), semaphoreFile), TimeSpan.FromMinutes(1).ToString()), + // Object Arguments + Overloaded Version TC(() => ObjectFunc(new TestDataHolder {Value = "astring"}, semaphoreFile), "astring"), TC(() => ObjectFunc(null, new TestDataHolder {Value = "astring"}, semaphoreFile), "astring"), @@ -80,8 +83,12 @@ public async Task ItCapturesArgumentsPassedToEnqueuedDelegate() TC(() => DateTimeFunc(now, semaphoreFile), now.ToString()), TC(() => DateTimeFunc(utcNow, semaphoreFile), utcNow.ToString()), + TC(() => DateTimeOffsetFunc(nowOffset, semaphoreFile), nowOffset.ToString()), + TC(() => DateTimeOffsetFunc(utcNowOffset, semaphoreFile), utcNowOffset.ToString()), + TC(() => NullableTypeFunc(null, semaphoreFile), "null"), TC(() => NullableTypeFunc(now, semaphoreFile), now.ToString()), + TC(() => NullableTypeFunc2(nowOffset, semaphoreFile), nowOffset.ToString()), TC(() => ArrayFunc1(new[] {"this", "string", "is"}, semaphoreFile), "this,string,is"), TC(() => ArrayFunc2(new[] {1, 2, 3, 4}, semaphoreFile), "1,2,3,4"), TC(() => ArrayFunc3(new int?[] {1, 2, 3, null, 5}, semaphoreFile), "1,2,3,null,5"), @@ -99,9 +106,8 @@ public async Task ItCapturesArgumentsPassedToEnqueuedDelegate() TC(() => AsyncFunc(semaphoreFile).T(), "async"), TC(() => AsyncFuncThatReturnsString(semaphoreFile).T(), "async") }; - - foreach (var tup in delgates) + foreach (var tup in delegates) { var actionExpr = tup.Item1; var expectedString = tup.Item2; @@ -193,12 +199,22 @@ public void NullableTypeFunc(DateTime? dateTime, string semaphoreFile) { TaskQueueTestFixture.WriteSemaphoreValue(semaphoreFile, dateTime); } - + + public void NullableTypeFunc2(DateTimeOffset? dateTime, string semaphoreFile) + { + TaskQueueTestFixture.WriteSemaphoreValue(semaphoreFile, dateTime); + } + public void DateTimeFunc(DateTime dateTime, string semaphoreFile) { TaskQueueTestFixture.WriteSemaphoreValue(semaphoreFile, dateTime); } + public void DateTimeOffsetFunc(DateTimeOffset dateTime, string semaphoreFile) + { + TaskQueueTestFixture.WriteSemaphoreValue(semaphoreFile, dateTime); + } + public void IntFunc(int num, string semaphoreFile) { TaskQueueTestFixture.WriteSemaphoreValue(semaphoreFile, num); @@ -233,7 +249,12 @@ public void StringFunc(string num, string semaphoreFile) { TaskQueueTestFixture.WriteSemaphoreValue(semaphoreFile, num); } - + + public void TimeSpanFunc(TimeSpan num, string semaphoreFile) + { + TaskQueueTestFixture.WriteSemaphoreValue(semaphoreFile, num); + } + public void ObjectFunc(object num, string semaphoreFile) { TaskQueueTestFixture.WriteSemaphoreValue(semaphoreFile, num); diff --git a/Gofer.NET.Tests/GivenATaskClient.cs b/Gofer.NET.Tests/GivenATaskClient.cs index bd8656f..9f9ef15 100644 --- a/Gofer.NET.Tests/GivenATaskClient.cs +++ b/Gofer.NET.Tests/GivenATaskClient.cs @@ -3,8 +3,9 @@ using System.Linq; using System.Threading; using System.Threading.Tasks; + using FluentAssertions; -using Gofer.NET.Utils; + using Xunit; namespace Gofer.NET.Tests @@ -15,11 +16,11 @@ public class GivenATaskClient public async Task ItContinuesListeningWhenATaskThrowsAnException() { var waitTime = 5000; - + var taskQueue = TaskQueueTestFixture.UniqueRedisTaskQueue(); var taskClient = new TaskClient(taskQueue); var semaphoreFile = Path.GetTempFileName(); - + await taskClient.TaskQueue.Enqueue(() => Throw()); await taskClient.TaskQueue.Enqueue(() => TaskQueueTestFixture.WriteSemaphore(semaphoreFile)); @@ -28,7 +29,33 @@ public async Task ItContinuesListeningWhenATaskThrowsAnException() taskClient.CancelListen(); await task; - + + TaskQueueTestFixture.EnsureSemaphore(semaphoreFile); + } + + [Fact] + public async Task ItStopsOnCancellation() + { + var semaphoreFile = Path.GetTempFileName(); + var timeout = TimeSpan.FromMinutes(1); + + var waitTime = TimeSpan.FromSeconds(2); + + var taskQueue = TaskQueueTestFixture.UniqueRedisTaskQueue(); + var taskClient = new TaskClient(taskQueue); + var cancellation = new CancellationTokenSource(); + + await taskClient.TaskQueue.Enqueue(() => + TaskQueueTestFixture.WaitForTaskClientCancellationAndWriteSemaphore( + semaphoreFile, + timeout)); + + var task = Task.Run(async () => await taskClient.Listen(cancellation.Token), CancellationToken.None); + await Task.Delay(waitTime, CancellationToken.None); + cancellation.Cancel(); + await Task.Delay(waitTime, CancellationToken.None); + await task; + TaskQueueTestFixture.EnsureSemaphore(semaphoreFile); } @@ -36,14 +63,14 @@ public async Task ItContinuesListeningWhenATaskThrowsAnException() public async Task ItDoesNotDelayScheduledTaskPromotionWhenRunningLongTasks() { var waitTime = 4000; - + var taskQueue = TaskQueueTestFixture.UniqueRedisTaskQueue(); var taskClient = new TaskClient(taskQueue); var semaphoreFile = Path.GetTempFileName(); File.Delete(semaphoreFile); File.Exists(semaphoreFile).Should().BeFalse(); - + await taskClient.TaskQueue.Enqueue(() => Wait(waitTime)); await taskClient.TaskScheduler.AddScheduledTask( @@ -51,14 +78,14 @@ await taskClient.TaskScheduler.AddScheduledTask( TimeSpan.FromMilliseconds(waitTime / 4)); var task = Task.Run(async () => await taskClient.Listen()); - + await Task.Delay(waitTime / 2); // Ensure we did not run the scheduled task File.Exists(semaphoreFile).Should().BeFalse(); var dequeuedScheduledTask = await taskQueue.Dequeue(); - + File.Exists(semaphoreFile).Should().BeFalse(); dequeuedScheduledTask.Should().NotBeNull(); dequeuedScheduledTask.MethodName.Should().Be(nameof(TaskQueueTestFixture.WriteSemaphore)); @@ -83,19 +110,19 @@ public async Task ItExecutesImmediateAndScheduledTasksInOrder() File.Delete(semaphoreFile); File.Exists(semaphoreFile).Should().BeFalse(); - for (var i=0; i - TaskQueueTestFixture.WriteSemaphoreValue(semaphoreFile, (i+1).ToString())); + await taskClient.TaskQueue.Enqueue(() => + TaskQueueTestFixture.WriteSemaphoreValue(semaphoreFile, (i + 1).ToString())); } - for (var i=0; i TaskQueueTestFixture.WriteSemaphoreValue(semaphoreFile, (immediateTasks+i+1).ToString()), - TimeSpan.FromMilliseconds(scheduledTasksStart + (scheduledTasksIncrement*i))); + () => TaskQueueTestFixture.WriteSemaphoreValue(semaphoreFile, (immediateTasks + i + 1).ToString()), + TimeSpan.FromMilliseconds(scheduledTasksStart + (scheduledTasksIncrement * i))); } - + var task = Task.Run(async () => await taskClient.Listen()); Thread.Sleep(scheduledTasks * scheduledTasksIncrement + 2000); diff --git a/Gofer.NET.Tests/TestQueueTestFixture.cs b/Gofer.NET.Tests/TestQueueTestFixture.cs index 74bbb48..a79b2b2 100644 --- a/Gofer.NET.Tests/TestQueueTestFixture.cs +++ b/Gofer.NET.Tests/TestQueueTestFixture.cs @@ -2,6 +2,7 @@ using System.IO; using System.Threading; using System.Threading.Tasks; + using FluentAssertions; namespace Gofer.NET.Tests @@ -9,30 +10,30 @@ namespace Gofer.NET.Tests public class TaskQueueTestFixture { private static readonly ReaderWriterLock Locker = new ReaderWriterLock(); - + public static string SemaphoreText => "completed"; - + public TaskQueue TaskQueue { get; } - + public static string RedisConnectionString => "localhost:6379"; private readonly string _semaphoreFile; - public static TaskQueue UniqueRedisTaskQueue(string prefix=null) + public static TaskQueue UniqueRedisTaskQueue(string prefix = null) { var taskQueueName = $"{prefix ?? nameof(TaskQueueTestFixture)}::{Guid.NewGuid().ToString()}"; return TaskQueue.Redis(RedisConnectionString, taskQueueName); } - public TaskQueueTestFixture(string uniqueId, TaskQueue taskQueue=null) + public TaskQueueTestFixture(string uniqueId, TaskQueue taskQueue = null) { _semaphoreFile = Path.Combine(AppContext.BaseDirectory, uniqueId, Path.GetTempFileName()); - + var testQueueName = uniqueId + "::TestQueue"; TaskQueue = taskQueue ?? TaskQueueTestFixture.UniqueRedisTaskQueue(uniqueId); - + // Clear out the queue - while(TaskQueue.Dequeue().Result != null) { } + while (TaskQueue.Dequeue().Result != null) { } } public async Task PushPopExecuteWriteSemaphore() @@ -52,12 +53,12 @@ public void EnsureSemaphore() { EnsureSemaphore(_semaphoreFile); } - + public static void EnsureSemaphore(string semaphoreFile) { try { - Locker.AcquireReaderLock(30000); + Locker.AcquireReaderLock(30000); File.ReadAllText(semaphoreFile).Should().Be(SemaphoreText); } finally @@ -70,12 +71,27 @@ public static void WriteSemaphore(string semaphoreFile) { WriteSemaphoreValue(semaphoreFile, SemaphoreText); } - + + public static async Task WaitForTaskClientCancellationAndWriteSemaphore(string semaphoreFile, TimeSpan timeout) + { + var token = TaskClient.GetListenCancellation(); + if (!token.CanBeCanceled) + throw new InvalidOperationException("This method must be called from a task client callback"); + try + { + await Task.Delay(timeout, token); + } + catch (OperationCanceledException) + { + } + WriteSemaphore(semaphoreFile); + } + public static void WriteSemaphoreValue(string semaphoreFile, object value) { try { - Locker.AcquireWriterLock(30000); + Locker.AcquireWriterLock(30000); File.AppendAllText(semaphoreFile, value?.ToString() ?? "null"); } finally diff --git a/Gofer.NET.Utils/JsonTaskInfoSerializer.cs b/Gofer.NET.Utils/JsonTaskInfoSerializer.cs index 9f88ea6..dfc55c9 100644 --- a/Gofer.NET.Utils/JsonTaskInfoSerializer.cs +++ b/Gofer.NET.Utils/JsonTaskInfoSerializer.cs @@ -41,7 +41,8 @@ public static T Deserialize(string jsonString) where T : class var settings = new JsonSerializerSettings { TypeNameHandling = TypeNameHandling.All, - TypeNameAssemblyFormatHandling = TypeNameAssemblyFormatHandling.Full + TypeNameAssemblyFormatHandling = TypeNameAssemblyFormatHandling.Full, + DateParseHandling = DateParseHandling.DateTimeOffset }; settings.Converters.Insert(0, new JsonPrimitiveConverter()); settings.Converters.Insert(1, new ExceptionConverter()); diff --git a/Gofer.NET.Utils/TaskInfo.cs b/Gofer.NET.Utils/TaskInfo.cs index 3ef6365..01a3a89 100644 --- a/Gofer.NET.Utils/TaskInfo.cs +++ b/Gofer.NET.Utils/TaskInfo.cs @@ -55,14 +55,15 @@ public bool IsEquivalent(TaskInfo otherTaskInfo) && ReturnType.Equals(otherTaskInfo.ReturnType); } - public void ConvertTypeArgs() + public void ConvertTypeArgs() { for (var i=0;i _listenCancellationContext = new AsyncLocal(); + private static readonly object Locker = new object(); - + private const int PollDelay = 100; - private bool IsCanceled { get; set; } - + public static CancellationToken GetListenCancellation() => _listenCancellationContext.Value; + public TaskQueue TaskQueue { get; } public Action OnError { get; } @@ -29,35 +30,46 @@ public class TaskClient private CancellationTokenSource ListenCancellationTokenSource { get; set; } public TaskClient( - TaskQueue taskQueue, - Action onError=null) + TaskQueue taskQueue, + Action onError = null) { TaskQueue = taskQueue; OnError = onError; TaskScheduler = new TaskScheduler(TaskQueue); - IsCanceled = false; } - public async Task Listen() + public Task Listen() + { + return Listen(CancellationToken.None); + } + + public async Task Listen(CancellationToken cancellation) { - Start(); + Start(cancellation); - await Task.WhenAll(new [] { - TaskRunnerThread, + await Task.WhenAll(new[] { + TaskRunnerThread, TaskSchedulerThread}); } public CancellationTokenSource Start() + { + return Start(CancellationToken.None); + } + + public CancellationTokenSource Start(CancellationToken cancellation) { if (TaskSchedulerThread != null || TaskRunnerThread != null) { throw new Exception("This TaskClient is already listening."); } - ListenCancellationTokenSource = new CancellationTokenSource(); + + ListenCancellationTokenSource = CancellationTokenSource.CreateLinkedTokenSource(cancellation); var token = ListenCancellationTokenSource.Token; - TaskSchedulerThread = Task.Run(async () => { + TaskSchedulerThread = Task.Run(async () => + { var inThreadTaskScheduler = new TaskScheduler(TaskQueue); while (true) @@ -69,43 +81,49 @@ public CancellationTokenSource Start() await inThreadTaskScheduler.Tick(); } - }, ListenCancellationTokenSource.Token); + }, token); - TaskRunnerThread = Task.Run(async () => { + TaskRunnerThread = Task.Run(async () => + { while (true) { if (token.IsCancellationRequested) { return; } - - await ExecuteQueuedTask(); + + await ExecuteQueuedTask(token); } - }, ListenCancellationTokenSource.Token); + }, token); return ListenCancellationTokenSource; } - private async Task ExecuteQueuedTask() + private async Task ExecuteQueuedTask(CancellationToken token) { var (json, info) = await TaskQueue.SafeDequeue(); if (info != null) { LogTaskStarted(info); - + var old = _listenCancellationContext.Value; try { - var now = DateTime.Now; - + _listenCancellationContext.Value = token; + + var executionTimer = Stopwatch.StartNew(); await info.ExecuteTask(); - - var completionSeconds = (DateTime.Now - now).TotalSeconds; + executionTimer.Stop(); + var completionSeconds = executionTimer.Elapsed.TotalSeconds; LogTaskFinished(info, completionSeconds); } catch (Exception e) { LogTaskException(info, e); } + finally + { + _listenCancellationContext.Value = old; + } } } @@ -122,7 +140,7 @@ private void LogTaskStarted(TaskInfo info) var logMessage = Messages.TaskStarted(info); ThreadSafeColoredConsole.Info(logMessage); } - + private void LogTaskFinished(TaskInfo info, double completionSeconds) { var logMessage = Messages.TaskFinished(info, completionSeconds); diff --git a/TaskQueue.cs b/TaskQueue.cs index db98de4..d9a7f8c 100644 --- a/TaskQueue.cs +++ b/TaskQueue.cs @@ -35,7 +35,13 @@ public async Task Enqueue(Expression expression) var taskInfo = expression.ToTaskInfo(); await Enqueue(taskInfo); } - + + public async Task Enqueue(Expression> expression) + { + var taskInfo = expression.ToTaskInfo(); + await Enqueue(taskInfo); + } + internal async Task Enqueue(TaskInfo taskInfo) { taskInfo.ConvertTypeArgs();