Skip to content

Reconnect socket && worker ping #600

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 13 commits into
base: master
Choose a base branch
from
22 changes: 21 additions & 1 deletion core/api/rpc/wsc.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ namespace fc::api::rpc {
}));
}
socket.handshake(host, target, ec);
client_data = ClientData{host, port, target, token};
if (ec) {
return ec;
}
Expand Down Expand Up @@ -87,10 +88,11 @@ namespace fc::api::rpc {
}
}
chans.clear();
reconnect(3, std::chrono::seconds(5));
}

void Client::_flush() {
if (!writing && !write_queue.empty()) {
if (!writing && !write_queue.empty() && !reconnecting){
auto &[id, buffer] = write_queue.front();
writing = true;
socket.async_write(boost::asio::buffer(buffer.data(), buffer.size()),
Expand Down Expand Up @@ -185,4 +187,22 @@ namespace fc::api::rpc {
}
}
}

void Client::reconnect(int counter, std::chrono::milliseconds wait) {
if(reconnecting.exchange(true)) return;
logger_->info("Starting reconnect to {}:{}", client_data.host, client_data.port);
for(int i = 0; i < counter; i++){
std::this_thread::sleep_for(wait*(i+1));
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
std::this_thread::sleep_for(wait*(i+1));
std::this_thread::sleep_for(wait * (i + 1));

auto res = connect(client_data.host,
client_data.port,
client_data.target,
client_data.token);
if(!res.has_error()) {
break;
}
}
reconnecting.store(false);
logger_->info("Reconnect to {}:{} was successful", client_data.host, client_data.port);
_flush();
}
} // namespace fc::api::rpc
13 changes: 13 additions & 0 deletions core/api/rpc/wsc.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,18 @@ namespace fc::api::rpc {
void setup(A &api) {
visit(api, [&](auto &m) { _setup(*this, m); });
}
struct ClientData {
ClientData(std::string host,
std::string port,
std::string target,
std::string token)
: host(host), port(port), target(target), token(token){};
ClientData() = default;

std::string host, port, target, token;
}client_data;

void reconnect(int counter, std::chrono::milliseconds wait);

private:
std::thread thread;
Expand All @@ -72,6 +84,7 @@ namespace fc::api::rpc {
std::map<uint64_t, ChanCb> chans;
std::queue<std::pair<uint64_t, Bytes>> write_queue;
bool writing{false};
std::atomic<bool> reconnecting;

template <typename M>
void _setup(Client &c, M &m);
Expand Down
3 changes: 3 additions & 0 deletions core/sector_storage/impl/local_worker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -904,4 +904,7 @@ namespace fc::sector_storage {

return call_id;
}
void LocalWorker::ping(std::function<void(const bool &resp)> cb) {
cb(true);
}
} // namespace fc::sector_storage
2 changes: 2 additions & 0 deletions core/sector_storage/impl/local_worker.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,8 @@ namespace fc::sector_storage {
outcome::result<std::vector<primitives::StoragePath>> getAccessiblePaths()
override;

void ping(std::function<void(const bool &resp)> cb) override;

private:
template <typename W, typename R>
outcome::result<CallId> asyncCall(const SectorRef &sector,
Expand Down
6 changes: 6 additions & 0 deletions core/sector_storage/impl/remote_worker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -254,4 +254,10 @@ namespace fc::sector_storage {
AcquireMode mode) {
return api_.Fetch(sector, file_type, path_type, mode);
}

void RemoteWorker::ping(std::function<void(const bool &resp)> cb) {
api_.Version([=](auto res){
cb(!res.has_error());
});
}
} // namespace fc::sector_storage
2 changes: 2 additions & 0 deletions core/sector_storage/impl/remote_worker.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ namespace fc::sector_storage {
const SectorRef &sector,
const PreCommit1Output &pre_commit_1_output) override;

void ping(std::function<void(const bool &resp)> cb) override;

outcome::result<CallId> sealCommit1(const SectorRef &sector,
const SealRandomness &ticket,
const InteractiveRandomness &seed,
Expand Down
18 changes: 15 additions & 3 deletions core/sector_storage/impl/scheduler_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -196,10 +196,22 @@ namespace fc::sector_storage {
return SchedulerErrors::kCannotSelectWorker;
}

WorkerID wid = acceptable[0];

std::promise<WorkerID> wid_promise;
std::future<WorkerID> wid_future = wid_promise.get_future();
auto done = std::make_shared<std::atomic_bool>();
for (const auto &cur : acceptable) {
workers_[cur]->worker->ping([&wid_promise, done, cur](const bool &resp) {
if (resp && !done->exchange(true)) {
wid_promise.set_value(cur);
}
});
}
auto status = wid_future.wait_for(std::chrono::seconds(5));
if(status == std::future_status::timeout){
return false;
}
WorkerID wid = wid_future.get();
assignWorker(wid, workers_[wid], request);

return true;
}

Expand Down
2 changes: 2 additions & 0 deletions core/sector_storage/worker.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,8 @@ namespace fc::sector_storage {

virtual outcome::result<std::vector<primitives::StoragePath>>
getAccessiblePaths() = 0;

virtual void ping(std::function<void(const bool &resp)> cb) = 0;
};

enum class CallErrorCode : uint64_t {
Expand Down
1 change: 1 addition & 0 deletions test/testutil/mocks/sector_storage/worker_mock.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -101,5 +101,6 @@ namespace fc::sector_storage {
gsl::span<const UnpaddedPieceSize>,
const UnpaddedPieceSize &,
int));
MOCK_METHOD1(ping, void(std::function<void(const bool &)>));
};
} // namespace fc::sector_storage