From 5eda310caa6cf4dbbcca20ff4278057e2318e28a Mon Sep 17 00:00:00 2001 From: ig-sinicyn Date: Sun, 19 Jul 2020 13:40:34 +0300 Subject: [PATCH 1/6] Add cancellation token support --- Gofer.NET.Tests/GivenARedisTaskQueue.cs | 69 +++++++++++-------- Gofer.NET.Tests/GivenATaskClient.cs | 53 ++++++++++---- Gofer.NET.Tests/GivenATaskInfo.cs | 35 +++++++--- Gofer.NET.Tests/TestQueueTestFixture.cs | 41 +++++++---- Gofer.NET.Utils/ActionExtensionMethods.cs | 27 ++++---- .../JsonSkipCancellationTokenConverter.cs | 27 ++++++++ Gofer.NET.Utils/JsonTaskInfoSerializer.cs | 15 ++-- Gofer.NET.Utils/TaskInfo.cs | 29 ++++++-- TaskClient.cs | 53 ++++++++------ TaskQueue.cs | 14 ++-- 10 files changed, 244 insertions(+), 119 deletions(-) create mode 100644 Gofer.NET.Utils/JsonSkipCancellationTokenConverter.cs diff --git a/Gofer.NET.Tests/GivenARedisTaskQueue.cs b/Gofer.NET.Tests/GivenARedisTaskQueue.cs index 29597c6..f04a149 100644 --- a/Gofer.NET.Tests/GivenARedisTaskQueue.cs +++ b/Gofer.NET.Tests/GivenARedisTaskQueue.cs @@ -5,8 +5,11 @@ using System.Linq.Expressions; using System.Threading; using System.Threading.Tasks; + using FluentAssertions; + using Gofer.NET.Utils; + using Xunit; namespace Gofer.NET.Tests @@ -22,8 +25,8 @@ public override string ToString() return Value; } } - - private class CustomException : Exception {} + + private class CustomException : Exception { } [Fact] public async Task ItCapturesArgumentsPassedToEnqueuedDelegate() @@ -40,13 +43,16 @@ public async Task ItCapturesArgumentsPassedToEnqueuedDelegate() Tuple.Create, string>( actionExp, str); - + // Action to expected result var delgates = new Tuple, string>[] { // Exception Argument TC(() => ExceptionFunc(new Exception(), semaphoreFile), new Exception().ToString()), TC(() => ExceptionFunc(new CustomException(), semaphoreFile), new CustomException().ToString()), + + // Cancelation Argument + TC(() => CancellationFunc(default, semaphoreFile), new CancellationToken().ToString()), // Integer Arguments TC(() => IntFunc(int.MaxValue, semaphoreFile), int.MaxValue.ToString()), @@ -99,29 +105,29 @@ public async Task ItCapturesArgumentsPassedToEnqueuedDelegate() TC(() => AsyncFunc(semaphoreFile).T(), "async"), TC(() => AsyncFuncThatReturnsString(semaphoreFile).T(), "async") }; - + foreach (var tup in delgates) { var actionExpr = tup.Item1; var expectedString = tup.Item2; - + File.Delete(semaphoreFile); - - await testFixture.TaskQueue.Enqueue(actionExpr); + + await testFixture.TaskQueue.Enqueue(actionExpr); await testFixture.TaskQueue.ExecuteNext(); File.ReadAllText(semaphoreFile).Should().Be(expectedString); } - + File.Delete(semaphoreFile); } - + [Fact] public async Task ItEnqueuesAndReceivesDelegatesThatAreRunnable() { var testFixture = new TaskQueueTestFixture(nameof(ItEnqueuesAndReceivesDelegatesThatAreRunnable)); - + testFixture.EnsureSemaphoreDoesntExist(); await testFixture.PushPopExecuteWriteSemaphore(); testFixture.EnsureSemaphore(); @@ -133,18 +139,18 @@ public async Task ItsTasksAreConsumedOnlyOnceByMultipleConsumers() // Higher numbers here increase confidence var numberOfJobs = 16; var numberOfConsumers = 4; - + var sharedTaskQueueName = nameof(ItsTasksAreConsumedOnlyOnceByMultipleConsumers); var consumers = Enumerable.Range(0, numberOfConsumers) .Select(_ => new TaskQueueTestFixture(sharedTaskQueueName)).ToList(); var semaphoreFiles = new List(); - for(int i=0;i < numberOfJobs;++i) + for (int i = 0; i < numberOfJobs; ++i) { var path = Path.GetTempFileName(); File.Delete(path); semaphoreFiles.Add(path); - + var sharedTaskQueue = consumers[0].TaskQueue; await sharedTaskQueue.Enqueue(() => TaskQueueTestFixture.WriteSemaphore(path)); } @@ -175,25 +181,25 @@ public async Task AsyncFunc(string semaphoreFile) { // Wait to ensure async waiting is happening. await Task.Delay(1000); - + TaskQueueTestFixture.WriteSemaphoreValue(semaphoreFile, "async"); } - + public async Task AsyncFuncThatReturnsString(string semaphoreFile) { // Wait to ensure async waiting is happening. await Task.Delay(1000); - + TaskQueueTestFixture.WriteSemaphoreValue(semaphoreFile, "async"); return "async"; } - + public void NullableTypeFunc(DateTime? dateTime, string semaphoreFile) { TaskQueueTestFixture.WriteSemaphoreValue(semaphoreFile, dateTime); } - + public void DateTimeFunc(DateTime dateTime, string semaphoreFile) { TaskQueueTestFixture.WriteSemaphoreValue(semaphoreFile, dateTime); @@ -203,37 +209,37 @@ public void IntFunc(int num, string semaphoreFile) { TaskQueueTestFixture.WriteSemaphoreValue(semaphoreFile, num); } - + public void NullableIntFunc(int? num, string semaphoreFile) { TaskQueueTestFixture.WriteSemaphoreValue(semaphoreFile, num ?? -1); } - + public void LongFunc(long num, string semaphoreFile) { TaskQueueTestFixture.WriteSemaphoreValue(semaphoreFile, num); } - + public void FloatFunc(float num, string semaphoreFile) { TaskQueueTestFixture.WriteSemaphoreValue(semaphoreFile, num); } - + public void BoolFunc(bool num, string semaphoreFile) { TaskQueueTestFixture.WriteSemaphoreValue(semaphoreFile, num); } - + public void DoubleFunc(double num, string semaphoreFile) { TaskQueueTestFixture.WriteSemaphoreValue(semaphoreFile, num); } - + public void StringFunc(string num, string semaphoreFile) { TaskQueueTestFixture.WriteSemaphoreValue(semaphoreFile, num); } - + public void ObjectFunc(object num, string semaphoreFile) { TaskQueueTestFixture.WriteSemaphoreValue(semaphoreFile, num); @@ -249,21 +255,26 @@ public void ExceptionFunc(Exception exc, string semaphoreFile) TaskQueueTestFixture.WriteSemaphoreValue(semaphoreFile, exc); } + public void CancellationFunc(CancellationToken ct, string semaphoreFile) + { + TaskQueueTestFixture.WriteSemaphoreValue(semaphoreFile, ct); + } + public void TypeFunc(Type typeArg, string semaphoreFile) { TaskQueueTestFixture.WriteSemaphoreValue(semaphoreFile, typeArg?.ToString() ?? "null"); } - + public void ArrayFunc1(string[] nums, string semaphoreFile) { TaskQueueTestFixture.WriteSemaphoreValue(semaphoreFile, string.Join(",", nums)); } - + public void ArrayFunc2(int[] nums, string semaphoreFile) { TaskQueueTestFixture.WriteSemaphoreValue(semaphoreFile, string.Join(",", nums)); } - + public void ArrayFunc3(int?[] nums, string semaphoreFile) { var str = ""; @@ -274,7 +285,7 @@ public void ArrayFunc3(int?[] nums, string semaphoreFile) str += num?.ToString() ?? "null"; first = false; } - + TaskQueueTestFixture.WriteSemaphoreValue(semaphoreFile, str); } } diff --git a/Gofer.NET.Tests/GivenATaskClient.cs b/Gofer.NET.Tests/GivenATaskClient.cs index bd8656f..c774d31 100644 --- a/Gofer.NET.Tests/GivenATaskClient.cs +++ b/Gofer.NET.Tests/GivenATaskClient.cs @@ -3,8 +3,9 @@ using System.Linq; using System.Threading; using System.Threading.Tasks; + using FluentAssertions; -using Gofer.NET.Utils; + using Xunit; namespace Gofer.NET.Tests @@ -15,11 +16,11 @@ public class GivenATaskClient public async Task ItContinuesListeningWhenATaskThrowsAnException() { var waitTime = 5000; - + var taskQueue = TaskQueueTestFixture.UniqueRedisTaskQueue(); var taskClient = new TaskClient(taskQueue); var semaphoreFile = Path.GetTempFileName(); - + await taskClient.TaskQueue.Enqueue(() => Throw()); await taskClient.TaskQueue.Enqueue(() => TaskQueueTestFixture.WriteSemaphore(semaphoreFile)); @@ -28,7 +29,29 @@ public async Task ItContinuesListeningWhenATaskThrowsAnException() taskClient.CancelListen(); await task; - + + TaskQueueTestFixture.EnsureSemaphore(semaphoreFile); + } + + [Fact] + public async Task ItStopsOnCancellation() + { + var semaphoreFile = Path.GetTempFileName(); + + var waitTime = 2000; + + var taskQueue = TaskQueueTestFixture.UniqueRedisTaskQueue(); + var taskClient = new TaskClient(taskQueue); + var cancellation = new CancellationTokenSource(); + + await taskClient.TaskQueue.Enqueue(() => TaskQueueTestFixture.WaitForCancellationAndWriteSemaphore(semaphoreFile, default)); + + var task = Task.Run(async () => await taskClient.Listen(cancellation.Token), CancellationToken.None); + await Task.Delay(waitTime, CancellationToken.None); + cancellation.Cancel(); + await Task.Delay(waitTime, CancellationToken.None); + await task; + TaskQueueTestFixture.EnsureSemaphore(semaphoreFile); } @@ -36,14 +59,14 @@ public async Task ItContinuesListeningWhenATaskThrowsAnException() public async Task ItDoesNotDelayScheduledTaskPromotionWhenRunningLongTasks() { var waitTime = 4000; - + var taskQueue = TaskQueueTestFixture.UniqueRedisTaskQueue(); var taskClient = new TaskClient(taskQueue); var semaphoreFile = Path.GetTempFileName(); File.Delete(semaphoreFile); File.Exists(semaphoreFile).Should().BeFalse(); - + await taskClient.TaskQueue.Enqueue(() => Wait(waitTime)); await taskClient.TaskScheduler.AddScheduledTask( @@ -51,14 +74,14 @@ await taskClient.TaskScheduler.AddScheduledTask( TimeSpan.FromMilliseconds(waitTime / 4)); var task = Task.Run(async () => await taskClient.Listen()); - + await Task.Delay(waitTime / 2); // Ensure we did not run the scheduled task File.Exists(semaphoreFile).Should().BeFalse(); var dequeuedScheduledTask = await taskQueue.Dequeue(); - + File.Exists(semaphoreFile).Should().BeFalse(); dequeuedScheduledTask.Should().NotBeNull(); dequeuedScheduledTask.MethodName.Should().Be(nameof(TaskQueueTestFixture.WriteSemaphore)); @@ -83,19 +106,19 @@ public async Task ItExecutesImmediateAndScheduledTasksInOrder() File.Delete(semaphoreFile); File.Exists(semaphoreFile).Should().BeFalse(); - for (var i=0; i - TaskQueueTestFixture.WriteSemaphoreValue(semaphoreFile, (i+1).ToString())); + await taskClient.TaskQueue.Enqueue(() => + TaskQueueTestFixture.WriteSemaphoreValue(semaphoreFile, (i + 1).ToString())); } - for (var i=0; i TaskQueueTestFixture.WriteSemaphoreValue(semaphoreFile, (immediateTasks+i+1).ToString()), - TimeSpan.FromMilliseconds(scheduledTasksStart + (scheduledTasksIncrement*i))); + () => TaskQueueTestFixture.WriteSemaphoreValue(semaphoreFile, (immediateTasks + i + 1).ToString()), + TimeSpan.FromMilliseconds(scheduledTasksStart + (scheduledTasksIncrement * i))); } - + var task = Task.Run(async () => await taskClient.Listen()); Thread.Sleep(scheduledTasks * scheduledTasksIncrement + 2000); diff --git a/Gofer.NET.Tests/GivenATaskInfo.cs b/Gofer.NET.Tests/GivenATaskInfo.cs index 827ccfd..0dd4096 100644 --- a/Gofer.NET.Tests/GivenATaskInfo.cs +++ b/Gofer.NET.Tests/GivenATaskInfo.cs @@ -1,12 +1,13 @@ using System; -using System.IO; using System.Linq.Expressions; using System.Threading; using System.Threading.Tasks; + +using FluentAssertions; + using Gofer.NET.Utils; + using Xunit; -using FluentAssertions; -using System.Reflection; namespace Gofer.NET.Tests { @@ -14,13 +15,15 @@ public class GivenATaskInfo { [Fact] public void ItPersistsPropertiesWhenSerializedAndDeserialized() - { + { var taskInfos = new[] { GetTestTask(() => TestMethod1("hello world")), - GetTestTask(() => TestMethod2()) + GetTestTask(() => TestMethod2()), + GetTestTask(() => TestMethod3(default)) }; - foreach (var taskInfo in taskInfos) { + foreach (var taskInfo in taskInfos) + { var serializedTaskInfo = JsonTaskInfoSerializer.Serialize(taskInfo); var deserializedTaskInfo = JsonTaskInfoSerializer.Deserialize(serializedTaskInfo); @@ -47,8 +50,11 @@ public void ItProperlyDeterminesEquivalence() var taskInfo2a = GetTestTask(() => TestMethod2()); var taskInfo2b = GetTestTask(() => TestMethod2()); - var taskInfo3a = GetTestTask(() => Console.WriteLine("hello")); - var taskInfo3b = GetTestTask(() => Console.WriteLine("hello world")); + var taskInfo3a = GetTestTask(() => TestMethod3(default)); + var taskInfo3b = GetTestTask(() => TestMethod3(new CancellationTokenSource().Token)); + + var taskInfo4a = GetTestTask(() => Console.WriteLine("hello")); + var taskInfo4b = GetTestTask(() => Console.WriteLine("hello world")); taskInfo1a.IsEquivalent(taskInfo1a).Should().BeTrue(); taskInfo1a.IsEquivalent(taskInfo1b).Should().BeTrue(); @@ -63,10 +69,13 @@ public void ItProperlyDeterminesEquivalence() taskInfo2a.IsEquivalent(taskInfo3a).Should().BeFalse(); taskInfo3a.IsEquivalent(taskInfo3a).Should().BeTrue(); - taskInfo3a.IsEquivalent(taskInfo3b).Should().BeFalse(); + taskInfo3a.IsEquivalent(taskInfo3b).Should().BeTrue(); + + taskInfo4a.IsEquivalent(taskInfo4a).Should().BeTrue(); + taskInfo4a.IsEquivalent(taskInfo4b).Should().BeFalse(); } - private TaskInfo GetTestTask(Expression action) + private TaskInfo GetTestTask(Expression action) { return action.ToTaskInfo(); } @@ -80,5 +89,11 @@ private void TestMethod2() { Console.WriteLine(nameof(TestMethod2)); } + + private Task TestMethod3(CancellationToken cancellation = default) + { + Console.WriteLine(nameof(TestMethod3)); + return Task.FromResult(nameof(TestMethod3)); + } } } \ No newline at end of file diff --git a/Gofer.NET.Tests/TestQueueTestFixture.cs b/Gofer.NET.Tests/TestQueueTestFixture.cs index 74bbb48..ae31d16 100644 --- a/Gofer.NET.Tests/TestQueueTestFixture.cs +++ b/Gofer.NET.Tests/TestQueueTestFixture.cs @@ -2,6 +2,7 @@ using System.IO; using System.Threading; using System.Threading.Tasks; + using FluentAssertions; namespace Gofer.NET.Tests @@ -9,37 +10,37 @@ namespace Gofer.NET.Tests public class TaskQueueTestFixture { private static readonly ReaderWriterLock Locker = new ReaderWriterLock(); - + public static string SemaphoreText => "completed"; - + public TaskQueue TaskQueue { get; } - + public static string RedisConnectionString => "localhost:6379"; private readonly string _semaphoreFile; - public static TaskQueue UniqueRedisTaskQueue(string prefix=null) + public static TaskQueue UniqueRedisTaskQueue(string prefix = null) { var taskQueueName = $"{prefix ?? nameof(TaskQueueTestFixture)}::{Guid.NewGuid().ToString()}"; return TaskQueue.Redis(RedisConnectionString, taskQueueName); } - public TaskQueueTestFixture(string uniqueId, TaskQueue taskQueue=null) + public TaskQueueTestFixture(string uniqueId, TaskQueue taskQueue = null) { _semaphoreFile = Path.Combine(AppContext.BaseDirectory, uniqueId, Path.GetTempFileName()); - + var testQueueName = uniqueId + "::TestQueue"; TaskQueue = taskQueue ?? TaskQueueTestFixture.UniqueRedisTaskQueue(uniqueId); - + // Clear out the queue - while(TaskQueue.Dequeue().Result != null) { } + while (TaskQueue.Dequeue().Result != null) { } } - public async Task PushPopExecuteWriteSemaphore() + public async Task PushPopExecuteWriteSemaphore(CancellationToken cancellation = default) { await TaskQueue.Enqueue(() => WriteSemaphore(_semaphoreFile)); var dequeuedTaskInfo = await TaskQueue.Dequeue(); - await dequeuedTaskInfo.ExecuteTask(); + await dequeuedTaskInfo.ExecuteTask(cancellation); } public void EnsureSemaphoreDoesntExist() @@ -52,12 +53,12 @@ public void EnsureSemaphore() { EnsureSemaphore(_semaphoreFile); } - + public static void EnsureSemaphore(string semaphoreFile) { try { - Locker.AcquireReaderLock(30000); + Locker.AcquireReaderLock(30000); File.ReadAllText(semaphoreFile).Should().Be(SemaphoreText); } finally @@ -70,12 +71,24 @@ public static void WriteSemaphore(string semaphoreFile) { WriteSemaphoreValue(semaphoreFile, SemaphoreText); } - + + public static async Task WaitForCancellationAndWriteSemaphore(string semaphoreFile, CancellationToken token = default) + { + try + { + await Task.Delay(-1, token); + } + catch (OperationCanceledException) + { + } + WriteSemaphore(semaphoreFile); + } + public static void WriteSemaphoreValue(string semaphoreFile, object value) { try { - Locker.AcquireWriterLock(30000); + Locker.AcquireWriterLock(30000); File.AppendAllText(semaphoreFile, value?.ToString() ?? "null"); } finally diff --git a/Gofer.NET.Utils/ActionExtensionMethods.cs b/Gofer.NET.Utils/ActionExtensionMethods.cs index de3ca47..431d7fd 100644 --- a/Gofer.NET.Utils/ActionExtensionMethods.cs +++ b/Gofer.NET.Utils/ActionExtensionMethods.cs @@ -2,6 +2,7 @@ using System.Linq; using System.Linq.Expressions; using System.Reflection; +using System.Threading; namespace Gofer.NET.Utils { @@ -11,15 +12,15 @@ public static TaskInfo ToTaskInfo(this Expression expression) { var methodCallArgumentResolutionVisitor = new MethodCallArgumentResolutionVisitor(); var expressionWithArgumentsResolved = - (Expression) methodCallArgumentResolutionVisitor.Visit(expression); + (Expression)methodCallArgumentResolutionVisitor.Visit(expression); - var method = ((MethodCallExpression) expressionWithArgumentsResolved.Body); + var method = ((MethodCallExpression)expressionWithArgumentsResolved.Body); var m = method.Method; var args = method.Arguments .Select(a => { - var value = ((ConstantExpression) a).Value; - return value; + var value = ((ConstantExpression)a).Value; + return value is CancellationToken ? null : value; }) .ToArray(); @@ -27,20 +28,20 @@ public static TaskInfo ToTaskInfo(this Expression expression) return taskInfo; } - + public static TaskInfo ToTaskInfo(this Expression> expression) { var methodCallArgumentResolutionVisitor = new MethodCallArgumentResolutionVisitor(); var expressionWithArgumentsResolved = - (Expression>) methodCallArgumentResolutionVisitor.Visit(expression); + (Expression>)methodCallArgumentResolutionVisitor.Visit(expression); - var method = ((MethodCallExpression) expressionWithArgumentsResolved.Body); + var method = ((MethodCallExpression)expressionWithArgumentsResolved.Body); var m = method.Method; var args = method.Arguments .Select(a => { - var value = ((ConstantExpression) a).Value; - return value; + var value = ((ConstantExpression)a).Value; + return value is CancellationToken ? null : value; }) .ToArray(); @@ -48,13 +49,13 @@ public static TaskInfo ToTaskInfo(this Expression> expression) return taskInfo; } - + public static TaskInfo ToTaskInfo(this MethodInfo method, object[] args) { var methodParams = method.GetParameters(); var argTypes = new Type[methodParams.Length]; - - for (var i=0; i < methodParams.Length; ++ i) + + for (var i = 0; i < methodParams.Length; ++i) { argTypes[i] = methodParams[i].ParameterType; } @@ -70,7 +71,7 @@ public static TaskInfo ToTaskInfo(this MethodInfo method, object[] args) CreatedAtUtc = DateTime.UtcNow, ReturnType = method.ReturnType }; - + return taskInfo; } } diff --git a/Gofer.NET.Utils/JsonSkipCancellationTokenConverter.cs b/Gofer.NET.Utils/JsonSkipCancellationTokenConverter.cs new file mode 100644 index 0000000..d9f07a3 --- /dev/null +++ b/Gofer.NET.Utils/JsonSkipCancellationTokenConverter.cs @@ -0,0 +1,27 @@ +using System; +using System.Threading; + +using Newtonsoft.Json; + +namespace Gofer.NET.Utils +{ + public class JsonSkipCancellationTokenConverter : JsonConverter + { + public override bool CanRead => false; + + public override bool CanConvert(Type objectType) + { + return objectType == typeof(CancellationToken); + } + + public override void WriteJson(JsonWriter writer, object value, JsonSerializer serializer) + { + writer.WriteValue((object)null); + } + + public override object ReadJson(JsonReader reader, Type objectType, object existingValue, JsonSerializer serializer) + { + throw new NotImplementedException(); + } + } +} \ No newline at end of file diff --git a/Gofer.NET.Utils/JsonTaskInfoSerializer.cs b/Gofer.NET.Utils/JsonTaskInfoSerializer.cs index 9f88ea6..91f992c 100644 --- a/Gofer.NET.Utils/JsonTaskInfoSerializer.cs +++ b/Gofer.NET.Utils/JsonTaskInfoSerializer.cs @@ -1,4 +1,4 @@ -using System; + using Newtonsoft.Json; namespace Gofer.NET.Utils @@ -7,7 +7,7 @@ public static class JsonTaskInfoSerializer { public static string Serialize(TaskInfo taskInfo) { - return Serialize((object) taskInfo); + return Serialize((object)taskInfo); } public static string Serialize(object obj) @@ -20,7 +20,8 @@ public static string Serialize(object obj) settings.Converters.Insert(0, new JsonPrimitiveConverter()); settings.Converters.Insert(1, new ExceptionConverter()); - + settings.Converters.Insert(2, new JsonSkipCancellationTokenConverter()); + var jsonString = JsonConvert.SerializeObject(obj, settings); return jsonString; @@ -30,21 +31,23 @@ public static TaskInfo Deserialize(string taskInfoJsonString) { return Deserialize(taskInfoJsonString); } - - public static T Deserialize(string jsonString) where T : class + + public static T Deserialize(string jsonString) where T : class { if (jsonString == null) { return null; } - + var settings = new JsonSerializerSettings { TypeNameHandling = TypeNameHandling.All, TypeNameAssemblyFormatHandling = TypeNameAssemblyFormatHandling.Full }; + settings.Converters.Insert(0, new JsonPrimitiveConverter()); settings.Converters.Insert(1, new ExceptionConverter()); + settings.Converters.Insert(2, new JsonSkipCancellationTokenConverter()); var obj = JsonConvert.DeserializeObject(jsonString, settings); return obj; diff --git a/Gofer.NET.Utils/TaskInfo.cs b/Gofer.NET.Utils/TaskInfo.cs index 3ef6365..35cb750 100644 --- a/Gofer.NET.Utils/TaskInfo.cs +++ b/Gofer.NET.Utils/TaskInfo.cs @@ -2,6 +2,7 @@ using System.Linq; using System.Reflection; using System.Runtime.Serialization; +using System.Threading; using System.Threading.Tasks; using Gofer.NET.Utils.Errors; @@ -43,7 +44,7 @@ public bool IsEquivalent(TaskInfo otherTaskInfo) } for (var i=0; i InvokeMethod(MethodInfo method, object instance) + private object[] GetInvokeArgs(CancellationToken cancellation) { + var args = Args.ToArray(); + for (int i = 0; i < args.Length; i++) + { + if (ArgTypes[i] == typeof(CancellationToken)) + args[i] = cancellation; + } + + return args; + } + + private async Task InvokeMethod(MethodInfo method, object instance, CancellationToken cancellation) + { + var args = GetInvokeArgs(cancellation); + if (method.IsAsync()) { - var result = method.Invoke(instance, Args); + var result = method.Invoke(instance, args); var task = (Task) result; await task; @@ -102,10 +117,10 @@ private async Task InvokeMethod(MethodInfo method, object instance) return resultValue; } - return method.Invoke(instance, Args); + return method.Invoke(instance, args); } - public async Task ExecuteTask() + public async Task ExecuteTask(CancellationToken token) { var assembly = Assembly.Load(AssemblyName); var type = assembly.GetType(TypeName); @@ -118,7 +133,7 @@ public async Task ExecuteTask() if (staticMethod != null) { - return await InvokeMethod(staticMethod, null); + return await InvokeMethod(staticMethod, null, token); } var instanceMethod = type.GetMethod(MethodName, @@ -134,7 +149,7 @@ public async Task ExecuteTask() var instance = Activator.CreateInstance(type); - return await InvokeMethod(instanceMethod, instance); + return await InvokeMethod(instanceMethod, instance, token); } } } \ No newline at end of file diff --git a/TaskClient.cs b/TaskClient.cs index 187dabb..ac0658c 100644 --- a/TaskClient.cs +++ b/TaskClient.cs @@ -1,21 +1,19 @@ using System; -using System.ComponentModel; using System.Threading; using System.Threading.Tasks; -using Gofer.NET.Errors; + using Gofer.NET.Utils; -using Newtonsoft.Json; namespace Gofer.NET { public class TaskClient { private static readonly object Locker = new object(); - + private const int PollDelay = 100; private bool IsCanceled { get; set; } - + public TaskQueue TaskQueue { get; } public Action OnError { get; } @@ -29,8 +27,8 @@ public class TaskClient private CancellationTokenSource ListenCancellationTokenSource { get; set; } public TaskClient( - TaskQueue taskQueue, - Action onError=null) + TaskQueue taskQueue, + Action onError = null) { TaskQueue = taskQueue; OnError = onError; @@ -38,26 +36,38 @@ public TaskClient( IsCanceled = false; } - public async Task Listen() + public Task Listen() + { + return Listen(CancellationToken.None); + } + + public async Task Listen(CancellationToken cancellation) { - Start(); + Start(cancellation); - await Task.WhenAll(new [] { - TaskRunnerThread, + await Task.WhenAll(new[] { + TaskRunnerThread, TaskSchedulerThread}); } public CancellationTokenSource Start() + { + return Start(CancellationToken.None); + } + + public CancellationTokenSource Start(CancellationToken cancellation) { if (TaskSchedulerThread != null || TaskRunnerThread != null) { throw new Exception("This TaskClient is already listening."); } - ListenCancellationTokenSource = new CancellationTokenSource(); + + ListenCancellationTokenSource = CancellationTokenSource.CreateLinkedTokenSource(cancellation); var token = ListenCancellationTokenSource.Token; - TaskSchedulerThread = Task.Run(async () => { + TaskSchedulerThread = Task.Run(async () => + { var inThreadTaskScheduler = new TaskScheduler(TaskQueue); while (true) @@ -71,22 +81,23 @@ public CancellationTokenSource Start() } }, ListenCancellationTokenSource.Token); - TaskRunnerThread = Task.Run(async () => { + TaskRunnerThread = Task.Run(async () => + { while (true) { if (token.IsCancellationRequested) { return; } - - await ExecuteQueuedTask(); + + await ExecuteQueuedTask(token); } }, ListenCancellationTokenSource.Token); return ListenCancellationTokenSource; } - private async Task ExecuteQueuedTask() + private async Task ExecuteQueuedTask(CancellationToken token) { var (json, info) = await TaskQueue.SafeDequeue(); if (info != null) @@ -96,9 +107,9 @@ private async Task ExecuteQueuedTask() try { var now = DateTime.Now; - - await info.ExecuteTask(); - + + await info.ExecuteTask(token); + var completionSeconds = (DateTime.Now - now).TotalSeconds; LogTaskFinished(info, completionSeconds); } @@ -122,7 +133,7 @@ private void LogTaskStarted(TaskInfo info) var logMessage = Messages.TaskStarted(info); ThreadSafeColoredConsole.Info(logMessage); } - + private void LogTaskFinished(TaskInfo info, double completionSeconds) { var logMessage = Messages.TaskFinished(info, completionSeconds); diff --git a/TaskQueue.cs b/TaskQueue.cs index db98de4..db55670 100644 --- a/TaskQueue.cs +++ b/TaskQueue.cs @@ -35,7 +35,13 @@ public async Task Enqueue(Expression expression) var taskInfo = expression.ToTaskInfo(); await Enqueue(taskInfo); } - + + public async Task Enqueue(Expression> expression) + { + var taskInfo = expression.ToTaskInfo(); + await Enqueue(taskInfo); + } + internal async Task Enqueue(TaskInfo taskInfo) { taskInfo.ConvertTypeArgs(); @@ -43,8 +49,8 @@ internal async Task Enqueue(TaskInfo taskInfo) await Backend.Enqueue(Config.QueueName, jsonString); } - - public async Task ExecuteNext() + + public async Task ExecuteNext(CancellationToken cancellation = default) { var (taskJsonString, taskInfo) = await SafeDequeue(); @@ -55,7 +61,7 @@ public async Task ExecuteNext() try { - await taskInfo.ExecuteTask(); + await taskInfo.ExecuteTask(cancellation); } finally { From df88b69f41539dede4cd1528c8175a809c8b40a8 Mon Sep 17 00:00:00 2001 From: ig-sinicyn Date: Sat, 25 Jul 2020 13:05:15 +0300 Subject: [PATCH 2/6] Fixes on review --- Gofer.NET.Tests/GivenARedisTaskQueue.cs | 4 ++-- Gofer.NET.Tests/GivenATaskInfo.cs | 2 +- Gofer.NET.Tests/TestQueueTestFixture.cs | 2 +- Gofer.NET.Utils/ActionExtensionMethods.cs | 5 ++--- 4 files changed, 6 insertions(+), 7 deletions(-) diff --git a/Gofer.NET.Tests/GivenARedisTaskQueue.cs b/Gofer.NET.Tests/GivenARedisTaskQueue.cs index f04a149..ad1e869 100644 --- a/Gofer.NET.Tests/GivenARedisTaskQueue.cs +++ b/Gofer.NET.Tests/GivenARedisTaskQueue.cs @@ -51,8 +51,8 @@ public async Task ItCapturesArgumentsPassedToEnqueuedDelegate() TC(() => ExceptionFunc(new Exception(), semaphoreFile), new Exception().ToString()), TC(() => ExceptionFunc(new CustomException(), semaphoreFile), new CustomException().ToString()), - // Cancelation Argument - TC(() => CancellationFunc(default, semaphoreFile), new CancellationToken().ToString()), + // Cancellation Argument + TC(() => CancellationFunc(new CancellationToken(), semaphoreFile), new CancellationToken().ToString()), // Integer Arguments TC(() => IntFunc(int.MaxValue, semaphoreFile), int.MaxValue.ToString()), diff --git a/Gofer.NET.Tests/GivenATaskInfo.cs b/Gofer.NET.Tests/GivenATaskInfo.cs index 0dd4096..3129854 100644 --- a/Gofer.NET.Tests/GivenATaskInfo.cs +++ b/Gofer.NET.Tests/GivenATaskInfo.cs @@ -90,7 +90,7 @@ private void TestMethod2() Console.WriteLine(nameof(TestMethod2)); } - private Task TestMethod3(CancellationToken cancellation = default) + private Task TestMethod3(CancellationToken cancellation) { Console.WriteLine(nameof(TestMethod3)); return Task.FromResult(nameof(TestMethod3)); diff --git a/Gofer.NET.Tests/TestQueueTestFixture.cs b/Gofer.NET.Tests/TestQueueTestFixture.cs index ae31d16..8e357f1 100644 --- a/Gofer.NET.Tests/TestQueueTestFixture.cs +++ b/Gofer.NET.Tests/TestQueueTestFixture.cs @@ -72,7 +72,7 @@ public static void WriteSemaphore(string semaphoreFile) WriteSemaphoreValue(semaphoreFile, SemaphoreText); } - public static async Task WaitForCancellationAndWriteSemaphore(string semaphoreFile, CancellationToken token = default) + public static async Task WaitForCancellationAndWriteSemaphore(string semaphoreFile, CancellationToken token) { try { diff --git a/Gofer.NET.Utils/ActionExtensionMethods.cs b/Gofer.NET.Utils/ActionExtensionMethods.cs index 431d7fd..91adf92 100644 --- a/Gofer.NET.Utils/ActionExtensionMethods.cs +++ b/Gofer.NET.Utils/ActionExtensionMethods.cs @@ -2,7 +2,6 @@ using System.Linq; using System.Linq.Expressions; using System.Reflection; -using System.Threading; namespace Gofer.NET.Utils { @@ -20,7 +19,7 @@ public static TaskInfo ToTaskInfo(this Expression expression) .Select(a => { var value = ((ConstantExpression)a).Value; - return value is CancellationToken ? null : value; + return value; }) .ToArray(); @@ -41,7 +40,7 @@ public static TaskInfo ToTaskInfo(this Expression> expression) .Select(a => { var value = ((ConstantExpression)a).Value; - return value is CancellationToken ? null : value; + return value; }) .ToArray(); From 12f4e8649c6403b9b926ea37245863f1c55d5e89 Mon Sep 17 00:00:00 2001 From: ig-sinicyn Date: Sat, 25 Jul 2020 13:36:47 +0300 Subject: [PATCH 3/6] Fix tests after review. I've added special handling for cancellation token to the IsEquivalent() method --- Gofer.NET.Tests/GivenATaskInfo.cs | 10 +++++++--- Gofer.NET.Utils/TaskInfo.cs | 25 ++++++++++++++++++------- 2 files changed, 25 insertions(+), 10 deletions(-) diff --git a/Gofer.NET.Tests/GivenATaskInfo.cs b/Gofer.NET.Tests/GivenATaskInfo.cs index 3129854..20b0c62 100644 --- a/Gofer.NET.Tests/GivenATaskInfo.cs +++ b/Gofer.NET.Tests/GivenATaskInfo.cs @@ -1,4 +1,5 @@ using System; +using System.Linq; using System.Linq.Expressions; using System.Threading; using System.Threading.Tasks; @@ -19,7 +20,7 @@ public void ItPersistsPropertiesWhenSerializedAndDeserialized() var taskInfos = new[] { GetTestTask(() => TestMethod1("hello world")), GetTestTask(() => TestMethod2()), - GetTestTask(() => TestMethod3(default)) + GetTestTask(() => TestMethod3(new CancellationToken())) }; foreach (var taskInfo in taskInfos) @@ -27,12 +28,15 @@ public void ItPersistsPropertiesWhenSerializedAndDeserialized() var serializedTaskInfo = JsonTaskInfoSerializer.Serialize(taskInfo); var deserializedTaskInfo = JsonTaskInfoSerializer.Deserialize(serializedTaskInfo); + // we do skip cancellation tokens as they are ignored on serialization + var importantTaskArgs = taskInfo.Args.Select(a => a is CancellationToken ? null : a).ToArray(); + deserializedTaskInfo.Id.Should().Be(taskInfo.Id); deserializedTaskInfo.AssemblyName.Should().Be(taskInfo.AssemblyName); deserializedTaskInfo.TypeName.Should().Be(taskInfo.TypeName); deserializedTaskInfo.MethodName.Should().Be(taskInfo.MethodName); deserializedTaskInfo.ReturnType.Should().Be(taskInfo.ReturnType); - deserializedTaskInfo.Args.ShouldAllBeEquivalentTo(taskInfo.Args); + deserializedTaskInfo.Args.ShouldAllBeEquivalentTo(importantTaskArgs); deserializedTaskInfo.ArgTypes.ShouldAllBeEquivalentTo(taskInfo.ArgTypes); deserializedTaskInfo.CreatedAtUtc.Should().Be(taskInfo.CreatedAtUtc); @@ -50,7 +54,7 @@ public void ItProperlyDeterminesEquivalence() var taskInfo2a = GetTestTask(() => TestMethod2()); var taskInfo2b = GetTestTask(() => TestMethod2()); - var taskInfo3a = GetTestTask(() => TestMethod3(default)); + var taskInfo3a = GetTestTask(() => TestMethod3(new CancellationToken())); var taskInfo3b = GetTestTask(() => TestMethod3(new CancellationTokenSource().Token)); var taskInfo4a = GetTestTask(() => Console.WriteLine("hello")); diff --git a/Gofer.NET.Utils/TaskInfo.cs b/Gofer.NET.Utils/TaskInfo.cs index 35cb750..6daf8ac 100644 --- a/Gofer.NET.Utils/TaskInfo.cs +++ b/Gofer.NET.Utils/TaskInfo.cs @@ -43,8 +43,12 @@ public bool IsEquivalent(TaskInfo otherTaskInfo) return false; } - for (var i=0; i Date: Thu, 20 Aug 2020 14:34:59 +0300 Subject: [PATCH 4/6] Fixes on review. Remove all cancellation token-related changes and replace them with a AsyncLocal(); --- Gofer.NET.Tests/GivenARedisTaskQueue.cs | 69 ++++++++----------- Gofer.NET.Tests/GivenATaskClient.cs | 2 +- Gofer.NET.Tests/GivenATaskInfo.cs | 41 +++-------- Gofer.NET.Tests/TestQueueTestFixture.cs | 9 ++- Gofer.NET.Utils/ActionExtensionMethods.cs | 22 +++--- .../JsonSkipCancellationTokenConverter.cs | 27 -------- Gofer.NET.Utils/JsonTaskInfoSerializer.cs | 15 ++-- Gofer.NET.Utils/TaskInfo.cs | 52 ++++---------- TaskClient.cs | 25 ++++--- TaskQueue.cs | 6 +- 10 files changed, 96 insertions(+), 172 deletions(-) delete mode 100644 Gofer.NET.Utils/JsonSkipCancellationTokenConverter.cs diff --git a/Gofer.NET.Tests/GivenARedisTaskQueue.cs b/Gofer.NET.Tests/GivenARedisTaskQueue.cs index ad1e869..29597c6 100644 --- a/Gofer.NET.Tests/GivenARedisTaskQueue.cs +++ b/Gofer.NET.Tests/GivenARedisTaskQueue.cs @@ -5,11 +5,8 @@ using System.Linq.Expressions; using System.Threading; using System.Threading.Tasks; - using FluentAssertions; - using Gofer.NET.Utils; - using Xunit; namespace Gofer.NET.Tests @@ -25,8 +22,8 @@ public override string ToString() return Value; } } - - private class CustomException : Exception { } + + private class CustomException : Exception {} [Fact] public async Task ItCapturesArgumentsPassedToEnqueuedDelegate() @@ -43,16 +40,13 @@ public async Task ItCapturesArgumentsPassedToEnqueuedDelegate() Tuple.Create, string>( actionExp, str); - + // Action to expected result var delgates = new Tuple, string>[] { // Exception Argument TC(() => ExceptionFunc(new Exception(), semaphoreFile), new Exception().ToString()), TC(() => ExceptionFunc(new CustomException(), semaphoreFile), new CustomException().ToString()), - - // Cancellation Argument - TC(() => CancellationFunc(new CancellationToken(), semaphoreFile), new CancellationToken().ToString()), // Integer Arguments TC(() => IntFunc(int.MaxValue, semaphoreFile), int.MaxValue.ToString()), @@ -105,29 +99,29 @@ public async Task ItCapturesArgumentsPassedToEnqueuedDelegate() TC(() => AsyncFunc(semaphoreFile).T(), "async"), TC(() => AsyncFuncThatReturnsString(semaphoreFile).T(), "async") }; - + foreach (var tup in delgates) { var actionExpr = tup.Item1; var expectedString = tup.Item2; - + File.Delete(semaphoreFile); - - await testFixture.TaskQueue.Enqueue(actionExpr); + + await testFixture.TaskQueue.Enqueue(actionExpr); await testFixture.TaskQueue.ExecuteNext(); File.ReadAllText(semaphoreFile).Should().Be(expectedString); } - + File.Delete(semaphoreFile); } - + [Fact] public async Task ItEnqueuesAndReceivesDelegatesThatAreRunnable() { var testFixture = new TaskQueueTestFixture(nameof(ItEnqueuesAndReceivesDelegatesThatAreRunnable)); - + testFixture.EnsureSemaphoreDoesntExist(); await testFixture.PushPopExecuteWriteSemaphore(); testFixture.EnsureSemaphore(); @@ -139,18 +133,18 @@ public async Task ItsTasksAreConsumedOnlyOnceByMultipleConsumers() // Higher numbers here increase confidence var numberOfJobs = 16; var numberOfConsumers = 4; - + var sharedTaskQueueName = nameof(ItsTasksAreConsumedOnlyOnceByMultipleConsumers); var consumers = Enumerable.Range(0, numberOfConsumers) .Select(_ => new TaskQueueTestFixture(sharedTaskQueueName)).ToList(); var semaphoreFiles = new List(); - for (int i = 0; i < numberOfJobs; ++i) + for(int i=0;i < numberOfJobs;++i) { var path = Path.GetTempFileName(); File.Delete(path); semaphoreFiles.Add(path); - + var sharedTaskQueue = consumers[0].TaskQueue; await sharedTaskQueue.Enqueue(() => TaskQueueTestFixture.WriteSemaphore(path)); } @@ -181,25 +175,25 @@ public async Task AsyncFunc(string semaphoreFile) { // Wait to ensure async waiting is happening. await Task.Delay(1000); - + TaskQueueTestFixture.WriteSemaphoreValue(semaphoreFile, "async"); } - + public async Task AsyncFuncThatReturnsString(string semaphoreFile) { // Wait to ensure async waiting is happening. await Task.Delay(1000); - + TaskQueueTestFixture.WriteSemaphoreValue(semaphoreFile, "async"); return "async"; } - + public void NullableTypeFunc(DateTime? dateTime, string semaphoreFile) { TaskQueueTestFixture.WriteSemaphoreValue(semaphoreFile, dateTime); } - + public void DateTimeFunc(DateTime dateTime, string semaphoreFile) { TaskQueueTestFixture.WriteSemaphoreValue(semaphoreFile, dateTime); @@ -209,37 +203,37 @@ public void IntFunc(int num, string semaphoreFile) { TaskQueueTestFixture.WriteSemaphoreValue(semaphoreFile, num); } - + public void NullableIntFunc(int? num, string semaphoreFile) { TaskQueueTestFixture.WriteSemaphoreValue(semaphoreFile, num ?? -1); } - + public void LongFunc(long num, string semaphoreFile) { TaskQueueTestFixture.WriteSemaphoreValue(semaphoreFile, num); } - + public void FloatFunc(float num, string semaphoreFile) { TaskQueueTestFixture.WriteSemaphoreValue(semaphoreFile, num); } - + public void BoolFunc(bool num, string semaphoreFile) { TaskQueueTestFixture.WriteSemaphoreValue(semaphoreFile, num); } - + public void DoubleFunc(double num, string semaphoreFile) { TaskQueueTestFixture.WriteSemaphoreValue(semaphoreFile, num); } - + public void StringFunc(string num, string semaphoreFile) { TaskQueueTestFixture.WriteSemaphoreValue(semaphoreFile, num); } - + public void ObjectFunc(object num, string semaphoreFile) { TaskQueueTestFixture.WriteSemaphoreValue(semaphoreFile, num); @@ -255,26 +249,21 @@ public void ExceptionFunc(Exception exc, string semaphoreFile) TaskQueueTestFixture.WriteSemaphoreValue(semaphoreFile, exc); } - public void CancellationFunc(CancellationToken ct, string semaphoreFile) - { - TaskQueueTestFixture.WriteSemaphoreValue(semaphoreFile, ct); - } - public void TypeFunc(Type typeArg, string semaphoreFile) { TaskQueueTestFixture.WriteSemaphoreValue(semaphoreFile, typeArg?.ToString() ?? "null"); } - + public void ArrayFunc1(string[] nums, string semaphoreFile) { TaskQueueTestFixture.WriteSemaphoreValue(semaphoreFile, string.Join(",", nums)); } - + public void ArrayFunc2(int[] nums, string semaphoreFile) { TaskQueueTestFixture.WriteSemaphoreValue(semaphoreFile, string.Join(",", nums)); } - + public void ArrayFunc3(int?[] nums, string semaphoreFile) { var str = ""; @@ -285,7 +274,7 @@ public void ArrayFunc3(int?[] nums, string semaphoreFile) str += num?.ToString() ?? "null"; first = false; } - + TaskQueueTestFixture.WriteSemaphoreValue(semaphoreFile, str); } } diff --git a/Gofer.NET.Tests/GivenATaskClient.cs b/Gofer.NET.Tests/GivenATaskClient.cs index c774d31..31b580b 100644 --- a/Gofer.NET.Tests/GivenATaskClient.cs +++ b/Gofer.NET.Tests/GivenATaskClient.cs @@ -44,7 +44,7 @@ public async Task ItStopsOnCancellation() var taskClient = new TaskClient(taskQueue); var cancellation = new CancellationTokenSource(); - await taskClient.TaskQueue.Enqueue(() => TaskQueueTestFixture.WaitForCancellationAndWriteSemaphore(semaphoreFile, default)); + await taskClient.TaskQueue.Enqueue(() => TaskQueueTestFixture.WaitForTaskClientCancellationAndWriteSemaphore(semaphoreFile)); var task = Task.Run(async () => await taskClient.Listen(cancellation.Token), CancellationToken.None); await Task.Delay(waitTime, CancellationToken.None); diff --git a/Gofer.NET.Tests/GivenATaskInfo.cs b/Gofer.NET.Tests/GivenATaskInfo.cs index 20b0c62..827ccfd 100644 --- a/Gofer.NET.Tests/GivenATaskInfo.cs +++ b/Gofer.NET.Tests/GivenATaskInfo.cs @@ -1,14 +1,12 @@ using System; -using System.Linq; +using System.IO; using System.Linq.Expressions; using System.Threading; using System.Threading.Tasks; - -using FluentAssertions; - using Gofer.NET.Utils; - using Xunit; +using FluentAssertions; +using System.Reflection; namespace Gofer.NET.Tests { @@ -16,27 +14,22 @@ public class GivenATaskInfo { [Fact] public void ItPersistsPropertiesWhenSerializedAndDeserialized() - { + { var taskInfos = new[] { GetTestTask(() => TestMethod1("hello world")), - GetTestTask(() => TestMethod2()), - GetTestTask(() => TestMethod3(new CancellationToken())) + GetTestTask(() => TestMethod2()) }; - foreach (var taskInfo in taskInfos) - { + foreach (var taskInfo in taskInfos) { var serializedTaskInfo = JsonTaskInfoSerializer.Serialize(taskInfo); var deserializedTaskInfo = JsonTaskInfoSerializer.Deserialize(serializedTaskInfo); - // we do skip cancellation tokens as they are ignored on serialization - var importantTaskArgs = taskInfo.Args.Select(a => a is CancellationToken ? null : a).ToArray(); - deserializedTaskInfo.Id.Should().Be(taskInfo.Id); deserializedTaskInfo.AssemblyName.Should().Be(taskInfo.AssemblyName); deserializedTaskInfo.TypeName.Should().Be(taskInfo.TypeName); deserializedTaskInfo.MethodName.Should().Be(taskInfo.MethodName); deserializedTaskInfo.ReturnType.Should().Be(taskInfo.ReturnType); - deserializedTaskInfo.Args.ShouldAllBeEquivalentTo(importantTaskArgs); + deserializedTaskInfo.Args.ShouldAllBeEquivalentTo(taskInfo.Args); deserializedTaskInfo.ArgTypes.ShouldAllBeEquivalentTo(taskInfo.ArgTypes); deserializedTaskInfo.CreatedAtUtc.Should().Be(taskInfo.CreatedAtUtc); @@ -54,11 +47,8 @@ public void ItProperlyDeterminesEquivalence() var taskInfo2a = GetTestTask(() => TestMethod2()); var taskInfo2b = GetTestTask(() => TestMethod2()); - var taskInfo3a = GetTestTask(() => TestMethod3(new CancellationToken())); - var taskInfo3b = GetTestTask(() => TestMethod3(new CancellationTokenSource().Token)); - - var taskInfo4a = GetTestTask(() => Console.WriteLine("hello")); - var taskInfo4b = GetTestTask(() => Console.WriteLine("hello world")); + var taskInfo3a = GetTestTask(() => Console.WriteLine("hello")); + var taskInfo3b = GetTestTask(() => Console.WriteLine("hello world")); taskInfo1a.IsEquivalent(taskInfo1a).Should().BeTrue(); taskInfo1a.IsEquivalent(taskInfo1b).Should().BeTrue(); @@ -73,13 +63,10 @@ public void ItProperlyDeterminesEquivalence() taskInfo2a.IsEquivalent(taskInfo3a).Should().BeFalse(); taskInfo3a.IsEquivalent(taskInfo3a).Should().BeTrue(); - taskInfo3a.IsEquivalent(taskInfo3b).Should().BeTrue(); - - taskInfo4a.IsEquivalent(taskInfo4a).Should().BeTrue(); - taskInfo4a.IsEquivalent(taskInfo4b).Should().BeFalse(); + taskInfo3a.IsEquivalent(taskInfo3b).Should().BeFalse(); } - private TaskInfo GetTestTask(Expression action) + private TaskInfo GetTestTask(Expression action) { return action.ToTaskInfo(); } @@ -93,11 +80,5 @@ private void TestMethod2() { Console.WriteLine(nameof(TestMethod2)); } - - private Task TestMethod3(CancellationToken cancellation) - { - Console.WriteLine(nameof(TestMethod3)); - return Task.FromResult(nameof(TestMethod3)); - } } } \ No newline at end of file diff --git a/Gofer.NET.Tests/TestQueueTestFixture.cs b/Gofer.NET.Tests/TestQueueTestFixture.cs index 8e357f1..8a50138 100644 --- a/Gofer.NET.Tests/TestQueueTestFixture.cs +++ b/Gofer.NET.Tests/TestQueueTestFixture.cs @@ -36,11 +36,11 @@ public TaskQueueTestFixture(string uniqueId, TaskQueue taskQueue = null) while (TaskQueue.Dequeue().Result != null) { } } - public async Task PushPopExecuteWriteSemaphore(CancellationToken cancellation = default) + public async Task PushPopExecuteWriteSemaphore() { await TaskQueue.Enqueue(() => WriteSemaphore(_semaphoreFile)); var dequeuedTaskInfo = await TaskQueue.Dequeue(); - await dequeuedTaskInfo.ExecuteTask(cancellation); + await dequeuedTaskInfo.ExecuteTask(); } public void EnsureSemaphoreDoesntExist() @@ -72,8 +72,11 @@ public static void WriteSemaphore(string semaphoreFile) WriteSemaphoreValue(semaphoreFile, SemaphoreText); } - public static async Task WaitForCancellationAndWriteSemaphore(string semaphoreFile, CancellationToken token) + public static async Task WaitForTaskClientCancellationAndWriteSemaphore(string semaphoreFile) { + var token = TaskClient.GetListenCancellation(); + if (!token.CanBeCanceled) + throw new InvalidOperationException("This method must be called from a task client callback"); try { await Task.Delay(-1, token); diff --git a/Gofer.NET.Utils/ActionExtensionMethods.cs b/Gofer.NET.Utils/ActionExtensionMethods.cs index 91adf92..de3ca47 100644 --- a/Gofer.NET.Utils/ActionExtensionMethods.cs +++ b/Gofer.NET.Utils/ActionExtensionMethods.cs @@ -11,14 +11,14 @@ public static TaskInfo ToTaskInfo(this Expression expression) { var methodCallArgumentResolutionVisitor = new MethodCallArgumentResolutionVisitor(); var expressionWithArgumentsResolved = - (Expression)methodCallArgumentResolutionVisitor.Visit(expression); + (Expression) methodCallArgumentResolutionVisitor.Visit(expression); - var method = ((MethodCallExpression)expressionWithArgumentsResolved.Body); + var method = ((MethodCallExpression) expressionWithArgumentsResolved.Body); var m = method.Method; var args = method.Arguments .Select(a => { - var value = ((ConstantExpression)a).Value; + var value = ((ConstantExpression) a).Value; return value; }) .ToArray(); @@ -27,19 +27,19 @@ public static TaskInfo ToTaskInfo(this Expression expression) return taskInfo; } - + public static TaskInfo ToTaskInfo(this Expression> expression) { var methodCallArgumentResolutionVisitor = new MethodCallArgumentResolutionVisitor(); var expressionWithArgumentsResolved = - (Expression>)methodCallArgumentResolutionVisitor.Visit(expression); + (Expression>) methodCallArgumentResolutionVisitor.Visit(expression); - var method = ((MethodCallExpression)expressionWithArgumentsResolved.Body); + var method = ((MethodCallExpression) expressionWithArgumentsResolved.Body); var m = method.Method; var args = method.Arguments .Select(a => { - var value = ((ConstantExpression)a).Value; + var value = ((ConstantExpression) a).Value; return value; }) .ToArray(); @@ -48,13 +48,13 @@ public static TaskInfo ToTaskInfo(this Expression> expression) return taskInfo; } - + public static TaskInfo ToTaskInfo(this MethodInfo method, object[] args) { var methodParams = method.GetParameters(); var argTypes = new Type[methodParams.Length]; - - for (var i = 0; i < methodParams.Length; ++i) + + for (var i=0; i < methodParams.Length; ++ i) { argTypes[i] = methodParams[i].ParameterType; } @@ -70,7 +70,7 @@ public static TaskInfo ToTaskInfo(this MethodInfo method, object[] args) CreatedAtUtc = DateTime.UtcNow, ReturnType = method.ReturnType }; - + return taskInfo; } } diff --git a/Gofer.NET.Utils/JsonSkipCancellationTokenConverter.cs b/Gofer.NET.Utils/JsonSkipCancellationTokenConverter.cs deleted file mode 100644 index d9f07a3..0000000 --- a/Gofer.NET.Utils/JsonSkipCancellationTokenConverter.cs +++ /dev/null @@ -1,27 +0,0 @@ -using System; -using System.Threading; - -using Newtonsoft.Json; - -namespace Gofer.NET.Utils -{ - public class JsonSkipCancellationTokenConverter : JsonConverter - { - public override bool CanRead => false; - - public override bool CanConvert(Type objectType) - { - return objectType == typeof(CancellationToken); - } - - public override void WriteJson(JsonWriter writer, object value, JsonSerializer serializer) - { - writer.WriteValue((object)null); - } - - public override object ReadJson(JsonReader reader, Type objectType, object existingValue, JsonSerializer serializer) - { - throw new NotImplementedException(); - } - } -} \ No newline at end of file diff --git a/Gofer.NET.Utils/JsonTaskInfoSerializer.cs b/Gofer.NET.Utils/JsonTaskInfoSerializer.cs index 91f992c..9f88ea6 100644 --- a/Gofer.NET.Utils/JsonTaskInfoSerializer.cs +++ b/Gofer.NET.Utils/JsonTaskInfoSerializer.cs @@ -1,4 +1,4 @@ - +using System; using Newtonsoft.Json; namespace Gofer.NET.Utils @@ -7,7 +7,7 @@ public static class JsonTaskInfoSerializer { public static string Serialize(TaskInfo taskInfo) { - return Serialize((object)taskInfo); + return Serialize((object) taskInfo); } public static string Serialize(object obj) @@ -20,8 +20,7 @@ public static string Serialize(object obj) settings.Converters.Insert(0, new JsonPrimitiveConverter()); settings.Converters.Insert(1, new ExceptionConverter()); - settings.Converters.Insert(2, new JsonSkipCancellationTokenConverter()); - + var jsonString = JsonConvert.SerializeObject(obj, settings); return jsonString; @@ -31,23 +30,21 @@ public static TaskInfo Deserialize(string taskInfoJsonString) { return Deserialize(taskInfoJsonString); } - - public static T Deserialize(string jsonString) where T : class + + public static T Deserialize(string jsonString) where T : class { if (jsonString == null) { return null; } - + var settings = new JsonSerializerSettings { TypeNameHandling = TypeNameHandling.All, TypeNameAssemblyFormatHandling = TypeNameAssemblyFormatHandling.Full }; - settings.Converters.Insert(0, new JsonPrimitiveConverter()); settings.Converters.Insert(1, new ExceptionConverter()); - settings.Converters.Insert(2, new JsonSkipCancellationTokenConverter()); var obj = JsonConvert.DeserializeObject(jsonString, settings); return obj; diff --git a/Gofer.NET.Utils/TaskInfo.cs b/Gofer.NET.Utils/TaskInfo.cs index 6daf8ac..3ef6365 100644 --- a/Gofer.NET.Utils/TaskInfo.cs +++ b/Gofer.NET.Utils/TaskInfo.cs @@ -2,7 +2,6 @@ using System.Linq; using System.Reflection; using System.Runtime.Serialization; -using System.Threading; using System.Threading.Tasks; using Gofer.NET.Utils.Errors; @@ -43,12 +42,8 @@ public bool IsEquivalent(TaskInfo otherTaskInfo) return false; } - for (var i = 0; i < Args.Length; ++i) - { - // comparison ignores cancellation tokens - // as they are ignored on serialization - // and will be replaced with proper value at invocation time - if (!Equals(GetEquivalenceArg(Args[i]), GetEquivalenceArg(otherTaskInfo.Args[i]))) + for (var i=0; i InvokeMethod(MethodInfo method, object instance) { - var args = Args.ToArray(); - for (int i = 0; i < args.Length; i++) - { - if (ArgTypes[i] == typeof(CancellationToken)) - args[i] = cancellation; - } - - return args; - } - - private async Task InvokeMethod(MethodInfo method, object instance, CancellationToken cancellation) - { - var args = GetInvokeArgs(cancellation); - if (method.IsAsync()) { - var result = method.Invoke(instance, args); + var result = method.Invoke(instance, Args); var task = (Task) result; await task; @@ -128,10 +102,10 @@ private async Task InvokeMethod(MethodInfo method, object instance, Canc return resultValue; } - return method.Invoke(instance, args); + return method.Invoke(instance, Args); } - public async Task ExecuteTask(CancellationToken token) + public async Task ExecuteTask() { var assembly = Assembly.Load(AssemblyName); var type = assembly.GetType(TypeName); @@ -144,7 +118,7 @@ public async Task ExecuteTask(CancellationToken token) if (staticMethod != null) { - return await InvokeMethod(staticMethod, null, token); + return await InvokeMethod(staticMethod, null); } var instanceMethod = type.GetMethod(MethodName, @@ -160,7 +134,7 @@ public async Task ExecuteTask(CancellationToken token) var instance = Activator.CreateInstance(type); - return await InvokeMethod(instanceMethod, instance, token); + return await InvokeMethod(instanceMethod, instance); } } } \ No newline at end of file diff --git a/TaskClient.cs b/TaskClient.cs index ac0658c..3e731ea 100644 --- a/TaskClient.cs +++ b/TaskClient.cs @@ -1,4 +1,5 @@ using System; +using System.Diagnostics; using System.Threading; using System.Threading.Tasks; @@ -8,11 +9,13 @@ namespace Gofer.NET { public class TaskClient { + private static readonly AsyncLocal _listenCancellationContext = new AsyncLocal(); + private static readonly object Locker = new object(); private const int PollDelay = 100; - private bool IsCanceled { get; set; } + public static CancellationToken GetListenCancellation() => _listenCancellationContext.Value; public TaskQueue TaskQueue { get; } @@ -33,7 +36,6 @@ public TaskClient( TaskQueue = taskQueue; OnError = onError; TaskScheduler = new TaskScheduler(TaskQueue); - IsCanceled = false; } public Task Listen() @@ -79,7 +81,7 @@ public CancellationTokenSource Start(CancellationToken cancellation) await inThreadTaskScheduler.Tick(); } - }, ListenCancellationTokenSource.Token); + }, token); TaskRunnerThread = Task.Run(async () => { @@ -92,7 +94,7 @@ public CancellationTokenSource Start(CancellationToken cancellation) await ExecuteQueuedTask(token); } - }, ListenCancellationTokenSource.Token); + }, token); return ListenCancellationTokenSource; } @@ -103,20 +105,25 @@ private async Task ExecuteQueuedTask(CancellationToken token) if (info != null) { LogTaskStarted(info); - + var old = _listenCancellationContext.Value; try { - var now = DateTime.Now; - - await info.ExecuteTask(token); + _listenCancellationContext.Value = token; - var completionSeconds = (DateTime.Now - now).TotalSeconds; + var executionTimer = Stopwatch.StartNew(); + await info.ExecuteTask(); + executionTimer.Stop(); + var completionSeconds = executionTimer.Elapsed.TotalSeconds; LogTaskFinished(info, completionSeconds); } catch (Exception e) { LogTaskException(info, e); } + finally + { + _listenCancellationContext.Value = old; + } } } diff --git a/TaskQueue.cs b/TaskQueue.cs index db55670..d9a7f8c 100644 --- a/TaskQueue.cs +++ b/TaskQueue.cs @@ -49,8 +49,8 @@ internal async Task Enqueue(TaskInfo taskInfo) await Backend.Enqueue(Config.QueueName, jsonString); } - - public async Task ExecuteNext(CancellationToken cancellation = default) + + public async Task ExecuteNext() { var (taskJsonString, taskInfo) = await SafeDequeue(); @@ -61,7 +61,7 @@ public async Task ExecuteNext(CancellationToken cancellation = default) try { - await taskInfo.ExecuteTask(cancellation); + await taskInfo.ExecuteTask(); } finally { From 6bd61122d161dfd2ecf5cfa3e234d99a771507f9 Mon Sep 17 00:00:00 2001 From: ig-sinicyn Date: Sun, 23 Aug 2020 16:06:11 +0300 Subject: [PATCH 5/6] + Timeout parameter for the WaitForTaskClientCancellationAndWriteSemaphore + Fix TimeSpan and DateTimeOffset parameter passing --- Gofer.NET.Tests/GivenARedisTaskQueue.cs | 35 ++++++++++++++++++++----- Gofer.NET.Tests/GivenATaskClient.cs | 8 ++++-- Gofer.NET.Tests/TestQueueTestFixture.cs | 4 +-- Gofer.NET.Utils/TaskInfo.cs | 24 ++++++++++++----- 4 files changed, 54 insertions(+), 17 deletions(-) diff --git a/Gofer.NET.Tests/GivenARedisTaskQueue.cs b/Gofer.NET.Tests/GivenARedisTaskQueue.cs index 29597c6..8b36732 100644 --- a/Gofer.NET.Tests/GivenARedisTaskQueue.cs +++ b/Gofer.NET.Tests/GivenARedisTaskQueue.cs @@ -3,7 +3,6 @@ using System.IO; using System.Linq; using System.Linq.Expressions; -using System.Threading; using System.Threading.Tasks; using FluentAssertions; using Gofer.NET.Utils; @@ -35,14 +34,16 @@ public async Task ItCapturesArgumentsPassedToEnqueuedDelegate() var semaphoreFile = Path.GetTempFileName(); var now = DateTime.Now; var utcNow = DateTime.UtcNow; + var nowOffset = DateTimeOffset.Now; + var utcNowOffset = DateTimeOffset.UtcNow; Func, string, Tuple, string>> TC = (actionExp, str) => Tuple.Create, string>( actionExp, str); - + // Action to expected result - var delgates = new Tuple, string>[] + var delegates = new Tuple, string>[] { // Exception Argument TC(() => ExceptionFunc(new Exception(), semaphoreFile), new Exception().ToString()), @@ -73,6 +74,8 @@ public async Task ItCapturesArgumentsPassedToEnqueuedDelegate() TC(() => StringFunc("astring", semaphoreFile), "astring"), TC(() => StringFunc(variableToExtract, semaphoreFile), variableToExtract), + TC(() => TimeSpanFunc(TimeSpan.FromMinutes(1), semaphoreFile), TimeSpan.FromMinutes(1).ToString()), + // Object Arguments + Overloaded Version TC(() => ObjectFunc(new TestDataHolder {Value = "astring"}, semaphoreFile), "astring"), TC(() => ObjectFunc(null, new TestDataHolder {Value = "astring"}, semaphoreFile), "astring"), @@ -80,8 +83,12 @@ public async Task ItCapturesArgumentsPassedToEnqueuedDelegate() TC(() => DateTimeFunc(now, semaphoreFile), now.ToString()), TC(() => DateTimeFunc(utcNow, semaphoreFile), utcNow.ToString()), + TC(() => DateTimeOffsetFunc(nowOffset, semaphoreFile), nowOffset.ToString()), + TC(() => DateTimeOffsetFunc(utcNowOffset, semaphoreFile), utcNowOffset.ToString()), + TC(() => NullableTypeFunc(null, semaphoreFile), "null"), TC(() => NullableTypeFunc(now, semaphoreFile), now.ToString()), + TC(() => NullableTypeFunc2(nowOffset, semaphoreFile), nowOffset.ToString()), TC(() => ArrayFunc1(new[] {"this", "string", "is"}, semaphoreFile), "this,string,is"), TC(() => ArrayFunc2(new[] {1, 2, 3, 4}, semaphoreFile), "1,2,3,4"), TC(() => ArrayFunc3(new int?[] {1, 2, 3, null, 5}, semaphoreFile), "1,2,3,null,5"), @@ -99,9 +106,8 @@ public async Task ItCapturesArgumentsPassedToEnqueuedDelegate() TC(() => AsyncFunc(semaphoreFile).T(), "async"), TC(() => AsyncFuncThatReturnsString(semaphoreFile).T(), "async") }; - - foreach (var tup in delgates) + foreach (var tup in delegates) { var actionExpr = tup.Item1; var expectedString = tup.Item2; @@ -193,12 +199,22 @@ public void NullableTypeFunc(DateTime? dateTime, string semaphoreFile) { TaskQueueTestFixture.WriteSemaphoreValue(semaphoreFile, dateTime); } - + + public void NullableTypeFunc2(DateTimeOffset? dateTime, string semaphoreFile) + { + TaskQueueTestFixture.WriteSemaphoreValue(semaphoreFile, dateTime); + } + public void DateTimeFunc(DateTime dateTime, string semaphoreFile) { TaskQueueTestFixture.WriteSemaphoreValue(semaphoreFile, dateTime); } + public void DateTimeOffsetFunc(DateTimeOffset dateTime, string semaphoreFile) + { + TaskQueueTestFixture.WriteSemaphoreValue(semaphoreFile, dateTime); + } + public void IntFunc(int num, string semaphoreFile) { TaskQueueTestFixture.WriteSemaphoreValue(semaphoreFile, num); @@ -233,7 +249,12 @@ public void StringFunc(string num, string semaphoreFile) { TaskQueueTestFixture.WriteSemaphoreValue(semaphoreFile, num); } - + + public void TimeSpanFunc(TimeSpan num, string semaphoreFile) + { + TaskQueueTestFixture.WriteSemaphoreValue(semaphoreFile, num); + } + public void ObjectFunc(object num, string semaphoreFile) { TaskQueueTestFixture.WriteSemaphoreValue(semaphoreFile, num); diff --git a/Gofer.NET.Tests/GivenATaskClient.cs b/Gofer.NET.Tests/GivenATaskClient.cs index 31b580b..9f9ef15 100644 --- a/Gofer.NET.Tests/GivenATaskClient.cs +++ b/Gofer.NET.Tests/GivenATaskClient.cs @@ -37,14 +37,18 @@ public async Task ItContinuesListeningWhenATaskThrowsAnException() public async Task ItStopsOnCancellation() { var semaphoreFile = Path.GetTempFileName(); + var timeout = TimeSpan.FromMinutes(1); - var waitTime = 2000; + var waitTime = TimeSpan.FromSeconds(2); var taskQueue = TaskQueueTestFixture.UniqueRedisTaskQueue(); var taskClient = new TaskClient(taskQueue); var cancellation = new CancellationTokenSource(); - await taskClient.TaskQueue.Enqueue(() => TaskQueueTestFixture.WaitForTaskClientCancellationAndWriteSemaphore(semaphoreFile)); + await taskClient.TaskQueue.Enqueue(() => + TaskQueueTestFixture.WaitForTaskClientCancellationAndWriteSemaphore( + semaphoreFile, + timeout)); var task = Task.Run(async () => await taskClient.Listen(cancellation.Token), CancellationToken.None); await Task.Delay(waitTime, CancellationToken.None); diff --git a/Gofer.NET.Tests/TestQueueTestFixture.cs b/Gofer.NET.Tests/TestQueueTestFixture.cs index 8a50138..a79b2b2 100644 --- a/Gofer.NET.Tests/TestQueueTestFixture.cs +++ b/Gofer.NET.Tests/TestQueueTestFixture.cs @@ -72,14 +72,14 @@ public static void WriteSemaphore(string semaphoreFile) WriteSemaphoreValue(semaphoreFile, SemaphoreText); } - public static async Task WaitForTaskClientCancellationAndWriteSemaphore(string semaphoreFile) + public static async Task WaitForTaskClientCancellationAndWriteSemaphore(string semaphoreFile, TimeSpan timeout) { var token = TaskClient.GetListenCancellation(); if (!token.CanBeCanceled) throw new InvalidOperationException("This method must be called from a task client callback"); try { - await Task.Delay(-1, token); + await Task.Delay(timeout, token); } catch (OperationCanceledException) { diff --git a/Gofer.NET.Utils/TaskInfo.cs b/Gofer.NET.Utils/TaskInfo.cs index 3ef6365..01a3a89 100644 --- a/Gofer.NET.Utils/TaskInfo.cs +++ b/Gofer.NET.Utils/TaskInfo.cs @@ -55,14 +55,15 @@ public bool IsEquivalent(TaskInfo otherTaskInfo) && ReturnType.Equals(otherTaskInfo.ReturnType); } - public void ConvertTypeArgs() + public void ConvertTypeArgs() { for (var i=0;i Date: Mon, 24 Aug 2020 09:49:07 +0300 Subject: [PATCH 6/6] Missing file --- Gofer.NET.Utils/JsonTaskInfoSerializer.cs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/Gofer.NET.Utils/JsonTaskInfoSerializer.cs b/Gofer.NET.Utils/JsonTaskInfoSerializer.cs index 9f88ea6..dfc55c9 100644 --- a/Gofer.NET.Utils/JsonTaskInfoSerializer.cs +++ b/Gofer.NET.Utils/JsonTaskInfoSerializer.cs @@ -41,7 +41,8 @@ public static T Deserialize(string jsonString) where T : class var settings = new JsonSerializerSettings { TypeNameHandling = TypeNameHandling.All, - TypeNameAssemblyFormatHandling = TypeNameAssemblyFormatHandling.Full + TypeNameAssemblyFormatHandling = TypeNameAssemblyFormatHandling.Full, + DateParseHandling = DateParseHandling.DateTimeOffset }; settings.Converters.Insert(0, new JsonPrimitiveConverter()); settings.Converters.Insert(1, new ExceptionConverter());