Skip to content

Commit 9f37400

Browse files
authored
Fixes for miner (#599)
* Miner Fixes Signed-off-by: ortyomka <[email protected]>
1 parent ad026fd commit 9f37400

File tree

20 files changed

+267
-185
lines changed

20 files changed

+267
-185
lines changed

cmake/dependencies.cmake

+3-3
Original file line numberDiff line numberDiff line change
@@ -5,9 +5,6 @@
55
hunter_add_package(GTest)
66
find_package(GTest CONFIG REQUIRED)
77

8-
hunter_add_package(libarchive)
9-
find_package(libarchive CONFIG REQUIRED)
10-
118
# https://docs.hunter.sh/en/latest/packages/pkg/Boost.html
129
hunter_add_package(Boost COMPONENTS date_time filesystem iostreams random program_options thread)
1310
find_package(Boost CONFIG REQUIRED date_time filesystem iostreams random program_options thread)
@@ -72,5 +69,8 @@ find_package(RapidJSON CONFIG REQUIRED)
7269
hunter_add_package(jwt-cpp)
7370
find_package(jwt-cpp CONFIG REQUIRED)
7471

72+
hunter_add_package(libarchive)
73+
find_package(libarchive CONFIG REQUIRED)
74+
7575
hunter_add_package(prometheus-cpp)
7676
find_package(prometheus-cpp CONFIG REQUIRED)

core/common/put_in_function.hpp

+25
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
/**
2+
* Copyright Soramitsu Co., Ltd. All Rights Reserved.
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
namespace fc::common {
7+
8+
/**
9+
* Class to put move-only lambdas in std::function interface
10+
*/
11+
template <typename T>
12+
class PutInFunction {
13+
public:
14+
explicit PutInFunction(T &&function)
15+
: move_function_{std::make_shared<T>(std::move(function))} {};
16+
17+
template <typename... Args>
18+
auto operator()(Args &&...args) const {
19+
return move_function_->operator()(std::forward<Args>(args)...);
20+
}
21+
22+
private:
23+
std::shared_ptr<T> move_function_;
24+
};
25+
} // namespace fc::common

core/common/vector_cow.hpp

+69
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
/**
2+
* Copyright Soramitsu Co., Ltd. All Rights Reserved.
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
#pragma once
7+
8+
#include <gsl/span>
9+
#include <variant>
10+
11+
namespace fc {
12+
13+
template <typename T>
14+
class VectorCoW {
15+
public:
16+
using VectorType = std::vector<T>;
17+
using SpanType = gsl::span<const T>;
18+
19+
VectorCoW() = default;
20+
// NOLINTNEXTLINE(google-explicit-constructor)
21+
VectorCoW(VectorType &&vector) : variant_{std::move(vector)} {}
22+
VectorCoW(const VectorType &vector) = delete;
23+
// NOLINTNEXTLINE(google-explicit-constructor)
24+
VectorCoW(const SpanType &span) : variant_{span} {}
25+
VectorCoW(const VectorCoW<T> &) = delete;
26+
VectorCoW(VectorCoW<T> &&) = default;
27+
VectorCoW<T> &operator=(const VectorCoW<T> &) = delete;
28+
VectorCoW<T> &operator=(VectorCoW<T> &&) = default;
29+
~VectorCoW() = default;
30+
31+
bool owned() const {
32+
return variant_.index() == 1;
33+
}
34+
35+
SpanType span() const {
36+
if (!owned()) {
37+
return std::get<SpanType>(variant_);
38+
}
39+
return SpanType{std::get<VectorType>(variant_)};
40+
}
41+
42+
size_t size() const {
43+
return span().size();
44+
}
45+
46+
bool empty() const {
47+
return span().empty();
48+
}
49+
50+
// get mutable container reference, copy once if span
51+
VectorType &mut() {
52+
if (!owned()) {
53+
const auto span = std::get<SpanType>(variant_);
54+
variant_.template emplace<VectorType>(span.begin(), span.end());
55+
}
56+
return std::get<VectorType>(variant_);
57+
}
58+
59+
// move container away, copy once if span
60+
VectorType into() {
61+
auto vector{std::move(mut())};
62+
variant_.template emplace<SpanType>();
63+
return vector;
64+
}
65+
66+
private:
67+
std::variant<SpanType, VectorType> variant_;
68+
};
69+
} // namespace fc

core/markets/storage/provider/impl/provider_impl.cpp

+12-4
Original file line numberDiff line numberDiff line change
@@ -681,10 +681,7 @@ namespace fc::markets::storage::provider {
681681
deal_context->deal->is_fast_retrieval});
682682
FSM_HALT_ON_ERROR(
683683
maybe_piece_location, "Unable to locate piece", deal_context);
684-
FSM_HALT_ON_ERROR(
685-
recordPieceInfo(deal_context->deal, maybe_piece_location.value()),
686-
"Record piece failed",
687-
deal_context);
684+
deal_context->maybe_piece_location = maybe_piece_location.value();
688685
// TODO(a.chernyshov): add piece retry
689686
FSM_SEND(deal_context, ProviderEvent::ProviderEventDealHandedOff);
690687
}
@@ -700,6 +697,17 @@ namespace fc::markets::storage::provider {
700697
}
701698

702699
FSM_HANDLE_DEFINITION(StorageProviderImpl::onProviderEventDealActivated) {
700+
if (not deal_context->maybe_piece_location.has_value()) {
701+
(deal_context)->deal->message = "Unknown piece location";
702+
FSM_SEND((deal_context), ProviderEvent::ProviderEventFailed);
703+
return;
704+
}
705+
706+
FSM_HALT_ON_ERROR(
707+
recordPieceInfo(deal_context->deal,
708+
deal_context->maybe_piece_location.value()),
709+
"Record piece failed",
710+
deal_context);
703711
// TODO(a.chernyshov): wait expiration
704712
}
705713

core/markets/storage/provider/impl/provider_impl.hpp

+1-1
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,7 @@ namespace fc::markets::storage::provider {
9090
struct DealContext {
9191
std::shared_ptr<MinerDeal> deal;
9292
std::string protocol;
93+
boost::optional<PieceLocation> maybe_piece_location{};
9394
};
9495

9596
using ProviderTransition =
@@ -116,7 +117,6 @@ namespace fc::markets::storage::provider {
116117
if (request) {
117118
if (auto asker{stored_ask.lock()}) {
118119
if (auto ask{asker->getAsk(request.value().miner)}) {
119-
120120
return stream->write(AskResponseType{{ask.value()}},
121121
[stream](auto) { stream->close(); });
122122
}

core/miner/storage_fsm/impl/sealing_impl.cpp

+11-8
Original file line numberDiff line numberDiff line change
@@ -251,12 +251,13 @@ namespace fc::mining {
251251
auto sector_ref = minerSector(seal_proof_type, piece_location.sector);
252252

253253
if (sector_and_padding.padding.size != 0) {
254-
OUTCOME_TRY(
255-
sealer_->addPieceSync(sector_ref,
256-
unsealed_sector.piece_sizes,
257-
sector_and_padding.padding.size.unpadded(),
258-
PieceData::makeNull(),
259-
kDealSectorPriority));
254+
OUTCOME_TRY(sealer_->addPieceSync(
255+
sector_ref,
256+
VectorCoW(gsl::span<const UnpaddedPieceSize>(
257+
unsealed_sector.piece_sizes)),
258+
sector_and_padding.padding.size.unpadded(),
259+
PieceData::makeNull(),
260+
kDealSectorPriority));
260261

261262
unsealed_sector.stored += sector_and_padding.padding.size;
262263

@@ -283,7 +284,9 @@ namespace fc::mining {
283284
logger_->info("Add piece to sector {}", piece_location.sector);
284285
OUTCOME_TRY(piece_info,
285286
sealer_->addPieceSync(sector_ref,
286-
unsealed_sector.piece_sizes,
287+
VectorCoW(
288+
gsl::span<const UnpaddedPieceSize>(
289+
unsealed_sector.piece_sizes)),
287290
size,
288291
std::move(piece_data),
289292
kDealSectorPriority));
@@ -972,7 +975,7 @@ namespace fc::mining {
972975

973976
sealer_->addPiece(
974977
sector,
975-
existing_piece_sizes,
978+
VectorCoW(std::move(existing_piece_sizes)),
976979
filler,
977980
PieceData::makeNull(),
978981
[fill = std::move(result), cb](const auto &maybe_error) -> void {

core/sector_storage/impl/local_worker.cpp

+24-20
Original file line numberDiff line numberDiff line change
@@ -18,11 +18,13 @@
1818
#include <thread>
1919
#include <utility>
2020
#include "api/storage_miner/storage_api.hpp"
21+
#include "common/put_in_function.hpp"
2122
#include "primitives/rle_bitset/runs_utils.hpp"
2223
#include "primitives/sector_file/sector_file.hpp"
2324
#include "sector_storage/stores/store_error.hpp"
2425

2526
namespace fc::sector_storage {
27+
using common::PutInFunction;
2628
using primitives::piece::PaddedByteIndex;
2729
using primitives::piece::PaddedPieceSize;
2830
using primitives::sector_file::SectorFile;
@@ -270,20 +272,22 @@ namespace fc::sector_storage {
270272

271273
outcome::result<CallId> LocalWorker::addPiece(
272274
const SectorRef &sector,
273-
gsl::span<const UnpaddedPieceSize> piece_sizes,
275+
VectorCoW<UnpaddedPieceSize> piece_sizes,
274276
const UnpaddedPieceSize &new_piece_size,
275277
proofs::PieceData piece_data) {
276278
return asyncCall(
277279
sector,
278280
return_->ReturnAddPiece,
279-
[=, piece_data{std::make_shared<PieceData>(std::move(piece_data))}](
280-
Self self) -> outcome::result<PieceInfo> {
281+
PutInFunction([=,
282+
exist_sizes = std::move(piece_sizes),
283+
data = std::move(piece_data)](
284+
const Self &self) -> outcome::result<PieceInfo> {
281285
OUTCOME_TRY(max_size,
282286
primitives::sector::getSectorSize(sector.proof_type));
283287

284288
UnpaddedPieceSize offset;
285289

286-
for (const auto &piece_size : piece_sizes) {
290+
for (const auto &piece_size : exist_sizes.span()) {
287291
offset += piece_size;
288292
}
289293

@@ -299,7 +303,7 @@ namespace fc::sector_storage {
299303
}
300304
});
301305

302-
if (piece_sizes.empty()) {
306+
if (exist_sizes.empty()) {
303307
OUTCOME_TRYA(acquire_response,
304308
self->acquireSector(sector,
305309
SectorFileType::FTNone,
@@ -322,13 +326,13 @@ namespace fc::sector_storage {
322326
}
323327

324328
OUTCOME_TRY(piece_info,
325-
staged_file->write(*piece_data,
329+
staged_file->write(data,
326330
offset.padded(),
327331
new_piece_size.padded(),
328332
sector.proof_type));
329333

330334
return piece_info.value();
331-
});
335+
}));
332336
}
333337

334338
outcome::result<CallId> LocalWorker::sealPreCommit1(
@@ -461,11 +465,12 @@ namespace fc::sector_storage {
461465
}
462466

463467
outcome::result<CallId> LocalWorker::finalizeSector(
464-
const SectorRef &sector, const gsl::span<const Range> &keep_unsealed) {
468+
const SectorRef &sector, std::vector<Range> keep_unsealed) {
465469
return asyncCall(
466470
sector,
467471
return_->ReturnFinalizeSector,
468-
[=](Self self) -> outcome::result<void> {
472+
[=, keep_unsealed{std::move(keep_unsealed)}](
473+
Self self) -> outcome::result<void> {
469474
OUTCOME_TRY(size,
470475
primitives::sector::getSectorSize(sector.proof_type));
471476
{
@@ -796,17 +801,16 @@ namespace fc::sector_storage {
796801
}
797802
});
798803

799-
OUTCOME_TRY(self->proofs_->unsealRange(
800-
sector.proof_type,
801-
response.paths.cache,
802-
sealed,
803-
PieceData(fds[1]),
804-
sector.id.sector,
805-
sector.id.miner,
806-
randomness,
807-
unsealed_cid,
808-
primitives::piece::paddedIndex(range.offset),
809-
range.size));
804+
OUTCOME_TRY(self->proofs_->unsealRange(sector.proof_type,
805+
response.paths.cache,
806+
sealed,
807+
PieceData(fds[1]),
808+
sector.id.sector,
809+
sector.id.miner,
810+
randomness,
811+
unsealed_cid,
812+
range.offset,
813+
range.size));
810814
}
811815

812816
for (auto &t : threads) {

core/sector_storage/impl/local_worker.hpp

+5-7
Original file line numberDiff line numberDiff line change
@@ -29,11 +29,10 @@ namespace fc::sector_storage {
2929
std::shared_ptr<proofs::ProofEngine> proofs =
3030
std::make_shared<proofs::ProofEngineImpl>());
3131

32-
outcome::result<CallId> addPiece(
33-
const SectorRef &sector,
34-
gsl::span<const UnpaddedPieceSize> piece_sizes,
35-
const UnpaddedPieceSize &new_piece_size,
36-
PieceData piece_data) override;
32+
outcome::result<CallId> addPiece(const SectorRef &sector,
33+
VectorCoW<UnpaddedPieceSize> piece_sizes,
34+
const UnpaddedPieceSize &new_piece_size,
35+
PieceData piece_data) override;
3736

3837
outcome::result<CallId> sealPreCommit1(
3938
const SectorRef &sector,
@@ -54,8 +53,7 @@ namespace fc::sector_storage {
5453
const SectorRef &sector, const Commit1Output &commit_1_output) override;
5554

5655
outcome::result<CallId> finalizeSector(
57-
const SectorRef &sector,
58-
const gsl::span<const Range> &keep_unsealed) override;
56+
const SectorRef &sector, std::vector<Range> keep_unsealed) override;
5957

6058
outcome::result<CallId> replicaUpdate(
6159
const SectorRef &sector, const std::vector<PieceInfo> &pieces) override;

0 commit comments

Comments
 (0)