Skip to content

Commit 88e0b31

Browse files
authored
Estimator (#663)
* Add estimator Signed-off-by: ortyomka <[email protected]>
1 parent 3c5df2c commit 88e0b31

File tree

9 files changed

+502
-41
lines changed

9 files changed

+502
-41
lines changed

core/miner/main/main.cpp

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@
4545
#include "sector_storage/fetch_handler.hpp"
4646
#include "sector_storage/impl/manager_impl.hpp"
4747
#include "sector_storage/impl/scheduler_impl.hpp"
48+
#include "sector_storage/impl/worker_estimator_impl.hpp"
4849
#include "sector_storage/stores/impl/index_impl.hpp"
4950
#include "sector_storage/stores/impl/local_store.hpp"
5051
#include "sector_storage/stores/impl/remote_store.hpp"
@@ -95,6 +96,7 @@ namespace fc {
9596
boost::optional<RegisteredSealProof> seal_type;
9697
std::vector<Address> precommit_control;
9798
int api_port{};
99+
uint64_t estimator_window{};
98100

99101
/** Path to presealed sectors */
100102
boost::optional<boost::filesystem::path> preseal_path;
@@ -113,8 +115,9 @@ namespace fc {
113115
const std::shared_ptr<storage::PersistentBufferMap> &ds) {
114116
OUTCOME_TRY(file, common::readFile(path));
115117
OUTCOME_TRY(j_file, codec::json::parse(gsl::make_span(file)));
116-
OUTCOME_TRY(
117-
psm, codec::json::decode<std::map<std::string, miner::types::Miner>>(j_file));
118+
OUTCOME_TRY(psm,
119+
codec::json::decode<std::map<std::string, miner::types::Miner>>(
120+
j_file));
118121

119122
const auto it_psm = psm.find(encodeToString(maddr));
120123
if (it_psm == psm.end()) {
@@ -155,6 +158,8 @@ namespace fc {
155158
option("owner", po::value(&config.owner));
156159
option("worker", po::value(&config.worker));
157160
option("sector-size", po::value(&raw.sector_size));
161+
option("estimator-window",
162+
po::value(&config.estimator_window)->default_value(10));
158163
option("precommit-control", po::value(&config.precommit_control));
159164
option("pre-sealed-sectors",
160165
po::value(&config.preseal_path),
@@ -425,7 +430,11 @@ namespace fc {
425430
IoThread io_thread2;
426431
OUTCOME_TRY(wscheduler,
427432
sector_storage::SchedulerImpl::newScheduler(
428-
io_thread2.io, prefixed("scheduler_works/")));
433+
io_thread2.io,
434+
prefixed("scheduler_works/"),
435+
std::make_shared<sector_storage::EstimatorImpl>(
436+
config.estimator_window)));
437+
429438
IoThread io_thread3;
430439

431440
{

core/sector_storage/CMakeLists.txt

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,14 @@ target_link_libraries(remote_worker
4242
rpc
4343
)
4444

45+
add_library(estimator
46+
impl/worker_estimator_impl.cpp
47+
)
48+
49+
target_link_libraries(estimator
50+
Boost::boost
51+
)
52+
4553
add_library(scheduler
4654
impl/scheduler_impl.cpp
4755
)
@@ -51,6 +59,7 @@ target_link_libraries(scheduler
5159
resources
5260
logger
5361
worker
62+
estimator
5463
Boost::thread
5564
)
5665

core/sector_storage/impl/scheduler_impl.cpp

Lines changed: 63 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -16,16 +16,20 @@ namespace fc::sector_storage {
1616

1717
outcome::result<std::shared_ptr<SchedulerImpl>> SchedulerImpl::newScheduler(
1818
std::shared_ptr<boost::asio::io_context> io_context,
19-
std::shared_ptr<BufferMap> datastore) {
19+
std::shared_ptr<BufferMap> datastore,
20+
std::shared_ptr<Estimator> estimator) {
2021
struct make_unique_enabler : public SchedulerImpl {
2122
make_unique_enabler(std::shared_ptr<boost::asio::io_context> io_context,
22-
std::shared_ptr<BufferMap> datastore)
23-
: SchedulerImpl{std::move(io_context), std::move(datastore)} {};
23+
std::shared_ptr<BufferMap> datastore,
24+
std::shared_ptr<Estimator> estimator)
25+
: SchedulerImpl{std::move(io_context),
26+
std::move(datastore),
27+
std::move(estimator)} {};
2428
};
2529

2630
std::shared_ptr<SchedulerImpl> scheduler =
27-
std::make_shared<make_unique_enabler>(std::move(io_context),
28-
std::move(datastore));
31+
std::make_shared<make_unique_enabler>(
32+
std::move(io_context), std::move(datastore), std::move(estimator));
2933

3034
OUTCOME_TRY(scheduler->resetWorks());
3135

@@ -34,8 +38,10 @@ namespace fc::sector_storage {
3438

3539
SchedulerImpl::SchedulerImpl(
3640
std::shared_ptr<boost::asio::io_context> io_context,
37-
std::shared_ptr<BufferMap> datastore)
41+
std::shared_ptr<BufferMap> datastore,
42+
std::shared_ptr<Estimator> estimator)
3843
: current_worker_id_(0),
44+
estimator_(std::move(estimator)),
3945
call_kv_(std::move(datastore)),
4046
io_(std::move(io_context)),
4147
logger_(common::createLogger("scheduler")) {}
@@ -143,7 +149,7 @@ namespace fc::sector_storage {
143149
if (current_worker_id_ == std::numeric_limits<uint64_t>::max()) {
144150
current_worker_id_ = 0; // TODO(ortyomka): maybe better mechanism
145151
}
146-
WorkerID wid = current_worker_id_++;
152+
WorkerId wid = current_worker_id_++;
147153
workers_.insert({wid, std::move(worker)});
148154
lock.unlock();
149155

@@ -154,7 +160,7 @@ namespace fc::sector_storage {
154160
const std::shared_ptr<TaskRequest> &request) {
155161
std::lock_guard<std::mutex> lock(workers_lock_);
156162

157-
std::vector<WorkerID> acceptable;
163+
std::vector<WorkerId> acceptable;
158164
uint64_t tried = 0;
159165

160166
for (const auto &[wid, worker] : workers_) {
@@ -179,27 +185,47 @@ namespace fc::sector_storage {
179185

180186
if (!acceptable.empty()) {
181187
bool does_error_occurs = false;
182-
std::stable_sort(acceptable.begin(),
183-
acceptable.end(),
184-
[&](WorkerID lhs, WorkerID rhs) {
185-
auto maybe_res = request->sel->is_preferred(
186-
request->task_type, workers_[lhs], workers_[rhs]);
187-
188-
if (maybe_res.has_error()) {
189-
logger_->error("selecting best worker: "
190-
+ maybe_res.error().message());
191-
does_error_occurs = true;
192-
return false;
193-
}
194-
195-
return maybe_res.value();
196-
});
188+
std::stable_sort(
189+
acceptable.begin(),
190+
acceptable.end(),
191+
[&](WorkerId lhs, WorkerId rhs) {
192+
const auto l_time = estimator_->getTime(lhs, request->task_type);
193+
const auto r_time = estimator_->getTime(rhs, request->task_type);
194+
195+
// if time is available, then compare them
196+
if (l_time and r_time) {
197+
return l_time < r_time;
198+
}
199+
200+
// if only one of the workers has timing data, prefer the worker without
201+
// data, to give it a chance to prove itself
202+
if (l_time) {
203+
return false;
204+
}
205+
206+
if (r_time) {
207+
return true;
208+
}
209+
210+
// if both workers without time, then compare with selector
211+
auto maybe_res = request->sel->is_preferred(
212+
request->task_type, workers_[lhs], workers_[rhs]);
213+
214+
if (maybe_res.has_error()) {
215+
logger_->error("selecting best worker: "
216+
+ maybe_res.error().message());
217+
does_error_occurs = true;
218+
return false;
219+
}
220+
221+
return maybe_res.value();
222+
});
197223

198224
if (does_error_occurs) {
199225
return SchedulerErrors::kCannotSelectWorker;
200226
}
201227

202-
WorkerID wid = acceptable[0];
228+
WorkerId wid = acceptable[0];
203229

204230
assignWorker(wid, workers_[wid], request);
205231

@@ -214,7 +240,7 @@ namespace fc::sector_storage {
214240
}
215241

216242
void SchedulerImpl::assignWorker(
217-
WorkerID wid,
243+
WorkerId wid,
218244
const std::shared_ptr<WorkerHandle> &worker,
219245
const std::shared_ptr<TaskRequest> &request) {
220246
worker->preparing.add(worker->info.resources, request->need_resources);
@@ -242,8 +268,18 @@ namespace fc::sector_storage {
242268
+ maybe_call_id.error().message());
243269
return clear();
244270
}
245-
ReturnCb new_cb = [request, clear = std::move(clear)](
271+
estimator_->startWork(wid, request->task_type, maybe_call_id.value());
272+
ReturnCb new_cb = [call_id{maybe_call_id.value()},
273+
estimator{estimator_},
274+
request,
275+
clear = std::move(clear)](
246276
outcome::result<CallResult> result) -> void {
277+
if (result.has_value()) {
278+
estimator->finishWork(call_id);
279+
} else {
280+
estimator->abortWork(call_id);
281+
}
282+
247283
request->cb(std::move(result));
248284

249285
return clear();
@@ -287,7 +323,7 @@ namespace fc::sector_storage {
287323
});
288324
}
289325

290-
void SchedulerImpl::freeWorker(WorkerID wid) {
326+
void SchedulerImpl::freeWorker(WorkerId wid) {
291327
std::shared_ptr<WorkerHandle> worker;
292328
{
293329
std::lock_guard<std::mutex> lock(workers_lock_);

core/sector_storage/impl/scheduler_impl.hpp

Lines changed: 14 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -13,23 +13,23 @@
1313
#include <unordered_map>
1414
#include <utility>
1515
#include "primitives/resources/resources.hpp"
16+
#include "sector_storage/worker_estimator.hpp"
1617
#include "storage/buffer_map.hpp"
1718

1819
namespace fc::sector_storage {
19-
using storage::BufferMap;
20-
using WorkerID = uint64_t;
2120
using primitives::Resources;
21+
using storage::BufferMap;
2222

2323
struct TaskRequest {
2424
inline TaskRequest(const SectorRef &sector,
25-
TaskType task_type,
25+
const TaskType &task_type,
2626
uint64_t priority,
2727
std::shared_ptr<WorkerSelector> sel,
2828
WorkerAction prepare,
2929
WorkerAction work,
3030
ReturnCb cb)
3131
: sector(sector),
32-
task_type(std::move(task_type)),
32+
task_type(task_type),
3333
priority(priority),
3434
sel(std::move(sel)),
3535
prepare(std::move(prepare)),
@@ -65,7 +65,8 @@ namespace fc::sector_storage {
6565
public:
6666
static outcome::result<std::shared_ptr<SchedulerImpl>> newScheduler(
6767
std::shared_ptr<boost::asio::io_context> io_context,
68-
std::shared_ptr<BufferMap> datastore);
68+
std::shared_ptr<BufferMap> datastore,
69+
std::shared_ptr<Estimator> estimator);
6970

7071
outcome::result<void> schedule(
7172
const SectorRef &sector,
@@ -84,22 +85,25 @@ namespace fc::sector_storage {
8485

8586
private:
8687
explicit SchedulerImpl(std::shared_ptr<boost::asio::io_context> io_context,
87-
std::shared_ptr<BufferMap> datastore);
88+
std::shared_ptr<BufferMap> datastore,
89+
std::shared_ptr<Estimator> estimator);
8890

8991
outcome::result<void> resetWorks();
9092

9193
outcome::result<bool> maybeScheduleRequest(
9294
const std::shared_ptr<TaskRequest> &request);
9395

94-
void assignWorker(WorkerID wid,
96+
void assignWorker(WorkerId wid,
9597
const std::shared_ptr<WorkerHandle> &worker,
9698
const std::shared_ptr<TaskRequest> &request);
9799

98-
void freeWorker(WorkerID wid);
100+
void freeWorker(WorkerId wid);
99101

100102
std::mutex workers_lock_;
101-
WorkerID current_worker_id_;
102-
std::unordered_map<WorkerID, std::shared_ptr<WorkerHandle>> workers_;
103+
WorkerId current_worker_id_;
104+
std::unordered_map<WorkerId, std::shared_ptr<WorkerHandle>> workers_;
105+
106+
std::shared_ptr<Estimator> estimator_;
103107

104108
std::mutex cbs_lock_;
105109
std::map<CallId, ReturnCb> callbacks_;
Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
/**
2+
* Copyright Soramitsu Co., Ltd. All Rights Reserved.
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
#include "sector_storage/impl/worker_estimator_impl.hpp"
7+
8+
namespace fc::sector_storage {
9+
10+
/**
 * Creates an estimator with no recorded statistics.
 *
 * @param window_size value handed to every CallsData created by finishWork();
 * presumably the number of most recent samples that are averaged — confirm
 * against CallsData
 */
EstimatorImpl::EstimatorImpl(uint64_t window_size)
    : window_size_(window_size) {
}
12+
13+
/**
 * Registers a call as in progress and remembers when it started.
 *
 * @param worker_id worker that executes the call
 * @param type type of the task being executed
 * @param call_id key under which the call is tracked; an entry already stored
 * under the same id is overwritten
 */
void EstimatorImpl::startWork(WorkerId worker_id,
                              TaskType type,
                              CallId call_id) {
  // The timestamp is taken before the lock is acquired.
  const auto started_at = std::chrono::steady_clock::now();

  std::lock_guard guard(mutex_);
  active_works_[call_id] = ActiveWork{type, worker_id, started_at};
}
20+
21+
/**
 * Marks a tracked call as completed and records how long it took.
 *
 * The elapsed time since the matching startWork(), in whole milliseconds, is
 * added to the statistics kept for the (worker, task type) pair of the call.
 * Calls that are not tracked are ignored.
 *
 * @param call_id id previously passed to startWork()
 */
void EstimatorImpl::finishWork(CallId call_id) {
  // The timestamp is taken before the lock is acquired.
  const auto finished_at = std::chrono::steady_clock::now();

  std::lock_guard guard(mutex_);
  const auto active_it = active_works_.find(call_id);
  if (active_it == active_works_.end()) {
    return;
  }
  const auto &work = active_it->second;

  // try_emplace constructs a new CallsData only when the task type has no
  // entry yet; otherwise it yields the existing one.
  auto &stats =
      workers_data_[work.worker].try_emplace(work.type, window_size_)
          .first->second;

  const auto elapsed_ms =
      std::chrono::duration_cast<std::chrono::milliseconds>(finished_at
                                                            - work.start)
          .count();
  stats.addData(elapsed_ms);

  // `work` refers into active_works_, so the entry is erased last.
  active_works_.erase(active_it);
}
41+
42+
/**
 * Stops tracking a call without recording a duration for it.
 *
 * Calls that are not tracked are ignored.
 *
 * @param call_id id previously passed to startWork()
 */
void EstimatorImpl::abortWork(CallId call_id) {
  std::lock_guard guard(mutex_);

  // Aborted calls are simply dropped and contribute no sample. They could
  // also be counted per worker if such a statistic is ever wanted, although
  // an abort is not necessarily the worker's fault.
  active_works_.erase(call_id);
}
52+
53+
/**
 * Looks up the recorded statistics for a worker and task type.
 *
 * @param id worker to look up
 * @param type task type to look up
 * @return the result of CallsData::getAverage() for the pair, or boost::none
 * when nothing has been recorded for this worker or for this task type
 */
boost::optional<double> EstimatorImpl::getTime(WorkerId id,
                                               TaskType type) const {
  std::shared_lock guard(mutex_);

  if (const auto worker_it = workers_data_.find(id);
      worker_it != workers_data_.end()) {
    const auto &per_task = worker_it->second;
    if (const auto stats_it = per_task.find(type);
        stats_it != per_task.end()) {
      return stats_it->second.getAverage();
    }
  }

  return boost::none;
}
69+
} // namespace fc::sector_storage

0 commit comments

Comments
 (0)