Commit 23890b9

Merge pull request #10 from Asurar0/database_part2
Finishing transition to LMDB (Rayon, Async, Zstd, Dynamic resizing)
2 parents 5df6a4f + cd1b948 commit 23890b9

File tree

4 files changed, +178 -81 lines changed
src/database/chunks.rs

Lines changed: 154 additions & 69 deletions
@@ -1,116 +1,195 @@
+use std::borrow::Cow;
+use std::future::Future;
+use std::marker::PhantomData;
+
+use bincode::{Decode, Encode, config::standard};
 use byteorder::LE;
-use zstd::bulk::compress as zstd_compress;
-use zstd::bulk::decompress as zstd_decompress;
-use heed::types::{Bytes, U64};
-use heed::Env;
-use tokio::task::spawn_blocking;
-use tracing::{trace, warn};
-
-use crate::database::Database;
-use crate::utils::error::Error;
-use crate::utils::hash::hash;
-use crate::world::chunk_format::Chunk;
-
-// use crate::utils::binary_utils::{bzip_compress, bzip_decompress};
-use bincode::config::standard;
-use bincode::{decode_from_slice, encode_to_vec};
+use futures::channel::oneshot::{self, Canceled};
+use heed::{types::U64, BytesDecode, BytesEncode, Env, MdbError};
+use tracing::{info, trace, warn};
+
+use crate::{
+    database::Database,
+    utils::error::Error,
+    utils::hash::hash,
+    world::chunk_format::Chunk
+};
+
+use super::{LMDB_PAGE_SIZE, LMDB_PAGE_SIZE_INCREMENT, LMDB_READER_SYNC, LMDB_THREADPOOL};
+
+pub struct Zstd<T>(PhantomData<T>);
+
+impl<'a, T: Encode + 'a> BytesEncode<'a> for Zstd<T> {
+    type EItem = T;
+
+    fn bytes_encode(item: &'a Self::EItem) -> Result<Cow<'a, [u8]>, heed::BoxedError> {
+
+        // Compress
+        let mut bytes = Vec::new();
+        let mut compressor = zstd::Encoder::new(&mut bytes, 6)?;
+        bincode::encode_into_std_write(item, &mut compressor, standard())?;
+        compressor.finish()?;
+
+        Ok(Cow::Owned(bytes))
+    }
+}
+
+impl<'a, T: Decode + 'a> BytesDecode<'a> for Zstd<T> {
+    type DItem = T;
+
+    fn bytes_decode(bytes: &'a [u8]) -> Result<Self::DItem, heed::BoxedError> {
+
+        let mut decompressor = zstd::Decoder::new(bytes)?;
+        let decoded = bincode::decode_from_std_read(&mut decompressor, standard())?;
+        Ok(decoded)
+    }
+}
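The codec plugs bincode serialization into a streaming zstd pass on both sides, so values are compressed on write and decompressed on read without an intermediate buffer of encoded bytes. A minimal round-trip sketch (illustrative, not part of this commit; any type implementing bincode's Encode and Decode works, here a plain tuple):

// Hypothetical round-trip check for the Zstd codec (not in this commit).
fn zstd_codec_round_trip() {
    let original: (u64, String) = (42, "overworld".to_string());

    // Compress through the BytesEncode half...
    let bytes = <Zstd<(u64, String)> as BytesEncode>::bytes_encode(&original)
        .expect("compression failed");

    // ...then decompress through the BytesDecode half.
    let decoded = <Zstd<(u64, String)> as BytesDecode>::bytes_decode(&bytes)
        .expect("decompression failed");

    assert_eq!(original, decoded);
}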
+
+/// LMDB will follow a linear growth as opposed to MDBX which
+/// uses a geometric growth.
+pub(super) fn new_page_size(old_size: usize) -> usize {
+    old_size + LMDB_PAGE_SIZE_INCREMENT
+}
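new_page_size and spawn_blocking_db lean on the four statics imported from super; their definitions live in one of the other three changed files and are not shown in this diff. A plausible sketch of their shape (the names are real, the types and values below are assumptions):

use std::sync::{Mutex, OnceLock, RwLock};

// Current LMDB map size in bytes, grown linearly on MapFull.
pub static LMDB_PAGE_SIZE: Mutex<usize> = Mutex::new(1024 * 1024 * 1024); // placeholder 1 GiB

// Linear growth step; the 512 MiB here is a placeholder value.
pub const LMDB_PAGE_SIZE_INCREMENT: usize = 512 * 1024 * 1024;

// Readers take this shared; a resize takes it exclusively so no
// transaction is live while the map is remapped.
pub static LMDB_READER_SYNC: RwLock<()> = RwLock::new(());

// Rayon pool that keeps blocking LMDB work off the tokio runtime.
pub static LMDB_THREADPOOL: OnceLock<rayon::ThreadPool> = OnceLock::new();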
+
+// Will delegate a database operation to the database threadpool
+pub(super) fn spawn_blocking_db<F, R>(db: Env, f: F) -> impl Future<Output = Result<Result<R, heed::Error>, Canceled>>
+where
+    F: Fn() -> Result<R, heed::Error> + Send + 'static,
+    R: Send + 'static + std::fmt::Debug,
+{
+    let (tx, res) = oneshot::channel::<Result<R, heed::Error>>();
+
+    let pool = LMDB_THREADPOOL.get().unwrap();
+    pool.spawn(move || {
+
+        let read_lock = LMDB_READER_SYNC.read()
+            .expect("Database RWLock has been poisoned. A thread should have crashed somewhere.");
+
+        let mut res = f();
+        if let Err(heed::Error::Mdb(MdbError::MapFull)) = res {
+
+            tracing::warn!("Database page is full. Resizing...");
+
+            drop(read_lock);
+
+            let _resize_guard = LMDB_READER_SYNC.write()
+                .expect("Database RWLock has been poisoned. A thread should have crashed somewhere.");
+
+            let mut global_size_lock = LMDB_PAGE_SIZE.lock().unwrap();
+            let old_size = *global_size_lock;
+            *global_size_lock = new_page_size(old_size);
+            unsafe { db.resize(*global_size_lock).expect("Unable to resize LMDB environment.") };
+
+            tracing::info!("Successfully resized LMDB page from {} MiB to {} MiB", (old_size / 1024usize.pow(2)), (*global_size_lock / 1024usize.pow(2)));
+
+            drop(global_size_lock);
+            drop(_resize_guard);
+
+            res = f();
+        } else {
+            drop(read_lock)
+        }
+
+        if tx.send(res).is_err() {
+            tracing::warn!("A database task has been unable to send its result because the receiver at the other end has closed.")
+        }
+    });
+
+    res
+}
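Note the pattern at every call site below: the Env is cloned twice, one handle (tsk_db) goes to spawn_blocking_db so it can remap the environment on MapFull, the other moves into the closure doing the actual work. The awaited value is doubly wrapped: the outer Result reports oneshot cancellation, the inner one carries the heed error. A hypothetical call site (Env::real_disk_size is a real heed API; the surrounding function is illustrative only):

// Hypothetical call site (not in this commit).
async fn log_disk_usage(env: Env) -> Result<(), heed::Error> {
    let task_env = env.clone();
    let bytes = spawn_blocking_db(env, move || task_env.real_disk_size())
        .await
        .expect("database worker dropped its result channel")?;
    info!("LMDB file currently uses {bytes} bytes on disk");
    Ok(())
}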
 
 impl Database {
+
+    // Close the database
+    pub fn close(self) {
+        let token = self.db.prepare_for_closing();
+        token.wait();
+    }
+
     /// Fetch chunk from database
-    fn get_chunk_from_database(db: &Env, key: &u64) -> Result<Option<Chunk>, Error> {
+    fn get_chunk_from_database(db: &Env, key: &u64) -> Result<Option<Chunk>, heed::Error> {
         // Initialize read transaction and open chunks table
         let ro_tx = db.read_txn()?;
         let database = db
-            .open_database::<U64<LE>, Bytes>(&ro_tx, Some("chunks"))?
+            .open_database::<U64<LE>, Zstd<Chunk>>(&ro_tx, Some("chunks"))?
             .expect("No table \"chunks\" found. The database should have been initialized");
 
         // Attempt to fetch chunk from table
-        if let Ok(data) = database.get(&ro_tx, key) {
-            Ok(data.map(|encoded_chunk| {
-                // let decompressed =
-                //     bzip_decompress(&encoded_chunk).expect("Failed to decompress chunk");
-                let decompressed = zstd_decompress(&encoded_chunk, 1024*1024*64).expect("Failed to decompress chunk");
-                let chunk: (Chunk, usize) = decode_from_slice(&*decompressed, standard())
-                    .expect("Failed to decode chunk from database");
-                chunk.0
-            }))
-        } else {
-            Err(Error::DatabaseError("Failed to get chunk".into()))
-        }
+        database.get(&ro_tx, key)
+        //.map_err(|err| Error::DatabaseError(format!("Failed to get chunk: {err}")))
     }
 
     /// Insert a single chunk into database
-    fn insert_chunk_into_database(db: &Env, chunk: &Chunk) -> Result<(), Error> {
+    fn insert_chunk_into_database(db: &Env, chunk: &Chunk) -> Result<(), heed::Error> {
         // Initialize write transaction and open chunks table
         let mut rw_tx = db.write_txn()?;
         let database = db
-            .open_database::<U64<LE>, Bytes>(&rw_tx, Some("chunks"))?
+            .open_database::<U64<LE>, Zstd<Chunk>>(&rw_tx, Some("chunks"))?
             .expect("No table \"chunks\" found. The database should have been initialized");
 
-        // Encode chunk
-        let encoded_chunk = encode_to_vec(chunk, standard()).expect("Failed to encode chunk");
-        // let compressed = bzip_compress(&encoded_chunk).expect("Failed to compress chunk");
-        let compressed = zstd_compress(&encoded_chunk, 3).expect("Failed to compress chunk");
+        // Calculate key
         let key = hash((chunk.dimension.as_ref().unwrap(), chunk.x_pos, chunk.z_pos));
 
         // Insert chunk
-        let res = database.put(&mut rw_tx, &key, &compressed);
-        rw_tx.commit().map_err(|err| {
-            Error::DatabaseError(format!("Unable to commit changes to database: {err}"))
-        })?;
-
-        if let Err(err) = res {
-            Err(Error::DatabaseError(format!(
-                "Failed to insert or update chunk: {err}"
-            )))
-        } else {
-            Ok(())
-        }
+        let res = database.put(&mut rw_tx, &key, chunk);
+        rw_tx.commit()?;
+        // .map_err(|err| {
+        //     Error::DatabaseError(format!("Unable to commit changes to database: {err}"))
+        // })?;
+
+        res
+        // if let Err(err) = res {
+        //     Err(Error::DatabaseError(format!(
+        //         "Failed to insert or update chunk: {err}"
+        //     )))
+        // } else {
+        //     Ok(())
+        // }
     }
 
     /// Insert multiple chunks into database
     /// TODO: Find better name/disambiguation
-    fn insert_chunks_into_database(db: &Env, chunks: &[Chunk]) -> Result<(), Error> {
+    fn insert_chunks_into_database(db: &Env, chunks: &[Chunk]) -> Result<(), heed::Error> {
         // Initialize write transaction and open chunks table
         let mut rw_tx = db.write_txn()?;
         let database = db
-            .open_database::<U64<LE>, Bytes>(&rw_tx, Some("chunks"))?
+            .open_database::<U64<LE>, Zstd<Chunk>>(&rw_tx, Some("chunks"))?
             .expect("No table \"chunks\" found. The database should have been initialized");
 
         // Update page
         for chunk in chunks {
-            // Encode chunk
-            let encoded_chunk = encode_to_vec(chunk, standard()).expect("Failed to encode chunk");
-
-            // let compressed = bzip_compress(&encoded_chunk).expect("Failed to compress chunk");
-            let compressed = zstd_compress(&encoded_chunk, 3).expect("Failed to compress chunk");
+            // Calculate key
             let key = hash((chunk.dimension.as_ref().unwrap(), chunk.x_pos, chunk.z_pos));
 
             // Insert chunk
-            database.put(&mut rw_tx, &key, &compressed).map_err(|err| {
-                Error::DatabaseError(format!("Failed to insert or update chunk: {err}"))
-            })?;
+            database.put(&mut rw_tx, &key, chunk)?
+            // .map_err(|err| {
+            //     Error::DatabaseError(format!("Failed to insert or update chunk: {err}"))
+            // })?;
         }
 
         // Commit changes
-        rw_tx.commit().map_err(|err| {
-            Error::DatabaseError(format!("Unable to commit changes to database: {err}"))
-        })?;
+        rw_tx.commit()?;
+        // .map_err(|err| {
+        //     Error::DatabaseError(format!("Unable to commit changes to database: {err}"))
+        // })?;
         Ok(())
     }
 
     async fn load_into_cache(&self, key: u64) -> Result<(), Error> {
         let db = self.db.clone();
+        let tsk_db = self.db.clone();
         let cache = self.cache.clone();
 
         tokio::task::spawn(async move {
+
             // Check cache
             if cache.contains_key(&key) {
                 trace!("Chunk already exists in cache: {:X}", key);
             }
             // If not in cache then search in database
             else if let Ok(chunk) =
-                spawn_blocking(move || Self::get_chunk_from_database(&db, &key))
+                spawn_blocking_db(tsk_db, move || Self::get_chunk_from_database(&db, &key))
                     .await
                     .unwrap()
             {
@@ -158,7 +237,8 @@ impl Database {
             // Insert chunk into persistent database
             let chunk = value.clone();
             let db = self.db.clone();
-            spawn_blocking(move || Self::insert_chunk_into_database(&db, &chunk))
+            let tsk_db = self.db.clone();
+            spawn_blocking_db(tsk_db, move || Self::insert_chunk_into_database(&db, &chunk))
                 .await
                 .unwrap()?;
 
@@ -195,14 +275,15 @@ impl Database {
     ) -> Result<Option<Chunk>, Error> {
         // Calculate key of this chunk and clone database pointer
         let key = hash((dimension, x, z));
+        let tsk_db = self.db.clone();
         let db = self.db.clone();
 
         // First check cache
         if self.cache.contains_key(&key) {
             Ok(self.cache.get(&key).await)
         }
         // Attempt to get chunk from persistent database
-        else if let Some(chunk) = spawn_blocking(move || Self::get_chunk_from_database(&db, &key))
+        else if let Some(chunk) = spawn_blocking_db(tsk_db, move || Self::get_chunk_from_database(&db, &key))
             .await
             .unwrap()?
         {
@@ -236,14 +317,15 @@ impl Database {
     pub async fn chunk_exists(&self, x: i32, z: i32, dimension: String) -> Result<bool, Error> {
         // Calculate key and copy database pointer
         let key = hash((dimension, x, z));
+        let tsk_db = self.db.clone();
         let db = self.db.clone();
 
         // Check first cache
         if self.cache.contains_key(&key) {
             Ok(true)
         // Else check persistent database and load it into cache
         } else {
-            let res = spawn_blocking(move || Self::get_chunk_from_database(&db, &key)).await?;
+            let res = spawn_blocking_db(tsk_db, move || Self::get_chunk_from_database(&db, &key)).await.unwrap();
 
             // WARNING: The previous logic was to order the chunk to be loaded into cache whether it existed or not.
             // This has been replaced by directly loading the queried chunk into cache
@@ -255,7 +337,7 @@ impl Database {
                     }
                     Ok(exist)
                 }
-                Err(err) => Err(err),
+                Err(err) => Err(Error::LmdbError(err)),
             }
         }
     }
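Since the private helpers now return heed::Error directly, chunk_exists surfaces database failures through an Error::LmdbError variant wrapping heed::Error; the variant itself is presumably added to the Error enum in one of the other changed files. A hypothetical caller (not in this commit), using the signature shown above:

// Hypothetical caller, e.g. from a world-loading task.
async fn ensure_chunk_generated(db: &Database, x: i32, z: i32) -> Result<(), Error> {
    if !db.chunk_exists(x, z, "overworld".to_string()).await? {
        warn!("Chunk ({x},{z}) does not exist yet; scheduling generation");
    }
    Ok(())
}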
@@ -286,7 +368,8 @@ impl Database {
             // Insert new chunk state into persistent database
             let chunk = value.clone();
             let db = self.db.clone();
-            spawn_blocking(move || Self::insert_chunk_into_database(&db, &chunk)).await??;
+            let tsk_db = self.db.clone();
+            spawn_blocking_db(tsk_db, move || Self::insert_chunk_into_database(&db, &chunk)).await.unwrap()?;
 
             // Insert new chunk state into cache
             self.cache.insert(key, value).await;
@@ -373,11 +456,12 @@ impl Database {
 
         // Clone database pointer
         let db = self.db.clone();
+        let tsk_db = self.db.clone();
 
         // Calculate all keys
         let keys = values
             .iter()
-            .map(|v| hash((v.dimension.as_ref().expect(format!("Invalid chunk @ ({},{})", v.x_pos, v.z_pos).as_str()), v.x_pos, v.z_pos)))
+            .map(|v| hash((v.dimension.as_ref().unwrap_or_else(|| panic!("Invalid chunk @ ({},{})", v.x_pos, v.z_pos)), v.x_pos, v.z_pos)))
             .collect::<Vec<u64>>();
 
         // WARNING: The previous logic was to first insert in database and then insert in cache using load_into_cache fn.
@@ -387,11 +471,12 @@ impl Database {
             self.cache.insert(key, chunk.clone()).await;
             self.load_into_cache(key).await?;
         }
-
+
         // Then insert into persistent database
-        spawn_blocking(move || Self::insert_chunks_into_database(&db, &values))
+        spawn_blocking_db(tsk_db, move || Self::insert_chunks_into_database(&db, &values))
             .await
             .unwrap()?;
+
         Ok(())
     }
 }
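The new close method consumes the Database and goes through heed's prepare_for_closing, which returns a closing event; wait() blocks until every clone of the environment has been dropped, making it a safe final step during shutdown. A hypothetical shutdown path (not in this commit):

// Hypothetical shutdown hook.
fn shutdown(database: Database) {
    // All other Env clones (worker threads, cached handles) must be dropped
    // first, otherwise wait() inside close() will block.
    database.close();
    tracing::info!("Chunk database closed cleanly");
}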
