-
Notifications
You must be signed in to change notification settings - Fork 4
/
Copy pathscheduler.h
124 lines (116 loc) · 3.13 KB
/
scheduler.h
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
#ifndef ALEXAAGENT__SCHEDULER_H
#define ALEXAAGENT__SCHEDULER_H
#include <memory>
#include <set>
#include <boost/asio/spawn.hpp>
#include <boost/asio/steady_timer.hpp>
class t_task
{
boost::asio::yield_context& v_yield;
boost::asio::steady_timer v_timer;
std::deque<std::function<void(boost::asio::yield_context&)>> v_actions;
public:
t_task(boost::asio::yield_context& a_yield, boost::asio::io_service& a_io) : v_yield(a_yield), v_timer(a_io)
{
}
void f_wait(const boost::asio::steady_timer::duration& a_duration = std::chrono::duration<int>::max())
{
v_timer.expires_from_now(a_duration);
boost::system::error_code ec;
v_timer.async_wait(v_yield[ec]);
while (!v_actions.empty()) {
auto action = std::move(v_actions.front());
v_actions.pop_front();
action(v_yield);
}
}
void f_notify()
{
v_timer.cancel();
}
void f_post(std::function<void(boost::asio::yield_context&)>&& a_action)
{
v_actions.push_back(std::move(a_action));
f_notify();
}
};
class t_scheduler : public boost::asio::io_service::strand
{
std::set<std::unique_ptr<boost::asio::steady_timer>> v_timers;
std::set<t_task*> v_tasks;
template<typename T_callback>
void f_run_every(std::set<std::unique_ptr<boost::asio::steady_timer>>::iterator a_i, const boost::asio::steady_timer::duration& a_duration, T_callback a_callback)
{
(*a_i)->expires_from_now(a_duration);
(*a_i)->async_wait(wrap([this, a_i, a_duration, a_callback](auto a_ec)
{
if (a_callback(a_ec))
this->f_run_every(a_i, a_duration, a_callback);
else
v_timers.erase(a_i);
}));
}
public:
class t_stop
{
};
t_scheduler(boost::asio::io_service& a_io) : boost::asio::io_service::strand(a_io)
{
}
boost::asio::io_service& f_io()
{
return get_io_service();
}
template<typename T_callback>
boost::asio::steady_timer& f_run_in(const boost::asio::steady_timer::duration& a_duration, T_callback a_callback)
{
auto i = v_timers.insert(std::make_unique<boost::asio::steady_timer>(f_io(), a_duration)).first;
(*i)->async_wait(wrap([this, i, a_callback](auto a_ec)
{
a_callback(a_ec);
v_timers.erase(i);
}));
return **i;
}
template<typename T_callback>
boost::asio::steady_timer& f_run_every(const boost::asio::steady_timer::duration& a_duration, T_callback a_callback)
{
auto i = v_timers.insert(std::make_unique<boost::asio::steady_timer>(f_io())).first;
f_run_every(i, a_duration, a_callback);
return **i;
}
template<typename T_main>
void f_spawn(T_main&& a_main)
{
boost::asio::spawn(static_cast<boost::asio::io_service::strand&>(*this), [this, a_main = std::move(a_main)](auto a_yield)
{
t_task task(a_yield, this->f_io());
auto i = v_tasks.insert(&task).first;
try {
a_main(task);
} catch (...) {
}
v_tasks.erase(i);
});
}
template<typename T_done>
void f_shutdown(T_done&& a_done)
{
if (v_tasks.empty()) {
a_done();
return;
}
auto& stopping = f_run_every(std::chrono::duration<int>::max(), [this, a_done = std::move(a_done)](auto)
{
if (!v_tasks.empty()) return true;
a_done();
return false;
});
for (auto& x : v_tasks) x->f_post([&](auto)
{
stopping.cancel();
throw t_stop();
});
}
};
#endif