Skip to content

Commit 5c5067a

Browse files
committed
duplicate worker fix
Signed-off-by: elestrias <[email protected]>
1 parent 1e08815 commit 5c5067a

File tree

12 files changed

+39
-24
lines changed

12 files changed

+39
-24
lines changed

core/api/rpc/wsc.hpp

+8-10
Original file line numberDiff line numberDiff line change
@@ -58,15 +58,11 @@ namespace fc::api::rpc {
5858
visit(api, [&](auto &m) { _setup(*this, m); });
5959
}
6060
struct ClientData {
61-
ClientData(std::string host,
62-
std::string port,
63-
std::string target,
64-
std::string token)
65-
: host(host), port(port), target(target), token(token){};
66-
ClientData() = default;
67-
68-
std::string host, port, target, token;
69-
}client_data;
61+
std::string host;
62+
std::string port;
63+
std::string target;
64+
std::string token;
65+
} client_data;
7066

7167
void reconnect(int counter, std::chrono::milliseconds wait);
7268

@@ -76,7 +72,9 @@ namespace fc::api::rpc {
7672
io_context io;
7773
io_context &io2;
7874
boost::asio::executor_work_guard<io_context::executor_type> work_guard;
79-
boost::optional<boost::beast::websocket::stream<boost::asio::ip::tcp::socket>> socket;
75+
boost::optional<
76+
boost::beast::websocket::stream<boost::asio::ip::tcp::socket>>
77+
socket;
8078
boost::beast::flat_buffer buffer;
8179
std::mutex mutex;
8280
uint64_t next_req{};

core/primitives/sector_file/sector_file.hpp

+5-5
Original file line numberDiff line numberDiff line change
@@ -83,11 +83,11 @@ namespace fc::primitives::sector_file {
8383
struct SectorPaths {
8484
public:
8585
SectorId id;
86-
std::string unsealed;
87-
std::string sealed;
88-
std::string cache;
89-
std::string update;
90-
std::string update_cache;
86+
std::string unsealed{};
87+
std::string sealed{};
88+
std::string cache{};
89+
std::string update{};
90+
std::string update_cache{};
9191

9292
void setPathByType(const SectorFileType &file_type,
9393
const std::string &path);

core/primitives/types.hpp

+9-1
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ namespace fc::primitives {
3939
struct FsStat {
4040
uint64_t capacity = 0;
4141
uint64_t available = 0;
42-
uint64_t fs_available = 0; // Available to use for sector storage
42+
uint64_t fs_available = 0; // Available to use for sector storage
4343
uint64_t reserved = 0;
4444
uint64_t max = 0;
4545
uint64_t used = 0;
@@ -91,6 +91,14 @@ namespace fc::primitives {
9191
std::vector<std::string> gpus;
9292
};
9393

94+
inline bool operator==(const WorkerResources &lhs,
95+
const WorkerResources &rhs) {
96+
return (lhs.physical_memory == rhs.physical_memory
97+
&& lhs.swap_memory == rhs.swap_memory
98+
&& lhs.reserved_memory == rhs.reserved_memory
99+
&& lhs.cpus == rhs.cpus && lhs.gpus == rhs.gpus);
100+
}
101+
94102
struct WorkerInfo {
95103
std::string hostname;
96104
WorkerResources resources;

core/sector_storage/impl/local_worker.cpp

+1-1
Original file line numberDiff line numberDiff line change
@@ -904,7 +904,7 @@ namespace fc::sector_storage {
904904

905905
return call_id;
906906
}
907-
void LocalWorker::ping(std::function<void(bool resp)> cb) {
907+
void LocalWorker::ping(std::function<void(const bool resp)> cb) {
908908
cb(true);
909909
}
910910
} // namespace fc::sector_storage

core/sector_storage/impl/local_worker.hpp

+1-1
Original file line numberDiff line numberDiff line change
@@ -99,7 +99,7 @@ namespace fc::sector_storage {
9999
outcome::result<std::vector<primitives::StoragePath>> getAccessiblePaths()
100100
override;
101101

102-
void ping(std::function<void(bool resp)> cb) override;
102+
void ping(std::function<void(const bool resp)> cb) override;
103103

104104
private:
105105
template <typename W, typename R>

core/sector_storage/impl/manager_impl.cpp

-1
Original file line numberDiff line numberDiff line change
@@ -260,7 +260,6 @@ namespace fc::sector_storage {
260260

261261
worker_handler->worker = std::move(worker);
262262
worker_handler->info = std::move(info);
263-
264263
scheduler_->newWorker(std::move(worker_handler));
265264

266265
return outcome::success();

core/sector_storage/impl/remote_worker.cpp

+1-1
Original file line numberDiff line numberDiff line change
@@ -255,7 +255,7 @@ namespace fc::sector_storage {
255255
return api_.Fetch(sector, file_type, path_type, mode);
256256
}
257257

258-
void RemoteWorker::ping(std::function<void(bool resp)> cb) {
258+
void RemoteWorker::ping(std::function<void(const bool resp)> cb) {
259259
api_.Version([=](auto res) { cb(!res.has_error()); });
260260
}
261261
} // namespace fc::sector_storage

core/sector_storage/impl/remote_worker.hpp

+1-1
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ namespace fc::sector_storage {
3939
const SectorRef &sector,
4040
const PreCommit1Output &pre_commit_1_output) override;
4141

42-
void ping(std::function<void(bool resp)> cb) override;
42+
void ping(std::function<void(const bool resp)> cb) override;
4343

4444
outcome::result<CallId> sealCommit1(const SectorRef &sector,
4545
const SealRandomness &ticket,

core/sector_storage/impl/scheduler_impl.cpp

+6-1
Original file line numberDiff line numberDiff line change
@@ -137,6 +137,11 @@ namespace fc::sector_storage {
137137

138138
void SchedulerImpl::newWorker(std::unique_ptr<WorkerHandle> worker) {
139139
std::unique_lock<std::mutex> lock(workers_lock_);
140+
for(const auto &[key, value] : workers_){
141+
if(*value == *worker){
142+
return;
143+
}
144+
}
140145
if (current_worker_id_ == std::numeric_limits<uint64_t>::max()) {
141146
current_worker_id_ = 0; // TODO(ortyomka): maybe better mechanism
142147
}
@@ -200,7 +205,7 @@ namespace fc::sector_storage {
200205
std::future<WorkerID> wid_future = wid_promise.get_future();
201206
auto done = std::make_shared<std::atomic_bool>();
202207
for (const auto &cur : acceptable) {
203-
workers_[cur]->worker->ping([&wid_promise, done, cur](bool resp) {
208+
workers_[cur]->worker->ping([&wid_promise, done, cur](const bool resp) {
204209
if (resp && !done->exchange(true)) {
205210
wid_promise.set_value(cur);
206211
}

core/sector_storage/selector.hpp

+5
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,11 @@ namespace fc::sector_storage {
2424
ActiveResources active;
2525
};
2626

27+
inline bool operator==(const WorkerHandle &lhs, const WorkerHandle &rhs) {
28+
return lhs.info.hostname == rhs.info.hostname
29+
&& lhs.info.resources == rhs.info.resources;
30+
}
31+
2732
class WorkerSelector {
2833
public:
2934
virtual ~WorkerSelector() = default;

core/sector_storage/worker.hpp

+1-1
Original file line numberDiff line numberDiff line change
@@ -143,7 +143,7 @@ namespace fc::sector_storage {
143143
virtual outcome::result<std::vector<primitives::StoragePath>>
144144
getAccessiblePaths() = 0;
145145

146-
virtual void ping(std::function<void(bool resp)> cb) = 0;
146+
virtual void ping(std::function<void(const bool resp)> cb) = 0;
147147
};
148148

149149
enum class CallErrorCode : uint64_t {

test/testutil/mocks/sector_storage/worker_mock.hpp

+1-1
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,6 @@ namespace fc::sector_storage {
101101
gsl::span<const UnpaddedPieceSize>,
102102
const UnpaddedPieceSize &,
103103
int));
104-
MOCK_METHOD1(ping, void(std::function<void(const bool &)>));
104+
MOCK_METHOD1(ping, void(std::function<void(const bool)>));
105105
};
106106
} // namespace fc::sector_storage

0 commit comments

Comments
 (0)