-
Notifications
You must be signed in to change notification settings - Fork 47
/
Copy pathTaskQueue.cs
138 lines (121 loc) · 4.27 KB
/
TaskQueue.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
using System;
using System.Collections.Generic;
using System.Linq;
using System.Linq.Expressions;
using System.Reflection;
using System.Threading;
using System.Threading.Tasks;
using Gofer.NET.Utils;
using Newtonsoft.Json;
namespace Gofer.NET
{
public partial class TaskQueue
{
public IBackend Backend { get; }
public TaskQueueConfiguration Config { get; }
public TaskQueue(IBackend backend, TaskQueueConfiguration config=null)
{
Backend = backend;
Config = config ?? TaskQueueConfiguration.Default();
// Usage of the Task Queue in Parallel Threads, requires the thread pool size to be increased.
// https://stackexchange.github.io/StackExchange.Redis/Timeouts#are-you-seeing-high-number-of-busyio-or-busyworker-threads-in-the-timeout-exception
// REVIEW: This should no longer be necessary now that we are using the redis async api.
if (Config.ThreadSafe)
{
ThreadPool.SetMinThreads(200, 200);
}
}
public async Task Enqueue(Expression<Action> expression)
{
var taskInfo = expression.ToTaskInfo();
await Enqueue(taskInfo);
}
internal async Task Enqueue(TaskInfo taskInfo)
{
taskInfo.ConvertTypeArgs();
var jsonString = JsonTaskInfoSerializer.Serialize(taskInfo);
await Backend.Enqueue(Config.QueueName, jsonString);
}
public async Task<long> Count()
{
return await Backend.QueueCount(Config.QueueName);
}
public async Task<bool> ExecuteNext()
{
var (taskJsonString, taskInfo) = await SafeDequeue();
if (taskInfo == null)
{
return false;
}
try
{
await taskInfo.ExecuteTask();
}
finally
{
// Backend.RemoveBackup(taskJsonString);
}
return true;
}
/// <summary>
/// Returns the serialized TaskInfo as well as deserialized so that the serialized value can later
/// be removed from the backing queue.
/// </summary>
/// <returns></returns>
public async Task<(string, TaskInfo)> SafeDequeue()
{
var jsonString = await Backend.Dequeue(Config.QueueName);
if (jsonString == null)
{
return (null, null);
}
var taskInfo = JsonTaskInfoSerializer.Deserialize(jsonString);
taskInfo.UnconvertTypeArgs();
return (jsonString, taskInfo);
}
internal async Task<TaskInfo> Dequeue()
{
var jsonString = await Backend.Dequeue(Config.QueueName);
if (jsonString == null)
{
return null;
}
var taskInfo = JsonTaskInfoSerializer.Deserialize(jsonString);
taskInfo.UnconvertTypeArgs();
return taskInfo;
}
public void RestoreExpiredBackupTasks()
{
// TaskInfo taskInfo;
//
// while (true)
// {
// taskInfo = JsonTaskInfoSerializer.Deserialize(Backend.PeekBackup());
// if (taskInfo?.IsExpired(Config.MessageRetryTimeSpan) ?? true)
// {
// break;
// }
//
// var lockKey = nameof(RestoreExpiredBackupTasks) + "::" + taskInfo.Id;
// var backupLock = Backend.LockBlocking(lockKey);
//
// try
// {
// var currentTopStr = Backend.PeekBackup();
// if (!string.IsNullOrEmpty(currentTopStr))
// {
// var currentTop = JsonTaskInfoSerializer.Deserialize(currentTopStr);
// if (currentTop?.Id.Equals(taskInfo.Id) ?? false)
// {
// Backend.RestoreTopBackup();
// }
// }
// }
// finally
// {
// backupLock.Release();
// }
// }
}
}
}