Skip to content

Commit a9634fe

Browse files
Feature/retrieval market v1.0.0 (#549)
Signed-off-by: Alexey Chernyshov <[email protected]>
1 parent d8e5de0 commit a9634fe

14 files changed

+942
-287
lines changed

core/codec/cbor/cbor_types.cpp

Lines changed: 0 additions & 76 deletions
Original file line numberDiff line numberDiff line change
@@ -4,85 +4,9 @@
44
*/
55

66
#include "codec/cbor/cbor_codec.hpp"
7-
#include "markets/retrieval/protocols/retrieval_protocol.hpp"
87
#include "primitives/address/address_codec.hpp"
98
#include "storage/ipfs/graphsync/extension.hpp"
109

11-
namespace fc::markets::retrieval {
12-
using codec::cbor::CborDecodeStream;
13-
using codec::cbor::CborEncodeStream;
14-
15-
CBOR2_ENCODE(DealProposalParams::Named) {
16-
auto m{CborEncodeStream::map()};
17-
m["Selector"] << v.selector;
18-
m["PieceCID"] << v.piece;
19-
m["PricePerByte"] << v.price_per_byte;
20-
m["PaymentInterval"] << v.payment_interval;
21-
m["PaymentIntervalIncrease"] << v.payment_interval_increase;
22-
m["UnsealPrice"] << v.unseal_price;
23-
return s << m;
24-
}
25-
CBOR2_DECODE(DealProposalParams::Named) {
26-
auto m{s.map()};
27-
CborDecodeStream::named(m, "Selector") >> v.selector;
28-
CborDecodeStream::named(m, "PieceCID") >> v.piece;
29-
CborDecodeStream::named(m, "PricePerByte") >> v.price_per_byte;
30-
CborDecodeStream::named(m, "PaymentInterval") >> v.payment_interval;
31-
CborDecodeStream::named(m, "PaymentIntervalIncrease")
32-
>> v.payment_interval_increase;
33-
CborDecodeStream::named(m, "UnsealPrice") >> v.unseal_price;
34-
return s;
35-
}
36-
37-
CBOR2_ENCODE(DealProposal::Named) {
38-
auto m{CborEncodeStream::map()};
39-
m["PayloadCID"] << v.payload_cid;
40-
m["ID"] << v.deal_id;
41-
m["Params"] << static_cast<const DealProposalParams::Named &>(v.params);
42-
return s << m;
43-
}
44-
CBOR2_DECODE(DealProposal::Named) {
45-
auto m{s.map()};
46-
CborDecodeStream::named(m, "PayloadCID") >> v.payload_cid;
47-
CborDecodeStream::named(m, "ID") >> v.deal_id;
48-
CborDecodeStream::named(m, "Params")
49-
>> *(static_cast<DealProposalParams::Named *>(&v.params));
50-
return s;
51-
}
52-
53-
CBOR2_ENCODE(DealResponse::Named) {
54-
auto m{CborEncodeStream::map()};
55-
m["Status"] << v.status;
56-
m["ID"] << v.deal_id;
57-
m["PaymentOwed"] << v.payment_owed;
58-
m["Message"] << v.message;
59-
return s << m;
60-
}
61-
CBOR2_DECODE(DealResponse::Named) {
62-
auto m{s.map()};
63-
CborDecodeStream::named(m, "Status") >> v.status;
64-
CborDecodeStream::named(m, "ID") >> v.deal_id;
65-
CborDecodeStream::named(m, "PaymentOwed") >> v.payment_owed;
66-
CborDecodeStream::named(m, "Message") >> v.message;
67-
return s;
68-
}
69-
70-
CBOR2_ENCODE(DealPayment::Named) {
71-
auto m{CborEncodeStream::map()};
72-
m["ID"] << v.deal_id;
73-
m["PaymentChannel"] << v.payment_channel;
74-
m["PaymentVoucher"] << v.payment_voucher;
75-
return s << m;
76-
}
77-
CBOR2_DECODE(DealPayment::Named) {
78-
auto m{s.map()};
79-
CborDecodeStream::named(m, "ID") >> v.deal_id;
80-
CborDecodeStream::named(m, "PaymentChannel") >> v.payment_channel;
81-
CborDecodeStream::named(m, "PaymentVoucher") >> v.payment_voucher;
82-
return s;
83-
}
84-
} // namespace fc::markets::retrieval
85-
8610
namespace fc::storage::ipfs::graphsync {
8711
using codec::cbor::CborDecodeStream;
8812
using codec::cbor::CborEncodeStream;

core/markets/retrieval/client/impl/retrieval_client_impl.cpp

Lines changed: 13 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@
99
#include "common/libp2p/peer/peer_info_helper.hpp"
1010

1111
namespace fc::markets::retrieval::client {
12-
DealState::DealState(const DealProposal &proposal,
12+
DealState::DealState(const DealProposalV1_0_0 &proposal,
1313
const IpldPtr &ipld,
1414
RetrieveResponseHandler handler,
1515
Address client_wallet,
@@ -45,18 +45,20 @@ namespace fc::markets::retrieval::client {
4545
const QueryResponseHandler &cb) {
4646
OUTCOME_CB(auto peer, getPeerInfo(provider_peer));
4747
host_->newStream(
48-
peer, kQueryProtocolId, [=, self{shared_from_this()}](auto stream_res) {
48+
peer,
49+
kQueryProtocolIdV1_0_0,
50+
[=, self{shared_from_this()}](auto stream_res) {
4951
OUTCOME_CB1(stream_res);
5052
self->logger_->debug("connected to provider ID "
5153
+ peerInfoToPrettyString(peer));
5254
auto stream{std::make_shared<CborStream>(stream_res.value())};
53-
stream->write(request, [=](auto written_res) {
55+
stream->write(QueryRequestV1_0_0{request}, [=](auto written_res) {
5456
if (written_res.has_error()) {
5557
stream->close();
5658
cb(written_res.error());
5759
return;
5860
}
59-
stream->template read<QueryResponse>([=](auto response) {
61+
stream->template read<QueryResponseV1_0_0>([=](auto response) {
6062
stream->close();
6163
cb(std::move(response));
6264
});
@@ -80,17 +82,15 @@ namespace fc::markets::retrieval::client {
8082
const Address &client_wallet,
8183
const Address &miner_wallet,
8284
const RetrieveResponseHandler &handler) {
83-
DealProposal::Named proposal{{.payload_cid = payload_cid,
84-
.deal_id = next_deal_id++,
85-
.params = deal_params}};
85+
DealProposalV1_0_0 proposal{payload_cid, next_deal_id++, deal_params};
8686
auto deal{std::make_shared<DealState>(
8787
proposal, ipfs_, handler, client_wallet, miner_wallet, total_funds)};
8888
OUTCOME_TRY(peer_info, getPeerInfo(provider_peer));
8989
deal->pdtid = datatransfer_->pull(
9090
peer_info,
9191
payload_cid,
9292
deal_params.selector,
93-
DealProposal::Named::type,
93+
DealProposalV1_0_0::type,
9494
codec::cbor::encode(proposal).value(),
9595
[this, deal](auto &type, auto voucher) {
9696
onPullData(deal, type, voucher);
@@ -103,7 +103,7 @@ namespace fc::markets::retrieval::client {
103103
void RetrievalClientImpl::onPullData(const std::shared_ptr<DealState> &deal,
104104
const std::string &type,
105105
BytesIn voucher) {
106-
OUTCOME_EXCEPT(res, codec::cbor::decode<DealResponse::Named>(voucher));
106+
OUTCOME_EXCEPT(res, codec::cbor::decode<DealResponseV1_0_0>(voucher));
107107
auto after{[=](auto &res) {
108108
if (res.status == DealStatus::kDealStatusCompleted) {
109109
deal->handler(outcome::success());
@@ -214,11 +214,11 @@ namespace fc::markets::retrieval::client {
214214
}
215215
datatransfer_->pullOut(
216216
deal_state->pdtid,
217-
DealPayment::Named::type,
217+
DealPaymentV1_0_0::type,
218218
codec::cbor::encode(
219-
DealPayment::Named{{deal_state->proposal.deal_id,
220-
deal_state->payment_channel_address,
221-
maybe_voucher.value()}})
219+
DealPaymentV1_0_0{{deal_state->proposal.deal_id,
220+
deal_state->payment_channel_address,
221+
maybe_voucher.value()}})
222222
.value());
223223
deal_state->state.pay(payment_requested);
224224
}

core/markets/retrieval/client/impl/retrieval_client_impl.hpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,14 +33,14 @@ namespace fc::markets::retrieval::client {
3333
* State of ongoing retrieval deal.
3434
*/
3535
struct DealState {
36-
DealState(const DealProposal &proposal,
36+
DealState(const DealProposalV1_0_0 &proposal,
3737
const IpldPtr &ipld,
3838
RetrieveResponseHandler handler,
3939
Address client_wallet,
4040
Address miner_wallet,
4141
TokenAmount total_funds);
4242

43-
DealProposal proposal;
43+
DealProposalV1_0_0 proposal;
4444
State state;
4545
PeerDtId pdtid;
4646
bool accepted{}, all_blocks{};

core/markets/retrieval/client/retrieval_client.hpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717
namespace fc::markets::retrieval::client {
1818
using libp2p::peer::PeerInfo;
1919
using QueryResponseHandler =
20-
std::function<void(outcome::result<QueryResponse>)>;
20+
std::function<void(outcome::result<QueryResponseV1_0_0>)>;
2121
using RetrieveResponseHandler = std::function<void(outcome::result<void>)>;
2222

2323
/*

core/markets/retrieval/protocols/query_protocol.hpp

Lines changed: 84 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -8,20 +8,26 @@
88
#include <string>
99

1010
#include <libp2p/peer/protocol.hpp>
11+
#include "codec/cbor/cbor_codec.hpp"
1112
#include "codec/cbor/streams_annotation.hpp"
1213
#include "primitives/address/address.hpp"
1314
#include "primitives/address/address_codec.hpp"
1415
#include "primitives/cid/cid.hpp"
1516
#include "primitives/types.hpp"
1617

1718
namespace fc::markets::retrieval {
19+
using codec::cbor::CborDecodeStream;
20+
using codec::cbor::CborEncodeStream;
1821
using primitives::TokenAmount;
1922
using primitives::address::Address;
2023

21-
/**
22-
* @brief Query Protocol ID V0
23-
*/
24-
const libp2p::peer::Protocol kQueryProtocolId = "/fil/retrieval/qry/0.0.1";
24+
/** Query Protocol ID v0.0.1 */
25+
const libp2p::peer::Protocol kQueryProtocolIdV0_0_1 =
26+
"/fil/retrieval/qry/0.0.1";
27+
28+
/** Query Protocol ID v1.0.0 */
29+
const libp2p::peer::Protocol kQueryProtocolIdV1_0_0 =
30+
"/fil/retrieval/qry/1.0.0";
2531

2632
/**
2733
* @struct Request from client to provider
@@ -32,19 +38,54 @@ namespace fc::markets::retrieval {
3238
boost::optional<CID> piece_cid;
3339
};
3440

41+
struct QueryParamsV0_0_1 : public QueryParams {};
42+
43+
CBOR_TUPLE(QueryParamsV0_0_1, piece_cid)
44+
45+
struct QueryParamsV1_0_0 : public QueryParams {};
46+
47+
inline CBOR2_ENCODE(QueryParamsV1_0_0) {
48+
auto m{CborEncodeStream::orderedMap()};
49+
m["PieceCID"] << v.piece_cid;
50+
return s << m;
51+
}
52+
53+
inline CBOR2_DECODE(QueryParamsV1_0_0) {
54+
auto m{s.map()};
55+
CborDecodeStream::named(m, "PieceCID") >> v.piece_cid;
56+
return s;
57+
}
58+
59+
CBOR_TUPLE(QueryParams, piece_cid)
60+
3561
struct QueryRequest {
36-
/* V0 protocol */
3762
/* Identifier of the requested item */
3863
CID payload_cid;
3964

40-
/* V1 protocol */
4165
/* Additional params */
4266
QueryParams params;
4367
};
4468

45-
CBOR_TUPLE(QueryParams, piece_cid)
69+
struct QueryRequestV0_0_1 : public QueryRequest {};
70+
71+
CBOR_TUPLE(QueryRequestV0_0_1, payload_cid, params)
4672

47-
CBOR_TUPLE(QueryRequest, payload_cid, params)
73+
struct QueryRequestV1_0_0 : public QueryRequest {};
74+
75+
inline CBOR2_ENCODE(QueryRequestV1_0_0) {
76+
auto m{CborEncodeStream::orderedMap()};
77+
m["PayloadCID"] << v.payload_cid;
78+
m["QueryParams"] << QueryParamsV1_0_0{v.params};
79+
return s << m;
80+
}
81+
82+
inline CBOR2_DECODE(QueryRequestV1_0_0) {
83+
auto m{s.map()};
84+
CborDecodeStream::named(m, "PayloadCID") >> v.payload_cid;
85+
v.params =
86+
CborDecodeStream::named(m, "QueryParams").get<QueryParamsV1_0_0>();
87+
return s;
88+
}
4889

4990
/**
5091
* @enum Status of the query response
@@ -106,7 +147,9 @@ namespace fc::markets::retrieval {
106147
TokenAmount unseal_price;
107148
};
108149

109-
CBOR_TUPLE(QueryResponse,
150+
struct QueryResponseV0_0_1 : public QueryResponse {};
151+
152+
CBOR_TUPLE(QueryResponseV0_0_1,
110153
response_status,
111154
item_status,
112155
item_size,
@@ -115,6 +158,37 @@ namespace fc::markets::retrieval {
115158
payment_interval,
116159
interval_increase,
117160
message,
118-
unseal_price);
161+
unseal_price)
162+
163+
struct QueryResponseV1_0_0 : public QueryResponse {};
164+
165+
inline CBOR2_ENCODE(QueryResponseV1_0_0) {
166+
auto m{CborEncodeStream::orderedMap()};
167+
m["Status"] << v.response_status;
168+
m["PieceCIDFound"] << v.item_status;
169+
m["Size"] << v.item_size;
170+
m["PaymentAddress"] << v.payment_address;
171+
m["MinPricePerByte"] << v.min_price_per_byte;
172+
m["MaxPaymentInterval"] << v.payment_interval;
173+
m["MaxPaymentIntervalIncrease"] << v.interval_increase;
174+
m["Message"] << v.message;
175+
m["UnsealPrice"] << v.unseal_price;
176+
return s << m;
177+
}
178+
179+
inline CBOR2_DECODE(QueryResponseV1_0_0) {
180+
auto m{s.map()};
181+
CborDecodeStream::named(m, "Status") >> v.response_status;
182+
CborDecodeStream::named(m, "PieceCIDFound") >> v.item_status;
183+
CborDecodeStream::named(m, "Size") >> v.item_size;
184+
CborDecodeStream::named(m, "PaymentAddress") >> v.payment_address;
185+
CborDecodeStream::named(m, "MinPricePerByte") >> v.min_price_per_byte;
186+
CborDecodeStream::named(m, "MaxPaymentInterval") >> v.payment_interval;
187+
CborDecodeStream::named(m, "MaxPaymentIntervalIncrease")
188+
>> v.interval_increase;
189+
CborDecodeStream::named(m, "Message") >> v.message;
190+
CborDecodeStream::named(m, "UnsealPrice") >> v.unseal_price;
191+
return s;
192+
}
119193

120194
} // namespace fc::markets::retrieval

0 commit comments

Comments
 (0)