Skip to content

Commit 047c95c

Browse files
Feature/storage market logic (#202)
Signed-off-by: Alexey-N-Chernyshov <[email protected]>
1 parent 281c71d commit 047c95c

File tree

13 files changed

+398
-152
lines changed

13 files changed

+398
-152
lines changed

core/markets/storage/client/impl/storage_market_client_impl.cpp

Lines changed: 146 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
#include "markets/pieceio/pieceio_impl.hpp"
1313
#include "vm/message/message.hpp"
1414
#include "vm/message/message_util.hpp"
15+
#include "markets/storage/common.hpp"
1516

1617
#define CALLBACK_ACTION(_action) \
1718
[self{shared_from_this()}](auto deal, auto event, auto from, auto to) { \
@@ -20,11 +21,12 @@
2021
deal->state = to; \
2122
}
2223

23-
#define FSM_SEND(client_deal, event) \
24-
OUTCOME_EXCEPT(fsm_->send(client_deal, event))
25-
26-
#define SELF_FSM_SEND(client_deal, event) \
27-
OUTCOME_EXCEPT(self->fsm_->send(client_deal, event))
24+
#define FSM_HALT_ON_ERROR(result, msg, deal) \
25+
if (result.has_error()) { \
26+
deal->message = msg + std::string(". ") + result.error().message(); \
27+
FSM_SEND(deal, ClientEvent::ClientEventFailed); \
28+
return; \
29+
}
2830

2931
#define SELF_FSM_HALT_ON_ERROR(result, msg, deal) \
3032
if (result.has_error()) { \
@@ -34,7 +36,7 @@
3436
}
3537

3638
namespace fc::markets::storage::client {
37-
39+
using api::MsgWait;
3840
using host::HostContext;
3941
using host::HostContextImpl;
4042
using libp2p::peer::PeerId;
@@ -43,6 +45,7 @@ namespace fc::markets::storage::client {
4345
using vm::VMExitCode;
4446
using vm::actor::kStorageMarketAddress;
4547
using vm::actor::builtin::market::getProposalCid;
48+
using vm::actor::builtin::market::PublishStorageDeals;
4649
using vm::message::kMessageVersion;
4750
using vm::message::SignedMessage;
4851
using vm::message::UnsignedMessage;
@@ -146,24 +149,25 @@ namespace fc::markets::storage::client {
146149
}
147150
auto stream = std::move(stream_res.value());
148151
AskRequest request{.miner = info.address};
149-
stream->write(request,
150-
[self, info, stream, signed_ask_handler](
151-
outcome::result<size_t> written) {
152-
if (!self->hasValue(written,
153-
"Cannot send request",
154-
stream,
155-
signed_ask_handler)) {
156-
return;
157-
}
158-
stream->template read<AskResponse>(
159-
[self, info, stream, signed_ask_handler](
160-
outcome::result<AskResponse> response) {
161-
auto validated_ask_response =
162-
self->validateAskResponse(response, info);
163-
signed_ask_handler(validated_ask_response);
164-
self->network_->closeStreamGracefully(stream);
165-
});
166-
});
152+
stream->write(
153+
request,
154+
[self, info, stream, signed_ask_handler](
155+
outcome::result<size_t> written) {
156+
if (!self->hasValue(written,
157+
"Cannot send request",
158+
stream,
159+
signed_ask_handler)) {
160+
return;
161+
}
162+
stream->template read<AskResponse>(
163+
[self, info, stream, signed_ask_handler](
164+
outcome::result<AskResponse> response) {
165+
auto validated_ask_response =
166+
self->validateAskResponse(response, info);
167+
signed_ask_handler(validated_ask_response);
168+
self->network_->closeStreamGracefully(stream);
169+
});
170+
});
167171
});
168172
}
169173

@@ -223,7 +227,8 @@ namespace fc::markets::storage::client {
223227
"DealStream opened to "
224228
+ peerInfoToPrettyString(provider_info.peer_info));
225229

226-
self->connections_[proposal_cid] = stream.value();
230+
std::lock_guard<std::mutex> lock(self->connections_mutex_);
231+
self->connections_.emplace(proposal_cid, stream.value());
227232
SELF_FSM_SEND(client_deal, ClientEvent::ClientEventOpen);
228233
});
229234

@@ -347,6 +352,79 @@ namespace fc::markets::storage::client {
347352
return outcome::success();
348353
}
349354

355+
outcome::result<bool> StorageMarketClientImpl::verifyDealPublished(
356+
std::shared_ptr<ClientDeal> deal) {
357+
OUTCOME_TRY(msg_wait, api_->StateWaitMsg(deal->publish_message));
358+
OUTCOME_TRY(msg_state, msg_wait.waitSync());
359+
if (msg_state.receipt.exit_code != VMExitCode::Ok) {
360+
deal->message =
361+
"Publish deal exit code "
362+
+ std::to_string(static_cast<uint64_t>(msg_state.receipt.exit_code));
363+
return false;
364+
}
365+
366+
// check if published
367+
OUTCOME_TRY(publish_message, api_->ChainGetMessage(deal->publish_message));
368+
OUTCOME_TRY(chain_head, api_->ChainHead());
369+
OUTCOME_TRY(tipset_key, chain_head.makeKey());
370+
OUTCOME_TRY(miner_info,
371+
api_->StateMinerInfo(
372+
deal->client_deal_proposal.proposal.provider, tipset_key));
373+
OUTCOME_TRY(from_id_address,
374+
api_->StateLookupID(publish_message.from, tipset_key));
375+
if (from_id_address != miner_info.worker) {
376+
deal->message = "Publisher is not storage provider";
377+
return false;
378+
}
379+
if (publish_message.to != kStorageMarketAddress) {
380+
deal->message = "Receiver is not storage market actor";
381+
return false;
382+
}
383+
if (publish_message.method != PublishStorageDeals::Number) {
384+
deal->message = "Wrong method called";
385+
return false;
386+
}
387+
388+
// check publish contains proposal cid
389+
OUTCOME_TRY(proposals,
390+
codec::cbor::decode<std::vector<ClientDealProposal>>(
391+
publish_message.params));
392+
auto it = std::find(
393+
proposals.begin(), proposals.end(), deal->client_deal_proposal);
394+
if (it == proposals.end()) {
395+
OUTCOME_TRY(proposal_cid_str, deal->proposal_cid.toString());
396+
deal->message = "deal publish didn't contain our deal (message cid: "
397+
+ proposal_cid_str + ")";
398+
}
399+
400+
// get proposal id from publish call return
401+
int index = std::distance(proposals.begin(), it);
402+
OUTCOME_TRY(publish_result,
403+
codec::cbor::decode<PublishStorageDeals::Result>(
404+
msg_state.receipt.return_value));
405+
deal->deal_id = publish_result.deals[index];
406+
return true;
407+
}
408+
409+
outcome::result<std::shared_ptr<CborStream>>
410+
StorageMarketClientImpl::getStream(const CID &proposal_cid) {
411+
std::lock_guard<std::mutex> lock(connections_mutex_);
412+
auto stream_it = connections_.find(proposal_cid);
413+
if (stream_it == connections_.end()) {
414+
return StorageMarketClientError::STREAM_LOOKUP_ERROR;
415+
}
416+
return stream_it->second;
417+
}
418+
419+
void StorageMarketClientImpl::finalizeDeal(std::shared_ptr<ClientDeal> deal) {
420+
std::lock_guard<std::mutex> lock(connections_mutex_);
421+
auto stream_it = connections_.find(deal->proposal_cid);
422+
if (stream_it != connections_.end()) {
423+
network_->closeStreamGracefully(stream_it->second);
424+
}
425+
connections_.erase(stream_it);
426+
}
427+
350428
std::vector<ClientTransition> StorageMarketClientImpl::makeFSMTransitions() {
351429
return {ClientTransition(ClientEvent::ClientEventOpen)
352430
.from(StorageDealStatus::STORAGE_DEAL_UNKNOWN)
@@ -414,36 +492,49 @@ namespace fc::markets::storage::client {
414492
ClientEvent event,
415493
StorageDealStatus from,
416494
StorageDealStatus to) {
417-
// TODO wait for funding
495+
auto maybe_wait = api_->StateWaitMsg(deal->add_funds_cid.get());
496+
FSM_HALT_ON_ERROR(maybe_wait, "Wait for funding error", deal);
497+
maybe_wait.value().wait(
498+
[self{shared_from_this()}, deal](outcome::result<MsgWait> result) {
499+
SELF_FSM_HALT_ON_ERROR(result, "Wait for funding error", deal);
500+
if (result.value().receipt.exit_code != VMExitCode::Ok) {
501+
deal->message = "Funding exit code "
502+
+ std::to_string(static_cast<uint64_t>(
503+
result.value().receipt.exit_code));
504+
SELF_FSM_SEND(deal, ClientEvent::ClientEventFailed);
505+
return;
506+
}
507+
SELF_FSM_SEND(deal, ClientEvent::ClientEventFundsEnsured);
508+
});
418509
}
419510

420511
void StorageMarketClientImpl::onClientEventFundsEnsured(
421512
std::shared_ptr<ClientDeal> deal,
422513
ClientEvent event,
423514
StorageDealStatus from,
424515
StorageDealStatus to) {
425-
// TODO handle if stream is absent
426-
auto stream = connections_[deal->proposal_cid];
516+
auto stream = getStream(deal->proposal_cid);
517+
FSM_HALT_ON_ERROR(stream, "Stream not found.", deal);
427518

428519
Proposal proposal{.deal_proposal = deal->client_deal_proposal,
429520
.piece = deal->data_ref};
430-
stream->write(proposal,
431-
[self{shared_from_this()}, deal, stream](
432-
outcome::result<size_t> written) {
433-
SELF_FSM_HALT_ON_ERROR(
434-
written, "Send proposal error", deal);
435-
self->network_->closeStreamGracefully(stream);
436-
SELF_FSM_SEND(deal, ClientEvent::ClientEventDealProposed);
437-
});
521+
stream.value()->write(
522+
proposal,
523+
[self{shared_from_this()}, deal, stream](
524+
outcome::result<size_t> written) {
525+
SELF_FSM_HALT_ON_ERROR(written, "Send proposal error", deal);
526+
SELF_FSM_SEND(deal, ClientEvent::ClientEventDealProposed);
527+
});
438528
}
439529

440530
void StorageMarketClientImpl::onClientEventDealProposed(
441531
std::shared_ptr<ClientDeal> deal,
442532
ClientEvent event,
443533
StorageDealStatus from,
444534
StorageDealStatus to) {
445-
// TODO handle if stream is absent
446-
auto stream = connections_[deal->proposal_cid];
535+
auto stream_res = getStream(deal->proposal_cid);
536+
FSM_HALT_ON_ERROR(stream_res, "Stream not found.", deal);
537+
auto stream = std::move(stream_res.value());
447538
stream->read<SignedResponse>([self{shared_from_this()}, deal, stream](
448539
outcome::result<SignedResponse> response) {
449540
SELF_FSM_HALT_ON_ERROR(response, "Read response error", deal);
@@ -457,11 +548,12 @@ namespace fc::markets::storage::client {
457548
return;
458549
}
459550
if (response.value().response.state
460-
!= StorageDealStatus::STORAGE_DEAL_PROPOSAL_ACCEPTED) {
461-
// TODO handle reject reason
551+
!= StorageDealStatus::STORAGE_DEAL_PUBLISHING) {
552+
deal->message = response.value().response.message;
462553
SELF_FSM_SEND(deal, ClientEvent::ClientEventDealRejected);
463554
return;
464555
}
556+
deal->publish_message = response.value().response.publish_message;
465557
self->network_->closeStreamGracefully(stream);
466558
SELF_FSM_SEND(deal, ClientEvent::ClientEventDealAccepted);
467559
});
@@ -481,16 +573,21 @@ namespace fc::markets::storage::client {
481573
ClientEvent event,
482574
StorageDealStatus from,
483575
StorageDealStatus to) {
484-
// todo validate deal published
485-
OUTCOME_EXCEPT(fsm_->send(deal, ClientEvent::ClientEventDealPublished));
576+
auto verified = verifyDealPublished(deal);
577+
FSM_HALT_ON_ERROR(verified, "Cannot get publish message", deal);
578+
if (!verified.value()) {
579+
FSM_SEND(deal, ClientEvent::ClientEventFailed);
580+
return;
581+
}
582+
FSM_SEND(deal, ClientEvent::ClientEventDealPublished);
486583
}
487584

488585
void StorageMarketClientImpl::onClientEventDealPublished(
489586
std::shared_ptr<ClientDeal> deal,
490587
ClientEvent event,
491588
StorageDealStatus from,
492589
StorageDealStatus to) {
493-
// verify deal activated - on deal sector commit
590+
// TODO (a.chernyshov) verify deal activated - on deal sector commit
494591
OUTCOME_EXCEPT(fsm_->send(deal, ClientEvent::ClientEventDealActivated));
495592
}
496593

@@ -499,21 +596,23 @@ namespace fc::markets::storage::client {
499596
ClientEvent event,
500597
StorageDealStatus from,
501598
StorageDealStatus to) {
502-
// final state
503-
// todo cleanup
599+
// final success state
600+
finalizeDeal(deal);
504601
}
505602

506603
void StorageMarketClientImpl::onClientEventFailed(
507604
std::shared_ptr<ClientDeal> deal,
508605
ClientEvent event,
509606
StorageDealStatus from,
510607
StorageDealStatus to) {
608+
// final error state
511609
std::stringstream ss;
512610
ss << "Proposal ";
513611
auto maybe_prpoposal_cid = deal->proposal_cid.toString();
514612
if (maybe_prpoposal_cid) ss << maybe_prpoposal_cid.value() << " ";
515613
ss << "failed. " << deal->message;
516614
logger_->error(ss.str());
615+
finalizeDeal(deal);
517616
}
518617

519618
} // namespace fc::markets::storage::client
@@ -537,6 +636,8 @@ OUTCOME_CPP_DEFINE_CATEGORY(fc::markets::storage::client,
537636
return "StorageMarketClientError: add funds method call returned error";
538637
case StorageMarketClientError::LOCAL_DEAL_NOT_FOUND:
539638
return "StorageMarketClientError: local deal not found";
639+
case StorageMarketClientError::STREAM_LOOKUP_ERROR:
640+
return "StorageMarketClientError: stream look up error";
540641
}
541642

542643
return "StorageMarketClientError: unknown error";

core/markets/storage/client/impl/storage_market_client_impl.hpp

Lines changed: 27 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
#define CPP_FILECOIN_CORE_MARKETS_STORAGE_CLIENT_IMPL_HPP
88

99
#include <libp2p/host/host.hpp>
10+
#include <mutex>
1011
#include "api/api.hpp"
1112
#include "common/logger.hpp"
1213
#include "data_transfer/manager.hpp"
@@ -110,6 +111,27 @@ namespace fc::markets::storage::client {
110111
const SignedResponse &response,
111112
const std::shared_ptr<ClientDeal> &deal);
112113

114+
/**
115+
* Verifies if deal was published correctly
116+
* @param deal state with publish message cid set
117+
* @return true if published or false otherwise
118+
*/
119+
outcome::result<bool> verifyDealPublished(std::shared_ptr<ClientDeal> deal);
120+
121+
/**
122+
* Look up stream by proposal cid
123+
* @param proposal_cid - key to find stream
124+
* @return stream associated with proposal
125+
*/
126+
outcome::result<std::shared_ptr<CborStream>> getStream(
127+
const CID &proposal_cid);
128+
129+
/**
130+
* Finalize deal, close connection, clean up
131+
* @param deal - deal to clean up
132+
*/
133+
void finalizeDeal(std::shared_ptr<ClientDeal> deal);
134+
113135
/**
114136
* Creates all FSM transitions
115137
* @return vector of transitions for fsm
@@ -182,6 +204,8 @@ namespace fc::markets::storage::client {
182204

183205
/**
184206
* @brief Handle deal accepted
207+
* Validates that the provided deal has appeared on chain and references the
208+
* same ClientDeal
185209
* @param deal - current storage deal
186210
* @param event - ClientEventDealAccepted
187211
* @param from - STORAGE_DEAL_VALIDATING
@@ -262,6 +286,7 @@ namespace fc::markets::storage::client {
262286
std::shared_ptr<Discovery> discovery_;
263287

264288
// connection manager
289+
std::mutex connections_mutex_;
265290
std::map<CID, std::shared_ptr<CborStream>> connections_;
266291

267292
/** State machine */
@@ -279,7 +304,8 @@ namespace fc::markets::storage::client {
279304
PIECE_DATA_NOT_SET_MANUAL_TRANSFER,
280305
PIECE_SIZE_GREATER_SECTOR_SIZE,
281306
ADD_FUNDS_CALL_ERROR,
282-
LOCAL_DEAL_NOT_FOUND
307+
LOCAL_DEAL_NOT_FOUND,
308+
STREAM_LOOKUP_ERROR,
283309
};
284310

285311
} // namespace fc::markets::storage::client

core/markets/storage/common.hpp

Lines changed: 2 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -6,17 +6,8 @@
66
#ifndef CPP_FILECOIN_MARKETS_STORAGE_COMMON_HPP
77
#define CPP_FILECOIN_MARKETS_STORAGE_COMMON_HPP
88

9-
#include "primitives/types.hpp"
9+
#define FSM_SEND(deal, event) OUTCOME_EXCEPT(fsm_->send(deal, event))
1010

11-
namespace fc::markets::storage {
12-
13-
using primitives::TokenAmount;
14-
15-
struct Balance {
16-
TokenAmount locked;
17-
TokenAmount available;
18-
};
19-
20-
} // namespace fc::markets::storage
11+
#define SELF_FSM_SEND(deal, event) OUTCOME_EXCEPT(self->fsm_->send(deal, event))
2112

2213
#endif // CPP_FILECOIN_MARKETS_STORAGE_COMMON_HPP

0 commit comments

Comments
 (0)