Skip to content

Commit 8096192

Browse files
authored
Fix work id for local workers (#611)
Signed-off-by: ortyomka <[email protected]>
1 parent bb21c6e commit 8096192

File tree

8 files changed

+32
-8
lines changed

8 files changed

+32
-8
lines changed

core/sector_storage/impl/local_worker.cpp

+4
Original file line numberDiff line numberDiff line change
@@ -908,4 +908,8 @@ namespace fc::sector_storage {
908908

909909
return call_id;
910910
}
911+
912+
bool LocalWorker::isLocalWorker() const {
913+
return true;
914+
}
911915
} // namespace fc::sector_storage

core/sector_storage/impl/local_worker.hpp

+2
Original file line numberDiff line numberDiff line change
@@ -97,6 +97,8 @@ namespace fc::sector_storage {
9797
outcome::result<std::vector<primitives::StoragePath>> getAccessiblePaths()
9898
override;
9999

100+
bool isLocalWorker() const override;
101+
100102
private:
101103
template <typename W, typename R>
102104
outcome::result<CallId> asyncCall(const SectorRef &sector,

core/sector_storage/impl/remote_worker.cpp

+4
Original file line numberDiff line numberDiff line change
@@ -249,4 +249,8 @@ namespace fc::sector_storage {
249249
AcquireMode mode) {
250250
return api_.Fetch(sector, file_type, path_type, mode);
251251
}
252+
253+
bool RemoteWorker::isLocalWorker() const {
254+
return false;
255+
}
252256
} // namespace fc::sector_storage

core/sector_storage/impl/remote_worker.hpp

+2
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,8 @@ namespace fc::sector_storage {
9595
outcome::result<std::vector<primitives::StoragePath>> getAccessiblePaths()
9696
override;
9797

98+
bool isLocalWorker() const override;
99+
98100
private:
99101
explicit RemoteWorker(io_context &context);
100102

core/sector_storage/impl/scheduler_impl.cpp

+9-6
Original file line numberDiff line numberDiff line change
@@ -103,12 +103,15 @@ namespace fc::sector_storage {
103103
const std::shared_ptr<Worker> &worker)
104104
-> outcome::result<CallId> {
105105
OUTCOME_TRY(call_id, old_job(worker));
106-
WorkState state;
107-
state.id = wid;
108-
state.status = WorkStatus::kInProgress;
109-
state.call_id = call_id;
110-
OUTCOME_TRY(state_raw, codec::cbor::encode(state));
111-
OUTCOME_TRY(kv->put(static_cast<Bytes>(wid), std::move(state_raw)));
106+
// check that the work running on a remote worker
107+
if (not worker->isLocalWorker()) {
108+
WorkState state;
109+
state.id = wid;
110+
state.status = WorkStatus::kInProgress;
111+
state.call_id = call_id;
112+
OUTCOME_TRY(state_raw, codec::cbor::encode(state));
113+
OUTCOME_TRY(kv->put(static_cast<Bytes>(wid), std::move(state_raw)));
114+
}
112115
return std::move(call_id);
113116
};
114117
}

core/sector_storage/worker.hpp

+2
Original file line numberDiff line numberDiff line change
@@ -142,6 +142,8 @@ namespace fc::sector_storage {
142142

143143
virtual outcome::result<std::vector<primitives::StoragePath>>
144144
getAccessiblePaths() = 0;
145+
146+
virtual bool isLocalWorker() const = 0;
145147
};
146148

147149
enum class CallErrorCode : uint64_t {

test/core/sector_storage/scheduler_test.cpp

+7-2
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
#include "sector_storage/schedulder_utils.hpp"
1212
#include "storage/in_memory/in_memory_storage.hpp"
1313
#include "testutil/mocks/sector_storage/selector_mock.hpp"
14+
#include "testutil/mocks/sector_storage/worker_mock.hpp"
1415
#include "testutil/outcome.hpp"
1516

1617
namespace fc::sector_storage {
@@ -72,9 +73,10 @@ namespace fc::sector_storage {
7273
scheduler_ = scheduler;
7374

7475
std::unique_ptr<WorkerHandle> worker = std::make_unique<WorkerHandle>();
76+
mock_worker_ = std::make_unique<WorkerMock>();
7577

7678
worker_name_ = "worker";
77-
79+
worker->worker = mock_worker_;
7880
worker->info = WorkerInfo{
7981
.hostname = worker_name_,
8082
.resources = WorkerResources{.physical_memory = uint64_t(1) << 20,
@@ -89,7 +91,7 @@ namespace fc::sector_storage {
8991

9092
std::string worker_name_;
9193
std::vector<WorkState> states_;
92-
94+
std::shared_ptr<WorkerMock> mock_worker_;
9395
std::shared_ptr<boost::asio::io_context> io_;
9496
RegisteredSealProof seal_proof_type_;
9597
std::shared_ptr<InMemoryStorage> kv_;
@@ -340,6 +342,9 @@ namespace fc::sector_storage {
340342
is_first_called = true;
341343
};
342344

345+
EXPECT_CALL(*mock_worker_, isLocalWorker())
346+
.WillOnce(testing::Return(false));
347+
343348
EXPECT_CALL(
344349
*selector_,
345350
is_satisfying(task, seal_proof_type_, workerNameMatcher(worker_name_)))

test/testutil/mocks/sector_storage/worker_mock.hpp

+2
Original file line numberDiff line numberDiff line change
@@ -100,5 +100,7 @@ namespace fc::sector_storage {
100100
const std::vector<UnpaddedPieceSize> &,
101101
const UnpaddedPieceSize &,
102102
int));
103+
104+
MOCK_CONST_METHOD0(isLocalWorker, bool());
103105
};
104106
} // namespace fc::sector_storage

0 commit comments

Comments
 (0)