-
Notifications
You must be signed in to change notification settings - Fork 36
Reconnect socket && worker ping #600
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
0ace090
59e534c
16b8cd5
1e08815
5c5067a
566d6d2
07085f0
3b1decb
b52f975
9e79b58
8e0d911
a73b6dc
48facdb
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
|
@@ -37,25 +37,26 @@ namespace fc::api::rpc { | |||||
return connect(ip, port, target, token); | ||||||
} | ||||||
|
||||||
outcome::result<void> Client::connect(const std::string &host, | ||||||
const std::string &port, | ||||||
const std::string &target, | ||||||
const std::string &token) { | ||||||
outcome::result<void> Client::connect(const std::string &host, | ||||||
const std::string &port, | ||||||
const std::string &target, | ||||||
const std::string &token) { | ||||||
boost::system::error_code ec; | ||||||
socket.next_layer().connect({boost::asio::ip::make_address(host), | ||||||
boost::lexical_cast<uint16_t>(port)}, | ||||||
ec); | ||||||
socket->next_layer().connect({boost::asio::ip::make_address(host), | ||||||
boost::lexical_cast<uint16_t>(port)}, | ||||||
ec); | ||||||
if (ec) { | ||||||
return ec; | ||||||
} | ||||||
if (not token.empty()) { | ||||||
socket.set_option( | ||||||
socket->set_option( | ||||||
boost::beast::websocket::stream_base::decorator([&](auto &req) { | ||||||
req.set(boost::beast::http::field::authorization, | ||||||
"Bearer " + token); | ||||||
})); | ||||||
} | ||||||
socket.handshake(host, target, ec); | ||||||
socket->handshake(host, target, ec); | ||||||
client_data = ClientData{host, port, target, token}; | ||||||
if (ec) { | ||||||
return ec; | ||||||
} | ||||||
|
@@ -87,27 +88,28 @@ namespace fc::api::rpc { | |||||
} | ||||||
} | ||||||
chans.clear(); | ||||||
reconnect(3, std::chrono::seconds(10)); | ||||||
} | ||||||
|
||||||
void Client::_flush() { | ||||||
if (!writing && !write_queue.empty()) { | ||||||
if (!writing && !write_queue.empty() && not reconnecting) { | ||||||
auto &[id, buffer] = write_queue.front(); | ||||||
writing = true; | ||||||
socket.async_write(boost::asio::buffer(buffer.data(), buffer.size()), | ||||||
[=](auto &&ec, auto) { | ||||||
std::lock_guard lock{mutex}; | ||||||
if (ec) { | ||||||
return _error(ec); | ||||||
} | ||||||
writing = false; | ||||||
write_queue.pop(); | ||||||
_flush(); | ||||||
}); | ||||||
socket->async_write(boost::asio::buffer(buffer.data(), buffer.size()), | ||||||
[=](auto &&ec, auto) { | ||||||
std::lock_guard lock{mutex}; | ||||||
writing = false; | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. you change There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Moreover, you provide data by copy |
||||||
if (ec) { | ||||||
return _error(ec); | ||||||
} | ||||||
write_queue.pop(); | ||||||
_flush(); | ||||||
}); | ||||||
} | ||||||
} | ||||||
|
||||||
void Client::_read() { | ||||||
socket.async_read(buffer, [=](auto &&ec, auto) { | ||||||
socket->async_read(buffer, [=](auto &&ec, auto) { | ||||||
if (ec) { | ||||||
std::lock_guard lock{mutex}; | ||||||
return _error(ec); | ||||||
|
@@ -185,4 +187,36 @@ namespace fc::api::rpc { | |||||
} | ||||||
} | ||||||
} | ||||||
|
||||||
void Client::reconnect(int counter, std::chrono::milliseconds wait) { | ||||||
if (reconnecting) return; | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is it thread-safe? |
||||||
reconnecting = true; | ||||||
bool rec_status{false}; | ||||||
logger_->info( | ||||||
"Starting reconnect to {}:{}", client_data.host, client_data.port); | ||||||
for (int i = 0; i < counter; i++) { | ||||||
std::this_thread::sleep_for(wait*(i+1)); | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||
socket.reset(); | ||||||
socket.emplace(io); | ||||||
auto res = connect(client_data.host, | ||||||
client_data.port, | ||||||
client_data.target, | ||||||
client_data.token); | ||||||
turuslan marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||
if (not res.has_error()) { | ||||||
rec_status = true; | ||||||
break; | ||||||
} | ||||||
} | ||||||
reconnecting = false; | ||||||
if (rec_status) { | ||||||
logger_->info("Reconnect to {}:{} was successful", | ||||||
client_data.host, | ||||||
client_data.port); | ||||||
_flush(); | ||||||
} else { | ||||||
logger_->error("Reconnect to {}:{} have been failed", | ||||||
client_data.host, | ||||||
client_data.port); | ||||||
} | ||||||
} | ||||||
} // namespace fc::api::rpc |
Original file line number | Diff line number | Diff line change | ||
---|---|---|---|---|
|
@@ -6,6 +6,7 @@ | |||
#include "remote_worker/remote_worker_api.hpp" | ||||
|
||||
namespace fc::remote_worker { | ||||
using api::ApiVersion; | ||||
using api::VersionResult; | ||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Not used anymore, remove
Suggested change
|
||||
using primitives::piece::PieceInfo; | ||||
using primitives::piece::UnpaddedByteIndex; | ||||
|
@@ -26,7 +27,7 @@ namespace fc::remote_worker { | |||
const std::shared_ptr<LocalStore> &local_store, | ||||
const std::shared_ptr<LocalWorker> &worker) { | ||||
auto worker_api{std::make_shared<api::WorkerApi>()}; | ||||
worker_api->Version = []() { return VersionResult{"seal-worker", 0, 0}; }; | ||||
worker_api->Version = []() { return ApiVersion{0}; }; | ||||
worker_api->StorageAddLocal = [=](const std::string &path) { | ||||
return local_store->openPath(path); | ||||
}; | ||||
|
Original file line number | Diff line number | Diff line change | ||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|
|
@@ -908,4 +908,7 @@ namespace fc::sector_storage { | |||||||||||
|
||||||||||||
return call_id; | ||||||||||||
} | ||||||||||||
void LocalWorker::ping(std::function<void(const bool resp)> cb) { | ||||||||||||
Comment on lines
910
to
+911
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||||||||
cb(true); | ||||||||||||
} | ||||||||||||
} // namespace fc::sector_storage |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Use named variable or const instead of magic number 3