Skip to content

Commit 1a1c039

Browse files
authored
buffered car append (#561)
Signed-off-by: turuslan <[email protected]>
1 parent a710b9a commit 1a1c039

File tree

9 files changed

+116
-17
lines changed

9 files changed

+116
-17
lines changed

core/api/full_node/make.cpp

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -557,6 +557,7 @@ namespace fc::api {
557557
OUTCOME_CB(auto it, find(ts_branch, context.tipset->height()));
558558
OUTCOME_CB(info.prev_beacon, latestBeacon(ts_load, it));
559559
OUTCOME_CB(auto it2, getLookbackTipSetForRound(it, epoch));
560+
OUTCOME_CB(auto lookback_ts, ts_load->lazyLoad(it2.second->second));
560561
OUTCOME_CB(auto cached,
561562
interpreter_cache->get(it2.second->second.key));
562563
ts_lock.unlock();
@@ -570,7 +571,7 @@ namespace fc::api {
570571
[=, FWD(cb), MOVE(context), MOVE(info)](auto _beacons) mutable {
571572
OUTCOME_CB(info.beacons, _beacons);
572573
TipsetContext lookback{
573-
nullptr,
574+
lookback_ts,
574575
{withVersion(ipld, epoch), std::move(cached.state_root)},
575576
{}};
576577
OUTCOME_CB(auto actor, lookback.state_tree.tryGet(miner));

core/node/main/builder.cpp

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -219,6 +219,17 @@ namespace fc::node {
219219
}
220220
}
221221

222+
while (true) {
223+
const auto it{std::prev(o.ts_main->chain.end())};
224+
if (it == o.ts_main->chain.begin()
225+
|| o.env_context.interpreter_cache->tryGet(it->second.key)) {
226+
break;
227+
}
228+
log()->warn("missing state at {}, reverting", it->first);
229+
o.ts_main->updater->revert();
230+
o.ts_main->chain.erase(it);
231+
}
232+
222233
log()->info("chain loaded");
223234
assert(o.ts_main->bottom().second.key == genesis_tsk);
224235
}
@@ -231,6 +242,7 @@ namespace fc::node {
231242
car_path, true, 1 << 30, o.ipld, log());
232243
// estimated
233244
ipld->flush_on = 200000;
245+
ipld->car_flush_on = 100;
234246
o.ipld_flush_thread = std::make_shared<IoThread>();
235247
ipld->io = o.ipld_flush_thread->io;
236248
return ipld;
@@ -452,6 +464,10 @@ namespace fc::node {
452464
o.io_context = injector.create<std::shared_ptr<boost::asio::io_context>>();
453465
o.scheduler = injector.create<std::shared_ptr<Scheduler>>();
454466

467+
timerLoop(o.scheduler, std::chrono::minutes{1}, [ipld{o.compacter}] {
468+
ipld->carFlush();
469+
});
470+
455471
o.events = std::make_shared<sync::events::Events>(o.io_context);
456472

457473
o.host = injector.create<std::shared_ptr<libp2p::Host>>();

core/storage/car/cids_index/util.hpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -158,7 +158,7 @@ namespace fc::storage::cids_index {
158158
_ipld->index = index;
159159
_ipld->ipld = ipld;
160160
if (writable) {
161-
_ipld->writable.open(car_path, std::ios::app | std::ios::binary);
161+
_ipld->writable = {fopen(car_path.c_str(), "ab"), fclose};
162162
}
163163
_ipld->car_offset = car_size;
164164
_ipld->car_path = car_path;

core/storage/compacter/compacter.cpp

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,11 @@ namespace fc::storage::compacter {
4949
}
5050
}
5151

52+
void CompacterIpld::carFlush() {
53+
std::shared_lock lock{ipld_mutex};
54+
(use_new_ipld ? new_ipld : old_ipld)->carFlush();
55+
}
56+
5257
void CompacterIpld::open() {
5358
if (start_head_key.has()) {
5459
resume();
@@ -82,6 +87,7 @@ namespace fc::storage::compacter {
8287
new_ipld = *car;
8388
new_ipld->io = old_ipld->io;
8489
new_ipld->flush_on = old_ipld->flush_on;
90+
new_ipld->car_flush_on = old_ipld->car_flush_on;
8591
queue->visited = new_ipld;
8692
queue->open(true);
8793
std::unique_lock vm_lock{*interpreter->mutex};
@@ -98,6 +104,7 @@ namespace fc::storage::compacter {
98104
start_head_key.setCbor(start_head->key.cids());
99105
{
100106
std::unique_lock ipld_lock{ipld_mutex};
107+
old_ipld->carFlush();
101108
use_new_ipld = true;
102109
}
103110
vm_lock.unlock();
@@ -118,6 +125,7 @@ namespace fc::storage::compacter {
118125
new_ipld = *car;
119126
new_ipld->io = old_ipld->io;
120127
new_ipld->flush_on = old_ipld->flush_on;
128+
new_ipld->car_flush_on = old_ipld->car_flush_on;
121129
queue->visited = new_ipld;
122130
queue->open(false);
123131
start_head =
@@ -265,6 +273,7 @@ namespace fc::storage::compacter {
265273
std::unique_lock old_flush_lock{old_ipld->flush_mutex};
266274
std::unique_lock new_flush_lock{new_ipld->flush_mutex};
267275
std::unique_lock ipld_lock{ipld_mutex};
276+
new_ipld->carFlush();
268277
// keep last car copy for debug
269278
boost::filesystem::rename(old_ipld->car_path,
270279
old_ipld->car_path + ".old_ipld");

core/storage/compacter/compacter.hpp

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,8 @@ namespace fc::storage::compacter {
4545
bool get(const CbCid &key, Bytes *value) const override;
4646
void put(const CbCid &key, BytesCow &&value) override;
4747

48+
void carFlush();
49+
4850
void open();
4951
bool asyncStart();
5052
void doStart();

core/storage/ipld/cids_ipld.cpp

Lines changed: 58 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
#include <boost/filesystem/operations.hpp>
1010

1111
#include "cbor_blake/ipld_any.hpp"
12+
#include "codec/cbor/light_reader/cid.hpp"
1213
#include "codec/uvarint.hpp"
1314
#include "common/error_text.hpp"
1415
#include "common/logger.hpp"
@@ -20,7 +21,7 @@ namespace fc::storage::ipld {
2021
using cids_index::MergeRange;
2122

2223
boost::optional<Row> CidsIpld::findWritten(const CbCid &key) const {
23-
assert(writable.is_open());
24+
assert(writable != nullptr);
2425
auto it{written.lower_bound(Row{key, {}, {}})};
2526
if (it != written.end() && it->key == key) {
2627
return *it;
@@ -42,7 +43,7 @@ namespace fc::storage::ipld {
4243

4344
outcome::result<void> CidsIpld::set(const CID &cid, BytesCow &&value) {
4445
if (auto key{asBlake(cid)}) {
45-
if (writable.is_open()) {
46+
if (writable != nullptr) {
4647
put(*key, std::move(value));
4748
return outcome::success();
4849
}
@@ -128,7 +129,7 @@ namespace fc::storage::ipld {
128129
std::shared_lock index_lock{index_mutex};
129130
auto row{index->find(key).value()};
130131
index_lock.unlock();
131-
if (!row && writable.is_open()) {
132+
if (!row && writable != nullptr) {
132133
std::shared_lock written_lock{written_mutex};
133134
row = findWritten(key);
134135
}
@@ -139,6 +140,9 @@ namespace fc::storage::ipld {
139140
return false;
140141
}
141142
if (value != nullptr) {
143+
if (carGet(*row, *value)) {
144+
return true;
145+
}
142146
std::unique_lock car_lock{car_mutex};
143147
auto [good, size]{readCarItem(car_file, *row, nullptr)};
144148
if (!good) {
@@ -155,7 +159,7 @@ namespace fc::storage::ipld {
155159
}
156160

157161
void CidsIpld::put(const CbCid &key, BytesCow &&value) {
158-
if (!writable.is_open()) {
162+
if (writable == nullptr) {
159163
outcome::raise(ERROR_TEXT("CidsIpld.put: not writable"));
160164
}
161165
if (has(key)) {
@@ -179,22 +183,64 @@ namespace fc::storage::ipld {
179183
row.key = key;
180184
row.offset = car_offset;
181185
row.max_size64 = maxSize64(item.size());
182-
if (!common::write(writable, item)) {
183-
spdlog::error("CidsIpld.put write error");
184-
outcome::raise(ERROR_TEXT("CidsIpld.put: write error"));
185-
}
186-
if (!writable.flush().good()) {
187-
spdlog::error("CidsIpld.put flush error");
188-
outcome::raise(ERROR_TEXT("CidsIpld.put: flush error"));
189-
}
190186
car_offset += item.size();
187+
carPut(row, std::move(item));
191188
written.insert(row);
192189
if (flush_on != 0 && written.size() >= flush_on) {
193190
written_lock.unlock();
194191
asyncFlush();
195192
}
196193
}
197194

195+
void CidsIpld::carPut(const Row &row, Bytes &&item) {
196+
std::unique_lock lock{car_flush_mutex};
197+
car_queue.emplace(row.offset.value(), car_queue_buffer.size());
198+
append(car_queue_buffer, item);
199+
if (car_queue.size() >= car_flush_on) {
200+
carFlush(std::adopt_lock);
201+
}
202+
}
203+
204+
bool CidsIpld::carGet(const Row &row, Bytes &value) const {
205+
std::shared_lock lock{car_flush_mutex};
206+
const auto it{car_queue.find(row.offset.value())};
207+
if (it == car_queue.end()) {
208+
return false;
209+
}
210+
auto item{BytesIn{car_queue_buffer}.subspan(it->second)};
211+
BytesIn input;
212+
const CbCid *hash;
213+
if (codec::uvarint::readBytes(input, item)
214+
&& codec::cbor::light_reader::readCborBlake(hash, input)) {
215+
copy(value, input);
216+
return true;
217+
}
218+
outcome::raise(ERROR_TEXT("CidsIpld.carGet decode error"));
219+
}
220+
221+
void CidsIpld::carFlush(std::adopt_lock_t) {
222+
if (car_queue.empty()) {
223+
return;
224+
}
225+
if (fwrite(
226+
car_queue_buffer.data(), car_queue_buffer.size(), 1, writable.get())
227+
!= 1) {
228+
spdlog::error("CidsIpld.carFlush write error");
229+
outcome::raise(ERROR_TEXT("CidsIpld.carFlush: write error"));
230+
}
231+
if (fflush(writable.get()) != 0) {
232+
spdlog::error("CidsIpld.carFlush flush error");
233+
outcome::raise(ERROR_TEXT("CidsIpld.carFlush: flush error"));
234+
}
235+
car_queue.clear();
236+
car_queue_buffer.resize(0);
237+
}
238+
239+
void CidsIpld::carFlush() {
240+
std::unique_lock lock{car_flush_mutex};
241+
carFlush(std::adopt_lock);
242+
}
243+
198244
void CidsIpld::asyncFlush() {
199245
if (!flushing.test_and_set()) {
200246
if (io) {

core/storage/ipld/cids_ipld.hpp

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,11 @@ namespace fc::storage::ipld {
3131
bool get(const CbCid &key, Bytes *value) const override;
3232
void put(const CbCid &key, BytesCow &&value) override;
3333

34+
void carPut(const Row &row, Bytes &&item);
35+
bool carGet(const Row &row, Bytes &value) const;
36+
void carFlush(std::adopt_lock_t);
37+
void carFlush();
38+
3439
void asyncFlush();
3540

3641
inline boost::optional<Row> findWritten(const CbCid &key) const;
@@ -41,7 +46,7 @@ namespace fc::storage::ipld {
4146
mutable std::shared_mutex index_mutex;
4247
std::shared_ptr<Index> index;
4348
IpldPtr ipld;
44-
std::ofstream writable;
49+
std::shared_ptr<FILE> writable;
4550
mutable std::shared_mutex written_mutex;
4651
std::set<Row> written;
4752
uint64_t car_offset{};
@@ -52,5 +57,9 @@ namespace fc::storage::ipld {
5257
std::string index_path;
5358
std::string car_path;
5459
boost::optional<size_t> max_memory;
60+
mutable std::shared_mutex car_flush_mutex;
61+
std::map<uint64_t, size_t> car_queue;
62+
Bytes car_queue_buffer;
63+
size_t car_flush_on{};
5564
};
5665
} // namespace fc::storage::ipld

core/storage/mpool/mpool.cpp

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -794,8 +794,9 @@ namespace fc::storage::mpool {
794794
remove(msg->from, msg->nonce);
795795
} else {
796796
if (bls) {
797-
std::lock_guard bls_cache_lock{bls_cache_mutex_};
797+
std::unique_lock bls_cache_lock{bls_cache_mutex_};
798798
if (auto sig{bls_cache.get(cid)}) {
799+
bls_cache_lock.unlock();
799800
OUTCOME_TRY(add({*msg, *sig}));
800801
}
801802
} else {

test/core/storage/car/cids_index_test.cpp

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,7 @@ namespace fc::storage::cids_index {
6767
// inserted only once
6868
auto car_value1{*common::readFile(car_path)};
6969
EXPECT_OUTCOME_TRUE_1(setCbor(ipld, value1));
70-
ipld->writable.flush();
70+
fflush(ipld->writable.get());
7171
EXPECT_EQ(fs::file_size(car_path), car_value1.size());
7272

7373
// truncated car drops index
@@ -133,4 +133,19 @@ namespace fc::storage::cids_index {
133133
IoThread thread;
134134
testFlush(thread.io);
135135
}
136+
137+
TEST_F(CidsIndexTest, CarFlush) {
138+
ipld = *load(true);
139+
const auto header{*common::readFile(car_path)};
140+
ipld->car_flush_on = 2;
141+
142+
const auto c1{setCbor(ipld, 1).value()};
143+
EXPECT_EQ(fs::file_size(car_path), header.size());
144+
EXPECT_OUTCOME_EQ(getCbor<int>(ipld, c1), 1);
145+
146+
const auto c2{setCbor(ipld, 2).value()};
147+
EXPECT_EQ(fs::file_size(car_path), header.size() + 2 * 40);
148+
EXPECT_OUTCOME_EQ(getCbor<int>(ipld, c1), 1);
149+
EXPECT_OUTCOME_EQ(getCbor<int>(ipld, c2), 2);
150+
}
136151
} // namespace fc::storage::cids_index

0 commit comments

Comments
 (0)