Skip to content

Commit a710b9a

Browse files
authored
fetch msg (#552)
Signed-off-by: turuslan <[email protected]>
1 parent 175f992 commit a710b9a

File tree

9 files changed

+233
-26
lines changed

9 files changed

+233
-26
lines changed

core/node/blocksync_request.cpp

Lines changed: 20 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
#include "common/logger.hpp"
1414
#include "primitives/tipset/load.hpp"
1515
#include "storage/ipfs/datastore.hpp"
16+
#include "vm/actor/builtin/types/miner/policy.hpp"
1617

1718
#define TRACE_ENABLED 0
1819

@@ -22,7 +23,8 @@ namespace fc::sync::blocksync {
2223
using libp2p::basic::Scheduler;
2324

2425
namespace {
25-
constexpr size_t kMaxDepth = 100;
26+
constexpr size_t kMaxDepth =
27+
vm::actor::builtin::types::miner::kChainFinality;
2628

2729
using primitives::block::MsgMeta;
2830

@@ -32,7 +34,7 @@ namespace fc::sync::blocksync {
3234
}
3335

3436
template <typename... Args>
35-
inline void trace(spdlog::string_view_t fmt, const Args &... args) {
37+
inline void trace(spdlog::string_view_t fmt, const Args &...args) {
3638
#if TRACE_ENABLED
3739
log()->trace(fmt, args...);
3840
#endif
@@ -214,8 +216,8 @@ namespace fc::sync::blocksync {
214216
if (meta_cid != header.messages) {
215217
return BlocksyncRequest::Error::kStoreCidsMismatch;
216218
}
217-
block_stored(std::move(*asBlake(block_cid)), std::move(header));
218219
}
220+
block_stored(std::move(*asBlake(block_cid)), std::move(header));
219221
}
220222

221223
return outcome::success();
@@ -246,8 +248,10 @@ namespace fc::sync::blocksync {
246248
std::function<void(Result)> callback) {
247249
callback_ = std::move(callback);
248250
result_.emplace();
251+
result_->from = peer;
249252
result_->blocks_requested = std::move(blocks);
250253
result_->messages_stored = (options & kMessagesOnly);
254+
result_->messages_only = options == kMessagesOnly;
251255

252256
std::vector<CbCid> blocks_reduced =
253257
tryReduceRequest(result_->blocks_requested,
@@ -260,11 +264,16 @@ namespace fc::sync::blocksync {
260264
return;
261265
}
262266

263-
if (options == kMessagesOnly) {
264-
// not supported yet
265-
result_->error = BlocksyncRequest::Error::kNotImplemented;
266-
scheduleResult();
267-
return;
267+
if (result_->messages_only) {
268+
auto cb{[&](std::error_code ec) {
269+
result_->error = ec;
270+
scheduleResult();
271+
}};
272+
result_->blocks_available.clear();
273+
for (const auto &cid : blocks_reduced) {
274+
OUTCOME_CB(result_->blocks_available.emplace_back(),
275+
getCbor<BlockHeader>(ipld_, CID{cid}));
276+
}
268277
}
269278

270279
waitlist_.insert(blocks_reduced.begin(), blocks_reduced.end());
@@ -285,7 +294,6 @@ namespace fc::sync::blocksync {
285294
}
286295

287296
options_ = options;
288-
result_->from = peer;
289297

290298
host_.newStream(
291299
// peer must be already connected
@@ -462,6 +470,9 @@ namespace fc::sync::blocksync {
462470

463471
boost::optional<TipsetHash> expected_parent;
464472

473+
if (result_->messages_only) {
474+
chain[0].blocks = std::move(result_->blocks_available);
475+
}
465476
auto res = storeTipsetBundle(
466477
ipld_,
467478
put_block_header_,

core/node/blocksync_request.hpp

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,8 @@ namespace fc::sync::blocksync {
4848

4949
/// All their meta/messages is also available
5050
bool messages_stored = false;
51+
52+
bool messages_only{false};
5153
};
5254

5355
/// Cancels existing request if still active and makes the new one

core/node/fetch_msg.hpp

Lines changed: 100 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,100 @@
1+
/**
2+
* Copyright Soramitsu Co., Ltd. All Rights Reserved.
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
#pragma once
7+
8+
#include <unordered_set>
9+
10+
#include "node/blocksync_request.hpp"
11+
#include "node/peer_height.hpp"
12+
13+
namespace fc::sync::fetch_msg {
14+
using blocksync::BlocksyncRequest;
15+
using libp2p::Host;
16+
using libp2p::basic::Scheduler;
17+
using peer_height::PeerHeight;
18+
using primitives::ChainEpoch;
19+
20+
constexpr size_t kQueueMax{1000};
21+
constexpr size_t kFetchingMax{20};
22+
23+
struct FetchMsg {
24+
std::shared_ptr<Host> host;
25+
std::shared_ptr<Scheduler> scheduler;
26+
std::shared_ptr<PeerHeight> peers;
27+
IpldPtr ipld;
28+
std::set<std::pair<ChainEpoch, TipsetKey>> queue;
29+
std::unordered_map<TipsetKey, std::shared_ptr<BlocksyncRequest>>
30+
fetching_tsk;
31+
std::unordered_set<PeerId> fetching_peer;
32+
std::function<void(TipsetKey)> on_fetch;
33+
34+
FetchMsg(std::shared_ptr<Host> host,
35+
std::shared_ptr<Scheduler> scheduler,
36+
std::shared_ptr<PeerHeight> peers,
37+
IpldPtr ipld)
38+
: host{host}, scheduler{scheduler}, peers{peers}, ipld{ipld} {}
39+
40+
bool has(const TipsetCPtr &ts, bool priority) {
41+
for (const auto &block : ts->blks) {
42+
if (!ipld->contains(block.messages).value()) {
43+
if (queue.size() >= kQueueMax) {
44+
queue.erase(std::prev(queue.end()));
45+
}
46+
queue.emplace(priority ? -1 : ts->height(), ts->key);
47+
dequeue();
48+
return false;
49+
}
50+
}
51+
return true;
52+
}
53+
54+
void dequeue() {
55+
if (queue.empty()) {
56+
return;
57+
}
58+
if (fetching_tsk.size() >= kFetchingMax) {
59+
return;
60+
}
61+
auto it{queue.begin()};
62+
// note: BlocksyncRequests caches duplicate requests
63+
peers->visit(it->first, [&](const PeerId &peer) {
64+
if (fetching_peer.count(peer) != 0) {
65+
return true;
66+
}
67+
auto tsk{std::move(it->second)};
68+
queue.erase(it);
69+
fetching_peer.emplace(peer);
70+
fetching_tsk.emplace(
71+
tsk,
72+
BlocksyncRequest::newRequest(
73+
*host,
74+
*scheduler,
75+
ipld,
76+
nullptr,
77+
peer,
78+
tsk.cids(),
79+
1,
80+
blocksync::kMessagesOnly,
81+
10000,
82+
[this](BlocksyncRequest::Result r) { onFetch(std::move(r)); }));
83+
return false;
84+
});
85+
}
86+
87+
void onFetch(BlocksyncRequest::Result &&r) {
88+
TipsetKey tsk{std::move(r.blocks_requested)};
89+
fetching_tsk.erase(tsk);
90+
fetching_peer.erase(*r.from);
91+
if (r.error) {
92+
peers->onError(*r.from);
93+
}
94+
dequeue();
95+
if (on_fetch) {
96+
on_fetch(std::move(tsk));
97+
}
98+
}
99+
};
100+
} // namespace fc::sync::fetch_msg

core/node/identify.cpp

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,13 @@ namespace fc::sync {
5757
}
5858
});
5959

60+
on_disconnect_ =
61+
host_->getBus()
62+
.getChannel<libp2p::event::network::OnPeerDisconnectedChannel>()
63+
.subscribe([this](const PeerId &peer) {
64+
events_->signalPeerDisconnected({peer});
65+
});
66+
6067
handleProtocol(*host_, *identify_protocol_);
6168
handleProtocol(*host_, *identify_push_protocol_);
6269
handleProtocol(*host_, *identify_delta_protocol_);

core/node/identify.hpp

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77

88
#include "node/events_fwd.hpp"
99

10+
#include <libp2p/event/bus.hpp>
1011
#include <libp2p/peer/peer_info.hpp>
1112

1213
namespace fc::sync {
@@ -34,6 +35,7 @@ namespace fc::sync {
3435

3536
std::shared_ptr<events::Events> events_;
3637
events::Connection on_identify_;
38+
libp2p::event::Handle on_disconnect_;
3739
};
3840

3941
} // namespace fc::sync

core/node/main/main.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,9 +14,9 @@
1414
#include "api/rpc/make.hpp"
1515
#include "api/rpc/ws.hpp"
1616
#include "common/api_secret.hpp"
17-
#include "common/local_ip.hpp"
1817
#include "common/libp2p/peer/peer_info_helper.hpp"
1918
#include "common/libp2p/soralog.hpp"
19+
#include "common/local_ip.hpp"
2020
#include "common/logger.hpp"
2121
#include "common/prometheus/rpc.hpp"
2222
#include "drand/impl/http.hpp"

core/node/peer_height.hpp

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
/**
2+
* Copyright Soramitsu Co., Ltd. All Rights Reserved.
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
#pragma once
7+
8+
#include <boost/bimap.hpp>
9+
#include <boost/bimap/multiset_of.hpp>
10+
#include <boost/bimap/unordered_set_of.hpp>
11+
12+
#include "node/events.hpp"
13+
14+
namespace fc::sync::peer_height {
15+
using primitives::ChainEpoch;
16+
17+
struct PeerHeight {
18+
boost::bimap<boost::bimaps::unordered_set_of<PeerId, std::hash<PeerId>>,
19+
boost::bimaps::multiset_of<ChainEpoch>>
20+
peers;
21+
events::Connection on_peer_head;
22+
events::Connection on_disconnect;
23+
24+
PeerHeight(const std::shared_ptr<events::Events> &events) {
25+
on_peer_head =
26+
events->subscribePossibleHead([this](const events::PossibleHead &e) {
27+
if (e.source) {
28+
// note: may require successful fetch
29+
onHeight(*e.source, e.height);
30+
}
31+
});
32+
on_disconnect = events->subscribePeerDisconnected(
33+
[this](const events::PeerDisconnected &e) { onError(e.peer_id); });
34+
}
35+
36+
void onHeight(const PeerId &peer, ChainEpoch height) {
37+
if (height <= 0) {
38+
return;
39+
}
40+
// note: may ignore peers behind our height
41+
const auto it{peers.left.find(peer)};
42+
if (it == peers.left.end()) {
43+
peers.insert({peer, height});
44+
} else if (height > it->second) {
45+
peers.left.replace_data(it, height);
46+
}
47+
}
48+
49+
void onError(const PeerId &peer) {
50+
peers.left.erase(peer);
51+
}
52+
53+
template <typename F>
54+
void visit(ChainEpoch min, const F &f) {
55+
for (auto it{peers.right.rbegin()}; it != peers.right.rend(); ++it) {
56+
if (it->first < min || !f(it->second)) {
57+
break;
58+
}
59+
}
60+
}
61+
};
62+
} // namespace fc::sync::peer_height

0 commit comments

Comments
 (0)