Skip to content

Commit b0bde74

Browse files
Add DataTransfer protocol (#144)
Signed-off-by: Alexey-N-Chernyshov <[email protected]>
1 parent f371b9c commit b0bde74

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

47 files changed

+2374
-11
lines changed

cmake/Hunter/config.cmake

+3-3
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ hunter_config(
3636
)
3737

3838
hunter_config(libp2p
39-
URL https://github.com/soramitsu/libp2p/archive/bf2aae97a9a00fe3493ee05d2c679e2aed790e28.zip
40-
SHA1 e051b41120a67f0fb28f3af2cc357e34ad33de2b
41-
CMAKE_ARGS TESTING=OFF
39+
URL https://github.com/soramitsu/libp2p/archive/d65b00317c0c190f6f1870fce48284e6709994f0.zip
40+
SHA1 4b2645f33a0f406c51d42b37a6521e946ffae2ef
41+
CMAKE_ARGS TESTING=OFF EXPOSE_MOCKS=ON
4242
)

core/CMakeLists.txt

+1
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ add_subdirectory(clock)
1010
add_subdirectory(codec)
1111
add_subdirectory(common)
1212
add_subdirectory(crypto)
13+
add_subdirectory(data_transfer)
1314
add_subdirectory(primitives)
1415
add_subdirectory(proofs)
1516
add_subdirectory(sectorbuilder)

core/adt/uvarint_key.cpp

+3-2
Original file line numberDiff line numberDiff line change
@@ -26,9 +26,10 @@ namespace fc::adt {
2626

2727
outcome::result<UvarintKeyer::Key> UvarintKeyer::decode(
2828
const std::string &key) {
29-
// NOLINTNEXTLINE(cppcoreguidelines-pro-type-reinterpret-cast)
3029
auto maybe = UVarint::create(gsl::make_span(
31-
reinterpret_cast<const uint8_t *>(key.data()), key.size()));
30+
// NOLINTNEXTLINE(cppcoreguidelines-pro-type-reinterpret-cast)
31+
reinterpret_cast<const uint8_t *>(key.data()),
32+
key.size()));
3233
if (!maybe) {
3334
return UvarintKeyError::DECODE_ERROR;
3435
}

core/crypto/bls/impl/bls_provider_impl.cpp

+1
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,7 @@ namespace fc::crypto::bls {
9191
outcome::result<Signature> BlsProviderImpl::aggregateSignatures(
9292
const std::vector<Signature> &signatures) const {
9393
Signature signature;
94+
// NOLINTNEXTLINE(cppcoreguidelines-pro-type-reinterpret-cast)
9495
const uint8_t *flat_bytes =
9596
reinterpret_cast<const uint8_t *>(signatures.data());
9697
size_t flat_size = sizeof(Signature) * signatures.size();

core/data_transfer/CMakeLists.txt

+16
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
#
2+
# Copyright Soramitsu Co., Ltd. All Rights Reserved.
3+
# SPDX-License-Identifier: Apache-2.0
4+
#
5+
6+
add_subdirectory(impl)
7+
8+
add_library(data_transfer
9+
message_receiver.cpp
10+
message.cpp
11+
)
12+
target_link_libraries(data_transfer
13+
p2p::p2p
14+
logger
15+
ipld_node
16+
)

core/data_transfer/README.md

+4
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
Module to perform data transfer over graphsync.
2+
3+
This module encapsulates protocols for exchanging `Piece` data between storage clients and miners, both when consummating
4+
a storage deal and when retrieving the piece later.
+15
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
#
2+
# Copyright Soramitsu Co., Ltd. All Rights Reserved.
3+
# SPDX-License-Identifier: Apache-2.0
4+
#
5+
6+
add_subdirectory(graphsync)
7+
8+
add_library(libp2p_data_transfer_network
9+
libp2p_data_transfer_network.cpp
10+
stream_message_sender.cpp
11+
)
12+
target_link_libraries(libp2p_data_transfer_network
13+
data_transfer
14+
p2p::p2p
15+
)
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
#
2+
# Copyright Soramitsu Co., Ltd. All Rights Reserved.
3+
# SPDX-License-Identifier: Apache-2.0
4+
#
5+
6+
add_library(data_transfer_graphsync
7+
graphsync_manager.cpp
8+
graphsync_receiver.cpp
9+
data_transfer_extension.cpp
10+
)
11+
target_link_libraries(data_transfer_graphsync
12+
clock
13+
graphsync
14+
libp2p_data_transfer_network
15+
)
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
/**
2+
* Copyright Soramitsu Co., Ltd. All Rights Reserved.
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
#include "data_transfer_extension.hpp"
7+
#include "codec/cbor/cbor.hpp"
8+
9+
namespace fc::data_transfer {
10+
11+
using libp2p::peer::PeerId;
12+
13+
PeerId ExtensionDataTransferData::getInitiator() const {
14+
return PeerId::fromBase58(initiator).value();
15+
}
16+
17+
outcome::result<Extension> encodeDataTransferExtension(
18+
const ExtensionDataTransferData &data) {
19+
OUTCOME_TRY(bytes, codec::cbor::encode(data));
20+
return Extension{.name = std::string(kDataTransferExtensionName),
21+
.data = bytes.toVector()};
22+
}
23+
24+
/// Decodes Data Transfer graphsync extension
25+
outcome::result<ExtensionDataTransferData> decodeDataTransferExtension(
26+
const Extension &extension) {
27+
if (extension.name != kDataTransferExtensionName) {
28+
return DataTransferExtensionError::UNEXPECTED_EXTENSION_NAME;
29+
}
30+
return codec::cbor::decode<ExtensionDataTransferData>(extension.data);
31+
}
32+
33+
} // namespace fc::data_transfer
34+
35+
OUTCOME_CPP_DEFINE_CATEGORY(fc::data_transfer, DataTransferExtensionError, e) {
36+
using fc::data_transfer::DataTransferExtensionError;
37+
38+
switch (e) {
39+
case DataTransferExtensionError::UNEXPECTED_EXTENSION_NAME:
40+
return "DataTransferExtensionError: unexpected extension name";
41+
}
42+
43+
return "unknown error";
44+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
/**
2+
* Copyright Soramitsu Co., Ltd. All Rights Reserved.
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
#ifndef CPP_FILECOIN_DATA_TRANSFER_EXTENSION_HPP
7+
#define CPP_FILECOIN_DATA_TRANSFER_EXTENSION_HPP
8+
9+
#include <libp2p/peer/peer_id.hpp>
10+
#include "data_transfer/types.hpp"
11+
#include "storage/ipfs/graphsync/extension.hpp"
12+
13+
namespace fc::data_transfer {
14+
15+
using libp2p::peer::PeerId;
16+
using storage::ipfs::graphsync::Extension;
17+
18+
constexpr std::string_view kDataTransferExtensionName = "fil/data-transfer";
19+
20+
/**
21+
* Data Transfer extension for graphsync protocol
22+
*/
23+
struct ExtensionDataTransferData {
24+
TransferId transfer_id;
25+
std::string initiator;
26+
bool is_pull;
27+
28+
PeerId getInitiator() const;
29+
};
30+
31+
CBOR_TUPLE(ExtensionDataTransferData, transfer_id, initiator, is_pull)
32+
33+
/// Encodes Data Transfer graphsync extension
34+
outcome::result<Extension> encodeDataTransferExtension(
35+
const ExtensionDataTransferData &data);
36+
37+
/// Decodes Data Transfer graphsync extension
38+
outcome::result<ExtensionDataTransferData> decodeDataTransferExtension(
39+
const Extension &extension);
40+
41+
/**
42+
* @brief Type of errors returned by ExtensionDataTransferData
43+
*/
44+
enum class DataTransferExtensionError { UNEXPECTED_EXTENSION_NAME };
45+
46+
} // namespace fc::data_transfer
47+
48+
OUTCOME_HPP_DECLARE_ERROR(fc::data_transfer, DataTransferExtensionError);
49+
50+
#endif // CPP_FILECOIN_DATA_TRANSFER_EXTENSION_HPP
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,133 @@
1+
/**
2+
* Copyright Soramitsu Co., Ltd. All Rights Reserved.
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
#include "data_transfer/impl/graphsync/graphsync_manager.hpp"
7+
8+
#include <utility>
9+
#include "data_transfer/message.hpp"
10+
11+
namespace fc::data_transfer::graphsync {
12+
13+
using common::Buffer;
14+
15+
GraphSyncManager::GraphSyncManager(std::shared_ptr<Host> host,
16+
PeerId peer)
17+
: network_(std::move(host)), peer_(std::move(peer)) {}
18+
19+
outcome::result<ChannelId> GraphSyncManager::openPushDataChannel(
20+
const PeerId &to,
21+
const Voucher &voucher,
22+
CID base_cid,
23+
std::shared_ptr<IPLDNode> selector) {
24+
OUTCOME_TRY(transfer_id,
25+
sendDtRequest(selector, false, voucher, base_cid, to));
26+
// initiator = us, sender = us, receiver = them
27+
OUTCOME_TRY(
28+
channel_id,
29+
createChannel(
30+
transfer_id, base_cid, selector, voucher.bytes, peer_, peer_, to));
31+
return std::move(channel_id);
32+
}
33+
34+
outcome::result<ChannelId> GraphSyncManager::openPullDataChannel(
35+
const PeerId &to,
36+
const Voucher &voucher,
37+
CID base_cid,
38+
std::shared_ptr<IPLDNode> selector) {
39+
OUTCOME_TRY(transfer_id,
40+
sendDtRequest(selector, true, voucher, base_cid, to));
41+
// initiator = us, sender = them, receiver = us
42+
OUTCOME_TRY(
43+
channel_id,
44+
createChannel(
45+
transfer_id, base_cid, selector, voucher.bytes, peer_, to, peer_));
46+
return std::move(channel_id);
47+
}
48+
49+
outcome::result<ChannelId> GraphSyncManager::createChannel(
50+
const TransferId &transfer_id,
51+
const CID &base_cid,
52+
std::shared_ptr<IPLDNode> selector,
53+
const std::vector<uint8_t> &voucher,
54+
const PeerId &initiator,
55+
const PeerId &sender_peer,
56+
const PeerId &receiver_peer) {
57+
ChannelId channel_id{.initiator = initiator, .id = transfer_id};
58+
Channel channel{.transfer_id = 0,
59+
.base_cid = base_cid,
60+
.selector = std::move(selector),
61+
.voucher = voucher,
62+
.sender = sender_peer,
63+
.recipient = receiver_peer,
64+
.total_size = 0};
65+
ChannelState state{.channel = channel, .sent = 0, .received = 0};
66+
67+
// TODO check thread-safety of channels_
68+
auto res = channels_.try_emplace(channel_id, state);
69+
if (!res.second) return GraphsyncManagerError::STATE_ALREADY_EXISTS;
70+
71+
return channel_id;
72+
}
73+
74+
outcome::result<void> GraphSyncManager::closeChannel(
75+
const ChannelId &channel_id) {
76+
return outcome::success();
77+
}
78+
79+
outcome::result<TransferId> GraphSyncManager::sendDtRequest(
80+
const std::shared_ptr<IPLDNode> &selector,
81+
bool is_pull,
82+
const Voucher &voucher,
83+
const CID &base_cid,
84+
const PeerId &to) {
85+
std::vector<uint8_t> selector_bytes = selector->getRawBytes().toVector();
86+
87+
TransferId tx_id = ++last_tx_id;
88+
OUTCOME_TRY(base_cid_str, base_cid.toString());
89+
DataTransferMessage message = createRequest(base_cid_str,
90+
is_pull,
91+
selector_bytes,
92+
voucher.bytes,
93+
voucher.type,
94+
tx_id);
95+
OUTCOME_TRY(sender, network_.newMessageSender(to));
96+
OUTCOME_TRY(sender->sendMessage(message));
97+
return tx_id;
98+
}
99+
100+
boost::optional<ChannelState> GraphSyncManager::getChannelByIdAndSender(
101+
const ChannelId &channel_id, const PeerId &sender) {
102+
// TODO check thread-safety of channels_
103+
auto found = channels_.find(channel_id);
104+
if (found == channels_.end() || found->second.channel.sender != sender) {
105+
return boost::none;
106+
}
107+
108+
return found->second;
109+
}
110+
111+
outcome::result<void> GraphSyncManager::sendResponse(bool is_accepted,
112+
const PeerId &to,
113+
TransferId transfer_id) {
114+
DataTransferMessage message = createResponse(is_accepted, transfer_id);
115+
OUTCOME_TRY(sender, network_.newMessageSender(to));
116+
OUTCOME_TRY(sender->sendMessage(message));
117+
return outcome::success();
118+
}
119+
120+
} // namespace fc::data_transfer::graphsync
121+
122+
OUTCOME_CPP_DEFINE_CATEGORY(fc::data_transfer::graphsync,
123+
GraphsyncManagerError,
124+
e) {
125+
using fc::data_transfer::graphsync::GraphsyncManagerError;
126+
127+
switch (e) {
128+
case GraphsyncManagerError::STATE_ALREADY_EXISTS:
129+
return "GraphsyncManagerError: state already exists";
130+
}
131+
132+
return "unknown error";
133+
}

0 commit comments

Comments
 (0)