diff --git a/core/api/rpc/wsc.cpp b/core/api/rpc/wsc.cpp index 25a6e61f67..8d3c6463a1 100644 --- a/core/api/rpc/wsc.cpp +++ b/core/api/rpc/wsc.cpp @@ -37,25 +37,26 @@ namespace fc::api::rpc { return connect(ip, port, target, token); } - outcome::result<void> Client::connect(const std::string &host, - const std::string &port, - const std::string &target, - const std::string &token) { + outcome::result<void> Client::connect(const std::string &host, + const std::string &port, + const std::string &target, + const std::string &token) { boost::system::error_code ec; - socket.next_layer().connect({boost::asio::ip::make_address(host), - boost::lexical_cast<uint16_t>(port)}, - ec); + socket->next_layer().connect({boost::asio::ip::make_address(host), + boost::lexical_cast<uint16_t>(port)}, + ec); if (ec) { return ec; } if (not token.empty()) { - socket.set_option( + socket->set_option( boost::beast::websocket::stream_base::decorator([&](auto &req) { req.set(boost::beast::http::field::authorization, "Bearer " + token); })); } - socket.handshake(host, target, ec); + socket->handshake(host, target, ec); + client_data = ClientData{host, port, target, token}; if (ec) { return ec; } @@ -87,27 +88,28 @@ namespace fc::api::rpc { } } chans.clear(); + reconnect(3, std::chrono::seconds(10)); } void Client::_flush() { - if (!writing && !write_queue.empty()) { + if (!writing && !write_queue.empty() && not reconnecting) { auto &[id, buffer] = write_queue.front(); writing = true; - socket.async_write(boost::asio::buffer(buffer.data(), buffer.size()), - [=](auto &&ec, auto) { - std::lock_guard lock{mutex}; - if (ec) { - return _error(ec); - } - writing = false; - write_queue.pop(); - _flush(); - }); + socket->async_write(boost::asio::buffer(buffer.data(), buffer.size()), + [=](auto &&ec, auto) { + std::lock_guard lock{mutex}; + writing = false; + if (ec) { + return _error(ec); + } + write_queue.pop(); + _flush(); + }); } } void Client::_read() { - socket.async_read(buffer, [=](auto &&ec, auto) { + socket->async_read(buffer, [=](auto &&ec, auto) { if (ec) { std::lock_guard lock{mutex}; return _error(ec); @@ -185,4 +187,36 @@ namespace fc::api::rpc { } } } + + void Client::reconnect(int counter, std::chrono::milliseconds wait) { + if (reconnecting) return; + reconnecting = true; + bool rec_status{false}; + logger_->info( + "Starting reconnect to {}:{}", client_data.host, client_data.port); + for (int i = 0; i < counter; i++) { + std::this_thread::sleep_for(wait*(i+1)); + socket.reset(); + socket.emplace(io); + auto res = connect(client_data.host, + client_data.port, + client_data.target, + client_data.token); + if (not res.has_error()) { + rec_status = true; + break; + } + } + reconnecting = false; + if (rec_status) { + logger_->info("Reconnect to {}:{} was successful", + client_data.host, + client_data.port); + _flush(); + } else { + logger_->error("Reconnect to {}:{} have been failed", + client_data.host, + client_data.port); + } + } } // namespace fc::api::rpc diff --git a/core/api/rpc/wsc.hpp b/core/api/rpc/wsc.hpp index 4c60cbf837..32a1882ada 100644 --- a/core/api/rpc/wsc.hpp +++ b/core/api/rpc/wsc.hpp @@ -57,6 +57,14 @@ namespace fc::api::rpc { void setup(A &api) { visit(api, [&](auto &m) { _setup(*this, m); }); } + struct ClientData { + std::string host; + std::string port; + std::string target; + std::string token; + } client_data; + + void reconnect(int counter, std::chrono::milliseconds wait); private: std::thread thread; @@ -64,7 +72,9 @@ namespace fc::api::rpc { io_context io; io_context &io2; boost::asio::executor_work_guard<io_context::executor_type> work_guard; - boost::beast::websocket::stream<boost::asio::ip::tcp::socket> socket; + boost::optional< + boost::beast::websocket::stream<boost::asio::ip::tcp::socket>> + socket; boost::beast::flat_buffer buffer; std::mutex mutex; uint64_t next_req{}; @@ -72,6 +82,7 @@ namespace fc::api::rpc { std::map<uint64_t, ChanCb> chans; std::queue<std::pair<uint64_t, Bytes>> write_queue; bool writing{false}; + bool reconnecting{false}; template <typename M> void _setup(Client &c, M &m); diff --git a/core/api/worker_api.hpp b/core/api/worker_api.hpp index 74b2e9b43a..17b6d4a418 100644 --- a/core/api/worker_api.hpp +++ b/core/api/worker_api.hpp @@ -126,7 +126,7 @@ namespace fc::api { const SealRandomness &, const CID &) - API_METHOD(Version, kAdminPermission, VersionResult) + API_METHOD(Version, kAdminPermission, ApiVersion) }; template <typename A, typename F> diff --git a/core/primitives/sector_file/sector_file.hpp b/core/primitives/sector_file/sector_file.hpp index 8115f9f344..452ec74a29 100644 --- a/core/primitives/sector_file/sector_file.hpp +++ b/core/primitives/sector_file/sector_file.hpp @@ -83,11 +83,11 @@ namespace fc::primitives::sector_file { struct SectorPaths { public: SectorId id; - std::string unsealed; - std::string sealed; - std::string cache; - std::string update; - std::string update_cache; + std::string unsealed{}; + std::string sealed{}; + std::string cache{}; + std::string update{}; + std::string update_cache{}; void setPathByType(const SectorFileType &file_type, const std::string &path); diff --git a/core/primitives/types.hpp b/core/primitives/types.hpp index b3e9455949..6c190d6486 100644 --- a/core/primitives/types.hpp +++ b/core/primitives/types.hpp @@ -39,7 +39,7 @@ namespace fc::primitives { struct FsStat { uint64_t capacity = 0; uint64_t available = 0; - uint64_t fs_available = 0; // Available to use for sector storage + uint64_t fs_available = 0; // Available to use for sector storage uint64_t reserved = 0; uint64_t max = 0; uint64_t used = 0; @@ -91,6 +91,14 @@ namespace fc::primitives { std::vector<std::string> gpus; }; + inline bool operator==(const WorkerResources &lhs, + const WorkerResources &rhs) { + return (lhs.physical_memory == rhs.physical_memory + && lhs.swap_memory == rhs.swap_memory + && lhs.reserved_memory == rhs.reserved_memory + && lhs.cpus == rhs.cpus && lhs.gpus == rhs.gpus); + } + struct WorkerInfo { std::string hostname; WorkerResources resources; diff --git a/core/remote_worker/remote_worker_api.cpp b/core/remote_worker/remote_worker_api.cpp index b1ce76e314..14a59a8e28 100644 --- a/core/remote_worker/remote_worker_api.cpp +++ b/core/remote_worker/remote_worker_api.cpp @@ -6,6 +6,7 @@ #include "remote_worker/remote_worker_api.hpp" namespace fc::remote_worker { + using api::ApiVersion; using api::VersionResult; using primitives::piece::PieceInfo; using primitives::piece::UnpaddedByteIndex; @@ -26,7 +27,7 @@ namespace fc::remote_worker { const std::shared_ptr<LocalStore> &local_store, const std::shared_ptr<LocalWorker> &worker) { auto worker_api{std::make_shared<api::WorkerApi>()}; - worker_api->Version = []() { return VersionResult{"seal-worker", 0, 0}; }; + worker_api->Version = []() { return ApiVersion{0}; }; worker_api->StorageAddLocal = [=](const std::string &path) { return local_store->openPath(path); }; diff --git a/core/sector_storage/impl/local_worker.cpp b/core/sector_storage/impl/local_worker.cpp index ffad76ff43..ba46d406c8 100644 --- a/core/sector_storage/impl/local_worker.cpp +++ b/core/sector_storage/impl/local_worker.cpp @@ -908,4 +908,7 @@ namespace fc::sector_storage { return call_id; } + void LocalWorker::ping(std::function<void(const bool resp)> cb) { + cb(true); + } } // namespace fc::sector_storage diff --git a/core/sector_storage/impl/local_worker.hpp b/core/sector_storage/impl/local_worker.hpp index b17eb71782..99467c124b 100644 --- a/core/sector_storage/impl/local_worker.hpp +++ b/core/sector_storage/impl/local_worker.hpp @@ -97,6 +97,8 @@ namespace fc::sector_storage { outcome::result<std::vector<primitives::StoragePath>> getAccessiblePaths() override; + void ping(std::function<void(const bool resp)> cb) override; + private: template <typename W, typename R> outcome::result<CallId> asyncCall(const SectorRef §or, diff --git a/core/sector_storage/impl/remote_worker.cpp b/core/sector_storage/impl/remote_worker.cpp index 4430ca7230..34b9625d87 100644 --- a/core/sector_storage/impl/remote_worker.cpp +++ b/core/sector_storage/impl/remote_worker.cpp @@ -249,4 +249,8 @@ namespace fc::sector_storage { AcquireMode mode) { return api_.Fetch(sector, file_type, path_type, mode); } + + void RemoteWorker::ping(std::function<void(const bool resp)> cb) { + api_.Version([=](auto res){cb(res.has_value());}); + } } // namespace fc::sector_storage diff --git a/core/sector_storage/impl/remote_worker.hpp b/core/sector_storage/impl/remote_worker.hpp index c93fe2e6d5..731dbcb936 100644 --- a/core/sector_storage/impl/remote_worker.hpp +++ b/core/sector_storage/impl/remote_worker.hpp @@ -38,6 +38,8 @@ namespace fc::sector_storage { const SectorRef §or, const PreCommit1Output &pre_commit_1_output) override; + void ping(std::function<void(const bool resp)> cb) override; + outcome::result<CallId> sealCommit1(const SectorRef §or, const SealRandomness &ticket, const InteractiveRandomness &seed, diff --git a/core/sector_storage/impl/scheduler_impl.cpp b/core/sector_storage/impl/scheduler_impl.cpp index 63337fbfce..764397b5e1 100644 --- a/core/sector_storage/impl/scheduler_impl.cpp +++ b/core/sector_storage/impl/scheduler_impl.cpp @@ -137,6 +137,11 @@ namespace fc::sector_storage { void SchedulerImpl::newWorker(std::unique_ptr<WorkerHandle> worker) { std::unique_lock<std::mutex> lock(workers_lock_); + for(const auto &[key, value] : workers_){ + if(*value == *worker){ + return; + } + } if (current_worker_id_ == std::numeric_limits<uint64_t>::max()) { current_worker_id_ = 0; // TODO(ortyomka): maybe better mechanism } @@ -196,8 +201,21 @@ namespace fc::sector_storage { return SchedulerErrors::kCannotSelectWorker; } - WorkerID wid = acceptable[0]; - + std::promise<WorkerID> wid_promise; + std::future<WorkerID> wid_future = wid_promise.get_future(); + auto done = std::make_shared<std::atomic_bool>(); + for (const auto &cur : acceptable) { + workers_[cur]->worker->ping([&wid_promise, done, cur](const bool resp) { + if (resp && !done->exchange(true)) { + wid_promise.set_value(cur); + } + }); + } + auto status = wid_future.wait_for(std::chrono::seconds(5)); + if (status == std::future_status::timeout) { + return false; + } + WorkerID wid = wid_future.get(); assignWorker(wid, workers_[wid], request); return true; diff --git a/core/sector_storage/selector.hpp b/core/sector_storage/selector.hpp index 1548db0e9a..cb7ef1880e 100644 --- a/core/sector_storage/selector.hpp +++ b/core/sector_storage/selector.hpp @@ -24,6 +24,11 @@ namespace fc::sector_storage { ActiveResources active; }; + inline bool operator==(const WorkerHandle &lhs, const WorkerHandle &rhs) { + return lhs.info.hostname == rhs.info.hostname + && lhs.info.resources == rhs.info.resources; + } + class WorkerSelector { public: virtual ~WorkerSelector() = default; diff --git a/core/sector_storage/worker.hpp b/core/sector_storage/worker.hpp index e9d8b00b4b..af7b3a55b2 100644 --- a/core/sector_storage/worker.hpp +++ b/core/sector_storage/worker.hpp @@ -142,6 +142,8 @@ namespace fc::sector_storage { virtual outcome::result<std::vector<primitives::StoragePath>> getAccessiblePaths() = 0; + + virtual void ping(std::function<void(const bool resp)> cb) = 0; }; enum class CallErrorCode : uint64_t { diff --git a/test/core/sector_storage/scheduler_test.cpp b/test/core/sector_storage/scheduler_test.cpp index f97895edca..fb9fdfc489 100644 --- a/test/core/sector_storage/scheduler_test.cpp +++ b/test/core/sector_storage/scheduler_test.cpp @@ -12,6 +12,7 @@ #include "storage/in_memory/in_memory_storage.hpp" #include "testutil/mocks/sector_storage/selector_mock.hpp" #include "testutil/outcome.hpp" +#include "testutil/mocks/sector_storage/worker_mock.hpp" namespace fc::sector_storage { using primitives::WorkerInfo; @@ -70,9 +71,12 @@ namespace fc::sector_storage { EXPECT_OUTCOME_TRUE(scheduler, SchedulerImpl::newScheduler(io_, kv_)); scheduler_ = scheduler; - + auto worker_test = std::make_shared<WorkerMock>(); + EXPECT_CALL(*worker_test, ping(_)).WillRepeatedly(testing::Invoke([](auto cb){ + cb(true); + })); std::unique_ptr<WorkerHandle> worker = std::make_unique<WorkerHandle>(); - + worker->worker = std::move(worker_test); worker_name_ = "worker"; worker->info = WorkerInfo{ diff --git a/test/testutil/mocks/sector_storage/worker_mock.hpp b/test/testutil/mocks/sector_storage/worker_mock.hpp index 05da3eda38..7a5f0e9c4b 100644 --- a/test/testutil/mocks/sector_storage/worker_mock.hpp +++ b/test/testutil/mocks/sector_storage/worker_mock.hpp @@ -100,5 +100,6 @@ namespace fc::sector_storage { const std::vector<UnpaddedPieceSize> &, const UnpaddedPieceSize &, int)); + MOCK_METHOD1(ping, void(std::function<void(const bool)>)); }; } // namespace fc::sector_storage