diff --git a/core/miner/address_selector.hpp b/core/miner/address_selector.hpp index 1addfbc2f9..0d0348c9a9 100644 --- a/core/miner/address_selector.hpp +++ b/core/miner/address_selector.hpp @@ -18,14 +18,15 @@ namespace fc::mining { /** * SelectAddress takes the maximal possible transaction fee from configs and - * chooses one of control addresses with minimal balance that is more than good - * funds to make miner work as long as possible. If no suitble control address - * were found, function returns worker address. + * chooses one of control addresses with minimal balance that is more than + * good funds to make miner work as long as possible. If no suitble control + * address were found, function returns worker address. */ inline outcome::result
SelectAddress( const MinerInfo &miner_info, const TokenAmount &good_funds, - const std::shared_ptr &api) { + const std::shared_ptr + &api) { // TODO update from lotus (Markuuu-s) TokenAmount finder_balance; auto finder = miner_info.control.end(); for (auto address = miner_info.control.begin(); diff --git a/core/miner/storage_fsm/CMakeLists.txt b/core/miner/storage_fsm/CMakeLists.txt index fd8042d0b8..ac7375eb66 100644 --- a/core/miner/storage_fsm/CMakeLists.txt +++ b/core/miner/storage_fsm/CMakeLists.txt @@ -38,6 +38,7 @@ target_link_libraries(events add_library(batcher impl/precommit_batcher_impl.cpp + impl/commit_batcher_impl.cpp ) target_link_libraries(batcher api diff --git a/core/miner/storage_fsm/commit_batcher.hpp b/core/miner/storage_fsm/commit_batcher.hpp new file mode 100644 index 0000000000..f56018d7cc --- /dev/null +++ b/core/miner/storage_fsm/commit_batcher.hpp @@ -0,0 +1,37 @@ +/** + * Copyright Soramitsu Co., Ltd. All Rights Reserved. + * SPDX-License-Identifier: Apache-2.0 + */ + +#pragma once +#include "api/full_node/node_api.hpp" +#include "miner/storage_fsm/types.hpp" +#include "primitives/sector/sector.hpp" + +namespace fc::mining { + using CommitCallback = std::function &)>; + using api::FullNodeApi; + using primitives::sector::AggregateSealVerifyInfo; + using primitives::sector::RegisteredSealProof; + using sector_storage::Proof; + using types::SectorInfo; + + struct AggregateInput { + Proof proof; + AggregateSealVerifyInfo info; + RegisteredSealProof spt; + }; + + class CommitBatcher { + public: + virtual ~CommitBatcher() = default; + + virtual outcome::result addCommit( + const SectorInfo §or_info, + const AggregateInput &aggregate_input, + const CommitCallback &callBack) = 0; + + virtual void forceSend() = 0; + }; + +} // namespace fc::mining diff --git a/core/miner/storage_fsm/impl/commit_batcher_impl.cpp b/core/miner/storage_fsm/impl/commit_batcher_impl.cpp new file mode 100644 index 0000000000..e0ea68d35a --- /dev/null +++ b/core/miner/storage_fsm/impl/commit_batcher_impl.cpp @@ -0,0 +1,246 @@ +/** + * Copyright Soramitsu Co., Ltd. All Rights Reserved. + * SPDX-License-Identifier: Apache-2.0 + */ + +#pragma once +#include "commit_batcher_impl.hpp" +#include +#include +#include "vm/actor/builtin/v5/miner/miner_actor.hpp" +#include "vm/actor/builtin/v6/monies.hpp" + +#include + +namespace fc::mining { + using api::kPushNoSpec; + using fc::BytesIn; + using fc::proofs::ProofEngine; + using primitives::ActorId; + using primitives::go::bigdiv; + using primitives::sector::AggregateSealVerifyInfo; + using primitives::sector::AggregateSealVerifyProofAndInfos; + using primitives::tipset::TipsetCPtr; + using vm::actor::MethodParams; + using vm::actor::builtin::types::miner::kChainFinality; + using vm::actor::builtin::types::miner::SectorPreCommitOnChainInfo; + using vm::actor::builtin::v5::miner::ProveCommitAggregate; + using vm::actor::builtin::v6::miner::AggregateProveCommitNetworkFee; + + CommitBatcherImpl::CommitBatcherImpl(const std::chrono::seconds &max_time, + std::shared_ptr api, + Address miner_address, + std::shared_ptr scheduler, + AddressSelector address_selector, + std::shared_ptr fee_config, + const size_t &max_size_callback, + std::shared_ptr proof) + : scheduler_(std::move(scheduler)), + max_delay_(max_time), + closest_cutoff_(max_time), + max_size_callback_(max_size_callback), + api_(std::move(api)), + miner_address_(std::move(miner_address)), + fee_config_(std::move(fee_config)), + proof_(std::move(proof)), + address_selector_(std::move(address_selector)) { + cutoff_start_ = std::chrono::system_clock::now(); + reschedule(max_delay_); + } + + outcome::result CommitBatcherImpl::addCommit( + const SectorInfo §or_info, + const AggregateInput &aggregate_input, + const CommitCallback &callback) { + std::unique_lock locker(mutex_storage_); + + const SectorNumber §or_number = sector_info.sector_number; + OUTCOME_TRY(head, api_->ChainHead()); + + pair_storage_[sector_number] = PairStorage{aggregate_input, callback}; + + if (pair_storage_.size() >= max_size_callback_) { + locker.unlock(); + forceSend(); + return outcome::success(); + } + + setCommitCutoff(head->epoch(), sector_info); + return outcome::success(); + } + + void CommitBatcherImpl::forceSend() { + MapPairStorage pair_storage_for_send_; + + std::unique_lock locker(mutex_storage_); + pair_storage_for_send_ = std::move(pair_storage_); + locker.unlock(); + + const auto maybe_result = sendBatch(pair_storage_for_send_); + + for (auto &[key, pair_storage] : pair_storage_for_send_) { + pair_storage.commit_callback(maybe_result); + } + + cutoff_start_ = std::chrono::system_clock::now(); + closest_cutoff_ = max_delay_; + reschedule(max_delay_); + } + + void CommitBatcherImpl::reschedule(std::chrono::milliseconds time) { + handle_ = scheduler_->scheduleWithHandle( + [&]() { + MapPairStorage pair_storage_for_send_; + + std::unique_lock locker(mutex_storage_); + pair_storage_for_send_ = std::move(pair_storage_); + locker.unlock(); + + const auto maybe_result = sendBatch(pair_storage_for_send_); + + for (auto &[key, pair_storage] : pair_storage_for_send_) { + pair_storage.commit_callback(maybe_result); + } + + cutoff_start_ = std::chrono::system_clock::now(); + closest_cutoff_ = max_delay_; + + handle_.reschedule(max_delay_).value(); + }, + time); + } + + outcome::result CommitBatcherImpl::sendBatch( + const MapPairStorage &pair_storage_for_send) { + if (pair_storage_for_send.empty()) { + cutoff_start_ = std::chrono::system_clock::now(); + return ERROR_TEXT("Empty Batcher"); + } + OUTCOME_TRY(head, api_->ChainHead()); + + const size_t total = pair_storage_for_send.size(); + + ProveCommitAggregate::Params params; + + std::vector proofs; + proofs.reserve(total); + + std::vector infos; + infos.reserve(total); + + BigInt collateral = 0; + + for (const auto &[sector_number, pair_storage] : pair_storage_for_send) { + OUTCOME_TRY(sector_collateral, + getSectorCollateral(sector_number, head->key)); + collateral = collateral + sector_collateral; + + params.sectors.insert(sector_number); + infos.push_back(pair_storage.aggregate_input.info); + } + + for (const auto &[sector_number, pair_storage] : pair_storage_for_send) { + proofs.push_back(pair_storage.aggregate_input.proof); + } + + const ActorId mid = miner_address_.getId(); + // TODO maybe long (AggregateSealProofs) + AggregateSealVerifyProofAndInfos aggregate_seal{ + .miner = mid, + .seal_proof = + pair_storage_for_send.at(infos[0].number).aggregate_input.spt, + .aggregate_proof = arp_, + .infos = infos}; + + std::vector proofsSpan; + proofsSpan.reserve(proofs.size()); + + for (const Proof &proof : proofs) { + proofsSpan.push_back(gsl::make_span(proof)); + } + + OUTCOME_TRY(proof_->aggregateSealProofs(aggregate_seal, proofsSpan)); + + params.proof = aggregate_seal.proof; + OUTCOME_TRY(encode, codec::cbor::encode(params)); + OUTCOME_TRY(miner_info, api_->StateMinerInfo(miner_address_, head->key)); + + const TokenAmount max_fee = + fee_config_->max_commit_batch_gas_fee.FeeForSector(proofs.size()); + + OUTCOME_TRY(tipset, api_->ChainGetTipSet(head->key)); + const BigInt base_fee = tipset->blks[0].parent_base_fee; + + TokenAmount agg_fee_raw = AggregateProveCommitNetworkFee( + infos.size(), base_fee); // TODO change to aggregateNetworkFee + + TokenAmount agg_fee = bigdiv(agg_fee_raw * agg_fee_num_, agg_fee_den_); + TokenAmount need_funds = collateral + agg_fee; + TokenAmount good_funds = max_fee + need_funds; + + OUTCOME_TRY(address, + address_selector_(miner_info, good_funds, need_funds, api_)); + OUTCOME_TRY(signed_messege, + api_->MpoolPushMessage( + vm::message::UnsignedMessage(miner_address_, + address, + 0, + need_funds, + max_fee, + {}, + ProveCommitAggregate::Number, + MethodParams{encode}), + kPushNoSpec)); + + cutoff_start_ = std::chrono::system_clock::now(); + return signed_messege.getCid(); + } + + void CommitBatcherImpl::setCommitCutoff(const ChainEpoch ¤t_epoch, + const SectorInfo §or_info) { + ChainEpoch cutoff_epoch = + sector_info.ticket_epoch + + static_cast(kEpochsInDay + kChainFinality); + ChainEpoch start_epoch{}; + for (const auto &piece : sector_info.pieces) { + if (!piece.deal_info) { + continue; + } + start_epoch = piece.deal_info->deal_schedule.start_epoch; + if (start_epoch < cutoff_epoch) { + cutoff_epoch = start_epoch; + } + } + if (cutoff_epoch <= current_epoch) { + forceSend(); + } else { + const auto temp_cutoff = std::chrono::milliseconds( + (cutoff_epoch - current_epoch) * kEpochDurationSeconds); + if ((closest_cutoff_ + - std::chrono::duration_cast( + std::chrono::system_clock::now() - cutoff_start_) + > temp_cutoff)) { + cutoff_start_ = std::chrono::system_clock::now(); + reschedule(temp_cutoff); + closest_cutoff_ = temp_cutoff; + } + } + } + + outcome::result CommitBatcherImpl::getSectorCollateral( + const SectorNumber §or_number, const TipsetKey &tip_set_key) { + OUTCOME_TRY(pci, + api_->StateSectorPreCommitInfo( + miner_address_, sector_number, tip_set_key)); + + OUTCOME_TRY(collateral, + api_->StateMinerInitialPledgeCollateral( + miner_address_, pci.info, tip_set_key)); + + collateral = collateral + pci.precommit_deposit; + collateral = std::max(BigInt(0), collateral); + + return collateral; + } + +} // namespace fc::mining diff --git a/core/miner/storage_fsm/impl/commit_batcher_impl.hpp b/core/miner/storage_fsm/impl/commit_batcher_impl.hpp new file mode 100644 index 0000000000..0593a1b4f8 --- /dev/null +++ b/core/miner/storage_fsm/impl/commit_batcher_impl.hpp @@ -0,0 +1,83 @@ +/** + * Copyright Soramitsu Co., Ltd. All Rights Reserved. + * SPDX-License-Identifier: Apache-2.0 + */ + +#pragma once +#include +#include "miner/storage_fsm/commit_batcher.hpp" + +namespace fc::mining { + using api::MinerInfo; + using libp2p::basic::Scheduler; + using primitives::BigInt; + using primitives::SectorNumber; + using primitives::address::Address; + using primitives::sector::RegisteredAggregationProof; + using primitives::tipset::Tipset; + using primitives::tipset::TipsetKey; + using proofs::ProofEngine; + using types::FeeConfig; + + using AddressSelector = std::function( + const MinerInfo &miner_info, + const TokenAmount &good_funds, + const TokenAmount &need_funds, + const std::shared_ptr &api)>; + + class CommitBatcherImpl : public CommitBatcher { + public: + struct PairStorage { + AggregateInput aggregate_input; + CommitCallback commit_callback; + }; + using MapPairStorage = std::map; + + CommitBatcherImpl(const std::chrono::seconds &max_time, + std::shared_ptr api, + Address miner_address, + std::shared_ptr scheduler, + AddressSelector address_selector, + std::shared_ptr fee_config, + const size_t &max_size_callback, + std::shared_ptr proof); + + outcome::result addCommit(const SectorInfo §or_info, + const AggregateInput &aggregate_input, + const CommitCallback &callBack) override; + + void forceSend() override; + + void setCommitCutoff(const ChainEpoch ¤t_epoch, + const SectorInfo §or_info); + + private: + std::shared_ptr scheduler_; + Scheduler::Handle handle_; + std::chrono::milliseconds max_delay_; + std::chrono::milliseconds closest_cutoff_; + std::chrono::system_clock::time_point cutoff_start_; + size_t max_size_callback_; + MapPairStorage pair_storage_; + std::shared_ptr api_; + Address miner_address_; + std::shared_ptr fee_config_; + std::shared_ptr proof_; + std::mutex mutex_storage_; + AddressSelector address_selector_; + common::Logger logger_; + + const BigInt agg_fee_num_ = BigInt(110); + const BigInt agg_fee_den_ = BigInt(100); + const RegisteredAggregationProof arp_ = RegisteredAggregationProof(0); + + // TODO (Markuu-s) Add processIndividually + void reschedule(std::chrono::milliseconds time); + + outcome::result sendBatch(const MapPairStorage &pair_storage_for_send); + + outcome::result getSectorCollateral( + const SectorNumber §or_number, const TipsetKey &tip_set_key); + }; + +} // namespace fc::mining \ No newline at end of file diff --git a/core/miner/storage_fsm/types.hpp b/core/miner/storage_fsm/types.hpp index f10ee8dc04..bfa07675e5 100644 --- a/core/miner/storage_fsm/types.hpp +++ b/core/miner/storage_fsm/types.hpp @@ -64,9 +64,12 @@ namespace fc::mining::types { is_keep_unsealed) inline bool operator==(const DealInfo &lhs, const DealInfo &rhs) { - return lhs.publish_cid == rhs.publish_cid && lhs.deal_id == rhs.deal_id - && lhs.deal_schedule == rhs.deal_schedule - && lhs.is_keep_unsealed == rhs.is_keep_unsealed; + if (lhs.is_keep_unsealed == rhs.is_keep_unsealed) { + return lhs.publish_cid == rhs.publish_cid && lhs.deal_id == rhs.deal_id + && lhs.deal_schedule == rhs.deal_schedule; + } else { + return false; + } } struct Piece { @@ -229,6 +232,7 @@ namespace fc::mining::types { // maxBatchFee = maxBase + maxPerSector * nSectors BatchConfing max_precommit_batch_gas_fee; + BatchConfing max_commit_batch_gas_fee; // TODO (Ruslan Gilvanov): TokenAmount max_terminate_gas_fee, // max_window_poSt_gas_fee, max_publish_deals_fee, diff --git a/core/vm/actor/builtin/v6/monies.hpp b/core/vm/actor/builtin/v6/monies.hpp new file mode 100644 index 0000000000..513f04ab5b --- /dev/null +++ b/core/vm/actor/builtin/v6/monies.hpp @@ -0,0 +1,38 @@ +/** + * Copyright Soramitsu Co., Ltd. All Rights Reserved. + * SPDX-License-Identifier: Apache-2.0 + */ + +#pragma once +#include "const.hpp" + +namespace fc::vm::actor::builtin::v6::miner { + using primitives::BigInt; + + const BigInt kEstimatedSingleProveCommitGasUsage = 49299973; + const BigInt kEstimatedSinglePreCommitGasUsage = 16433324; + const BigInt kBatchDiscountNumerator = 1; + const BigInt kBatchDiscountDenominator = 20; + const BigInt kBatchBalancer = 5 * kOneNanoFil; + + inline TokenAmount AggregateProveCommitNetworkFee( + uint64_t aggregate_size, const TokenAmount &base_fee) { + const TokenAmount effectiveGasFee = std::max(base_fee, kBatchBalancer); + const TokenAmount networkFeeNum = + effectiveGasFee * kEstimatedSinglePreCommitGasUsage * aggregate_size + * kBatchDiscountNumerator; + return bigdiv(networkFeeNum, kBatchDiscountDenominator); + } + + inline TokenAmount aggregateNetworkFee(uint64_t aggregate_size, + const TokenAmount &gasUsage, + const TokenAmount &baseFee) { + TokenAmount effectiveGasFee = std::max(baseFee, kBatchBalancer); + TokenAmount networkFeeNum = effectiveGasFee * gasUsage + * BigInt(aggregate_size) + * kBatchDiscountNumerator; + TokenAmount networkFee = bigdiv(networkFeeNum, kBatchDiscountDenominator); + return networkFee; + } + +} // namespace fc::vm::actor::builtin::v6::miner \ No newline at end of file diff --git a/test/core/miner/CMakeLists.txt b/test/core/miner/CMakeLists.txt index 71b98efc9b..7d7f5b2c6a 100644 --- a/test/core/miner/CMakeLists.txt +++ b/test/core/miner/CMakeLists.txt @@ -12,6 +12,7 @@ target_link_libraries(events_test addtest(batcher_test precommit_batcher_test.cpp + commit_batcher_test.cpp ) target_link_libraries(batcher_test batcher diff --git a/test/core/miner/commit_batcher_test.cpp b/test/core/miner/commit_batcher_test.cpp new file mode 100644 index 0000000000..a1f089c32e --- /dev/null +++ b/test/core/miner/commit_batcher_test.cpp @@ -0,0 +1,258 @@ +/** + * Copyright Soramitsu Co., Ltd. All Rights Reserved. + * SPDX-License-Identifier: Apache-2.0 + */ + +#include +#include +#include +#include +#include "miner/address_selector.hpp" +#include "miner/storage_fsm/impl/commit_batcher_impl.hpp" +#include "testutil/default_print.hpp" +#include "testutil/literals.hpp" +#include "testutil/mocks/api.hpp" +#include "testutil/mocks/proofs/proof_engine_mock.hpp" +#include "testutil/outcome.hpp" +#include "vm/actor/actor_method.hpp" +#include "vm/actor/builtin/v5/miner/miner_actor.hpp" +#include "vm/actor/builtin/v6/monies.hpp" +#include "vm/message/message.hpp" + +namespace fc::mining { + using primitives::ActorId; + using testing::_; + using vm::actor::builtin::v5::miner::ProveCommitAggregate; + using vm::message::SignedMessage; + using vm::message::UnsignedMessage; + using PairStorage = CommitBatcherImpl::PairStorage; + using MapPairStorage = std::map; + using fc::mining::types::FeeConfig; + using fc::mining::types::PaddedPieceSize; + using fc::mining::types::Piece; + using fc::mining::types::PieceInfo; + using libp2p::basic::ManualSchedulerBackend; + using libp2p::basic::Scheduler; + using libp2p::basic::SchedulerImpl; + using mining::SelectAddress; + using proofs::ProofEngineMock; + using vm::actor::builtin::types::miner::SectorPreCommitOnChainInfo; + using BatcherCallbackMock = + std::function &cid)>; + + MATCHER_P(methodMatcher, method, "compare msg method name") { + return (arg.method == method); + } + + class CommitBatcherTest : public testing::Test { + protected: + void SetUp() override { + scheduler_backend_ = std::make_shared(); + scheduler_ = std::make_shared(scheduler_backend_, + Scheduler::Config{}); + + miner_id_ = 1; + miner_address_ = Address::makeFromId(miner_id_); + side_address_ = Address::makeFromId(++miner_id_); + + api::BlockHeader block; + block.height = 2; + + tipset_ = std::make_shared( + TipsetKey(), std::vector({block})); + + EXPECT_CALL(mock_ChainHead, Call()) + .WillRepeatedly(testing::Return(tipset_)); + + fee_config_ = std::make_shared(); + fee_config_->max_precommit_batch_gas_fee.base = + TokenAmount{"50000000000000000"}; + fee_config_->max_precommit_batch_gas_fee.per_sector = + TokenAmount{"250000000000000"}; + + EXPECT_CALL(mock_StateMinerInitialPledgeCollateral, + Call(miner_address_, _, _)) + .WillRepeatedly(testing::Return(TokenAmount(100))); + + EXPECT_CALL(mock_StateSectorPreCommitInfo, Call(miner_address_, _, _)) + .WillRepeatedly(testing::Return(SectorPreCommitOnChainInfo())); + proof_ = std::make_shared(); + + EXPECT_CALL(mock_StateMinerInfo, Call(_, _)) + .WillRepeatedly(testing::Return(MinerInfo())); + + EXPECT_CALL(mock_ChainGetTipSet, Call(_)) + .WillRepeatedly(testing::Return(tipset_)); + + callback_mock_ = [](const outcome::result &cid) -> void { + EXPECT_TRUE(cid.has_value()); + }; + } + + std::shared_ptr api_ = std::make_shared(); + std::shared_ptr scheduler_; + MapPairStorage pair_storage_; + std::shared_ptr tipset_; + Address miner_address_; + Address side_address_; + Address wrong_side_address_; + ActorId miner_id_; + std::shared_ptr proof_; + std::shared_ptr fee_config_; + BatcherCallbackMock callback_mock_; + std::shared_ptr scheduler_backend_; + + MOCK_API(api_, ChainHead); + MOCK_API(api_, MpoolPushMessage); + MOCK_API(api_, ChainGetTipSet); + MOCK_API(api_, StateMinerInfo); + MOCK_API(api_, StateMinerInitialPledgeCollateral); + MOCK_API(api_, StateSectorPreCommitInfo); + }; + + /** + * @given 2 commits and max_size_callback is 2 + * @when send the 2 commits + * @then the result should be 2 messages in message pool with pair of commits + */ + TEST_F(CommitBatcherTest, SendAfterMaxSize) { + EXPECT_CALL(mock_MpoolPushMessage, + Call(methodMatcher(ProveCommitAggregate::Number), _)) + .WillOnce( + testing::Invoke([&](const UnsignedMessage &msg, + auto &) -> outcome::result { + return SignedMessage{.message = msg, .signature = BlsSignature()}; + })); + + EXPECT_CALL(*proof_, aggregateSealProofs(_, _)) + .WillOnce(testing::Return(outcome::success())); + + std::shared_ptr batcher = + std::make_shared( + std::chrono::seconds(9999), + api_, + miner_address_, + scheduler_, + [=](const MinerInfo &miner_info, + const TokenAmount &good_funds, + const TokenAmount &need_funds, + const std::shared_ptr &api) + -> outcome::result
{ + return SelectAddress(miner_info, good_funds, api); + }, + fee_config_, + 2, + proof_); + + SectorInfo sector_info0 = SectorInfo(); + sector_info0.ticket_epoch = 5; + Piece piece0{.piece = PieceInfo{.size = PaddedPieceSize(128), + .cid = "010001020008"_cid}, + .deal_info = boost::none}; + sector_info0.pieces.push_back(piece0); + sector_info0.sector_number = 777; + + SectorInfo sector_info1 = SectorInfo(); + sector_info0.ticket_epoch = 5; + Piece piece1{.piece = PieceInfo{.size = PaddedPieceSize(128), + .cid = "010001020009"_cid}, + .deal_info = boost::none}; + sector_info1.pieces.push_back(piece1); + sector_info1.sector_number = 888; + + EXPECT_OUTCOME_TRUE_1(batcher->addCommit( + sector_info0, AggregateInput{.info = 777}, callback_mock_)); + + EXPECT_OUTCOME_TRUE_1(batcher->addCommit( + sector_info1, AggregateInput{.info = 888}, callback_mock_)); + } + + /** + * @given a 1 commit + * @when send a 1 commit + * @then result should be 0 messages in message pool + */ + TEST_F(CommitBatcherTest, BatcherWrite) { + std::shared_ptr batcher = + std::make_shared( + std::chrono::seconds(9999), + api_, + miner_address_, + scheduler_, + [=](const MinerInfo &miner_info, + const TokenAmount &good_funds, + const TokenAmount &need_funds, + const std::shared_ptr &api) + -> outcome::result
{ + return SelectAddress(miner_info, good_funds, api); + }, + fee_config_, + 999, + proof_); + + EXPECT_OUTCOME_TRUE_1(batcher->addCommit( + SectorInfo(), + AggregateInput(), + [](const outcome::result &cid) -> outcome::result { + return outcome::success(); + })); + } + + /** + * @given 4 commits + * @when send the first pair and after the rescheduling + * next pair + * @then The result should be 2 messages in message pool with pair of + * commits in each + */ + TEST_F(CommitBatcherTest, CallbackSend) { + std::shared_ptr batcher = + std::make_shared( + std::chrono::seconds(999), + api_, + miner_address_, + scheduler_, + [=](const MinerInfo &miner_info, + const TokenAmount &good_funds, + const TokenAmount &need_funds, + const std::shared_ptr &api) + -> outcome::result
{ + return SelectAddress(miner_info, good_funds, api); + }, + fee_config_, + 999, + proof_); + + EXPECT_CALL(*proof_, aggregateSealProofs(_, _)) + .WillRepeatedly(testing::Return(outcome::success())); + + EXPECT_CALL(mock_MpoolPushMessage, + Call(methodMatcher(ProveCommitAggregate::Number), _)) + .Times(2) + .WillRepeatedly( + testing::Invoke([&](const UnsignedMessage &msg, + auto &) -> outcome::result { + return SignedMessage{.message = msg, .signature = BlsSignature()}; + })); + + SectorInfo sector_info = SectorInfo(); + + sector_info.sector_number = 2; + + EXPECT_OUTCOME_TRUE_1(batcher->addCommit( + sector_info, AggregateInput{.info = 2}, callback_mock_)); + + scheduler_backend_->shiftToTimer(); + sector_info.sector_number = 3; + + EXPECT_OUTCOME_TRUE_1(batcher->addCommit( + sector_info, AggregateInput{.info = 3}, callback_mock_)); + + sector_info.sector_number = 6; + + EXPECT_OUTCOME_TRUE_1(batcher->addCommit( + sector_info, AggregateInput{.info = 6}, callback_mock_)); + + scheduler_backend_->shiftToTimer(); + } +} // namespace fc::mining