Skip to content

Commit cd1b948

Browse files
committed
Fix Zstd + Implement dynamic page resizing
1 parent b6fb310 commit cd1b948

File tree

3 files changed

+93
-43
lines changed

3 files changed

+93
-43
lines changed

src/database/chunks.rs

Lines changed: 85 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,8 @@ use std::marker::PhantomData;
55
use bincode::{Decode, Encode, config::standard};
66
use byteorder::LE;
77
use futures::channel::oneshot::{self, Canceled};
8-
use heed::{BytesDecode, BytesEncode, types::U64, Env};
9-
use tracing::{trace, warn};
8+
use heed::{types::U64, BytesDecode, BytesEncode, Env, MdbError};
9+
use tracing::{info, trace, warn};
1010

1111
use crate::{
1212
database::Database,
@@ -15,7 +15,7 @@ use crate::{
1515
world::chunk_format::Chunk
1616
};
1717

18-
use super::LMDB_THREADPOOL;
18+
use super::{LMDB_PAGE_SIZE, LMDB_PAGE_SIZE_INCREMENT, LMDB_READER_SYNC, LMDB_THREADPOOL};
1919

2020
pub struct Zstd<T>(PhantomData<T>);
2121

@@ -28,6 +28,7 @@ impl<'a, T: Encode + 'a> BytesEncode<'a> for Zstd<T> {
2828
let mut bytes = Vec::new();
2929
let mut compressor = zstd::Encoder::new(&mut bytes, 6)?;
3030
bincode::encode_into_std_write(item, &mut compressor, standard())?;
31+
compressor.finish()?;
3132

3233
Ok(Cow::Owned(bytes))
3334
}
@@ -44,17 +45,52 @@ impl<'a, T: Decode + 'a> BytesDecode<'a> for Zstd<T> {
4445
}
4546
}
4647

48+
/// LMDB will follow a linear growth as opposed to MDBX which
49+
/// uses a geometric growth.
50+
pub(super) fn new_page_size(old_size: usize) -> usize {
51+
old_size + LMDB_PAGE_SIZE_INCREMENT
52+
}
53+
4754
// Will delegate a database operation to the database threadpool
48-
pub(super) fn spawn_blocking_db<F, R>(f: F) -> impl Future<Output = Result<R,Canceled>>
55+
pub(super) fn spawn_blocking_db<F, R>(db: Env, f: F) -> impl Future<Output = Result<Result<R,heed::Error>,Canceled>>
4956
where
50-
F: FnOnce() -> R + Send + 'static,
51-
R: Send + 'static,
57+
F: Fn() -> Result<R,heed::Error> + Send + 'static,
58+
R: Send + 'static + std::fmt::Debug,
5259
{
53-
let (tx,res) = oneshot::channel::<R>();
60+
let (tx,res) = oneshot::channel::<Result<R,heed::Error>>();
5461

5562
let pool = LMDB_THREADPOOL.get().unwrap();
5663
pool.spawn(move || {
57-
if tx.send(f()).is_err() {
64+
65+
let read_lock = LMDB_READER_SYNC.read()
66+
.expect("Database RWLock has been poisoned. A thread should have crashed somewhere.");
67+
68+
let mut res = f();
69+
if let Err(heed::Error::Mdb(MdbError::MapFull)) = res {
70+
71+
tracing::warn!("Database page is full. Resizing...");
72+
73+
drop(read_lock);
74+
75+
let _resize_guard = LMDB_READER_SYNC.write()
76+
.expect("Database RWLock has been poisoned. A thread should have crashed somewhere.");
77+
78+
let mut global_size_lock = LMDB_PAGE_SIZE.lock().unwrap();
79+
let old_size = *global_size_lock;
80+
*global_size_lock = new_page_size(old_size);
81+
unsafe { db.resize(*global_size_lock).expect("Unable to resize LMDB environment.") };
82+
83+
tracing::info!("Successfully resized LMDB page from {} MiB to {} MiB", (old_size / 1024usize.pow(2)), (*global_size_lock / 1024usize.pow(2)));
84+
85+
drop(global_size_lock);
86+
drop(_resize_guard);
87+
88+
res = f();
89+
} else {
90+
drop(read_lock)
91+
}
92+
93+
if tx.send(res).is_err() {
5894
tracing::warn!("A database task has been unable to send its result because the receiver at other end have closed.")
5995
}
6096
});
@@ -71,7 +107,7 @@ impl Database {
71107
}
72108

73109
/// Fetch chunk from database
74-
fn get_chunk_from_database(db: &Env, key: &u64) -> Result<Option<Chunk>, Error> {
110+
fn get_chunk_from_database(db: &Env, key: &u64) -> Result<Option<Chunk>, heed::Error> {
75111
// Initialize read transaction and open chunks table
76112
let ro_tx = db.read_txn()?;
77113
let database = db
@@ -80,11 +116,11 @@ impl Database {
80116

81117
// Attempt to fetch chunk from table
82118
database.get(&ro_tx, key)
83-
.map_err(|err| Error::DatabaseError(format!("Failed to get chunk: {err}")))
119+
//.map_err(|err| Error::DatabaseError(format!("Failed to get chunk: {err}")))
84120
}
85121

86122
/// Insert a single chunk into database
87-
fn insert_chunk_into_database(db: &Env, chunk: &Chunk) -> Result<(), Error> {
123+
fn insert_chunk_into_database(db: &Env, chunk: &Chunk) -> Result<(), heed::Error> {
88124
// Initialize write transaction and open chunks table
89125
let mut rw_tx = db.write_txn()?;
90126
let database = db
@@ -96,22 +132,24 @@ impl Database {
96132

97133
// Insert chunk
98134
let res = database.put(&mut rw_tx, &key, chunk);
99-
rw_tx.commit().map_err(|err| {
100-
Error::DatabaseError(format!("Unable to commit changes to database: {err}"))
101-
})?;
102-
103-
if let Err(err) = res {
104-
Err(Error::DatabaseError(format!(
105-
"Failed to insert or update chunk: {err}"
106-
)))
107-
} else {
108-
Ok(())
109-
}
135+
rw_tx.commit()?;
136+
// .map_err(|err| {
137+
// Error::DatabaseError(format!("Unable to commit changes to database: {err}"))
138+
// })?;
139+
140+
res
141+
// if let Err(err) = res {
142+
// Err(Error::DatabaseError(format!(
143+
// "Failed to insert or update chunk: {err}"
144+
// )))
145+
// } else {
146+
// Ok(())
147+
// }
110148
}
111149

112150
/// Insert multiple chunks into database
113151
/// TODO: Find better name/disambiguation
114-
fn insert_chunks_into_database(db: &Env, chunks: &[Chunk]) -> Result<(), Error> {
152+
fn insert_chunks_into_database(db: &Env, chunks: &[Chunk]) -> Result<(), heed::Error> {
115153
// Initialize write transaction and open chunks table
116154
let mut rw_tx = db.write_txn()?;
117155
let database = db
@@ -124,20 +162,23 @@ impl Database {
124162
let key = hash((chunk.dimension.as_ref().unwrap(), chunk.x_pos, chunk.z_pos));
125163

126164
// Insert chunk
127-
database.put(&mut rw_tx, &key, chunk).map_err(|err| {
128-
Error::DatabaseError(format!("Failed to insert or update chunk: {err}"))
129-
})?;
165+
database.put(&mut rw_tx, &key, chunk)?
166+
// .map_err(|err| {
167+
// Error::DatabaseError(format!("Failed to insert or update chunk: {err}"))
168+
// })?;
130169
}
131170

132171
// Commit changes
133-
rw_tx.commit().map_err(|err| {
134-
Error::DatabaseError(format!("Unable to commit changes to database: {err}"))
135-
})?;
172+
rw_tx.commit()?;
173+
// .map_err(|err| {
174+
// Error::DatabaseError(format!("Unable to commit changes to database: {err}"))
175+
// })?;
136176
Ok(())
137177
}
138178

139179
async fn load_into_cache(&self, key: u64) -> Result<(), Error> {
140180
let db = self.db.clone();
181+
let tsk_db = self.db.clone();
141182
let cache = self.cache.clone();
142183

143184
tokio::task::spawn(async move {
@@ -148,7 +189,7 @@ impl Database {
148189
}
149190
// If not in cache then search in database
150191
else if let Ok(chunk) =
151-
spawn_blocking_db(move || Self::get_chunk_from_database(&db, &key))
192+
spawn_blocking_db(tsk_db, move || Self::get_chunk_from_database(&db, &key))
152193
.await
153194
.unwrap()
154195
{
@@ -196,7 +237,8 @@ impl Database {
196237
// Insert chunk into persistent database
197238
let chunk = value.clone();
198239
let db = self.db.clone();
199-
spawn_blocking_db(move || Self::insert_chunk_into_database(&db, &chunk))
240+
let tsk_db = self.db.clone();
241+
spawn_blocking_db(tsk_db, move || Self::insert_chunk_into_database(&db, &chunk))
200242
.await
201243
.unwrap()?;
202244

@@ -233,14 +275,15 @@ impl Database {
233275
) -> Result<Option<Chunk>, Error> {
234276
// Calculate key of this chunk and clone database pointer
235277
let key = hash((dimension, x, z));
278+
let tsk_db = self.db.clone();
236279
let db = self.db.clone();
237280

238281
// First check cache
239282
if self.cache.contains_key(&key) {
240283
Ok(self.cache.get(&key).await)
241284
}
242285
// Attempt to get chunk from persistent database
243-
else if let Some(chunk) = spawn_blocking_db(move || Self::get_chunk_from_database(&db, &key))
286+
else if let Some(chunk) = spawn_blocking_db(tsk_db, move || Self::get_chunk_from_database(&db, &key))
244287
.await
245288
.unwrap()?
246289
{
@@ -274,14 +317,15 @@ impl Database {
274317
pub async fn chunk_exists(&self, x: i32, z: i32, dimension: String) -> Result<bool, Error> {
275318
// Calculate key and copy database pointer
276319
let key = hash((dimension, x, z));
320+
let tsk_db = self.db.clone();
277321
let db = self.db.clone();
278322

279323
// Check first cache
280324
if self.cache.contains_key(&key) {
281325
Ok(true)
282326
// Else check persistent database and load it into cache
283327
} else {
284-
let res = spawn_blocking_db(move || Self::get_chunk_from_database(&db, &key)).await.unwrap();
328+
let res = spawn_blocking_db(tsk_db, move || Self::get_chunk_from_database(&db, &key)).await.unwrap();
285329

286330
// WARNING: The previous logic was to order the chunk to be loaded into cache whether it existed or not.
287331
// This has been replaced by directly loading the queried chunk into cache
@@ -293,7 +337,7 @@ impl Database {
293337
}
294338
Ok(exist)
295339
}
296-
Err(err) => Err(err),
340+
Err(err) => Err(Error::LmdbError(err)),
297341
}
298342
}
299343
}
@@ -324,7 +368,8 @@ impl Database {
324368
// Insert new chunk state into persistent database
325369
let chunk = value.clone();
326370
let db = self.db.clone();
327-
spawn_blocking_db(move || Self::insert_chunk_into_database(&db, &chunk)).await.unwrap()?;
371+
let tsk_db = self.db.clone();
372+
spawn_blocking_db(tsk_db, move || Self::insert_chunk_into_database(&db, &chunk)).await.unwrap()?;
328373

329374
// Insert new chunk state into cache
330375
self.cache.insert(key, value).await;
@@ -411,11 +456,12 @@ impl Database {
411456

412457
// Clone database pointer
413458
let db = self.db.clone();
459+
let tsk_db = self.db.clone();
414460

415461
// Calculate all keys
416462
let keys = values
417463
.iter()
418-
.map(|v| hash((v.dimension.as_ref().expect(format!("Invalid chunk @ ({},{})", v.x_pos, v.z_pos).as_str()), v.x_pos, v.z_pos)))
464+
.map(|v| hash((v.dimension.as_ref().unwrap_or_else(|| panic!("Invalid chunk @ ({},{})", v.x_pos, v.z_pos)), v.x_pos, v.z_pos)))
419465
.collect::<Vec<u64>>();
420466

421467
// WARNING: The previous logic was to first insert in database and then insert in cache using load_into_cache fn.
@@ -425,11 +471,12 @@ impl Database {
425471
self.cache.insert(key, chunk.clone()).await;
426472
self.load_into_cache(key).await?;
427473
}
428-
474+
429475
// Then insert into persistent database
430-
spawn_blocking_db(move || Self::insert_chunks_into_database(&db, &values))
476+
spawn_blocking_db(tsk_db, move || Self::insert_chunks_into_database(&db, &values))
431477
.await
432478
.unwrap()?;
479+
433480
Ok(())
434481
}
435482
}

src/database/mod.rs

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ use moka::notification::{ListenerFuture, RemovalCause};
88
use rayon::{ThreadPool, ThreadPoolBuilder};
99
use std::env;
1010
use std::path::PathBuf;
11-
use std::sync::{Arc, OnceLock};
11+
use std::sync::{Arc, LazyLock, Mutex, OnceLock, RwLock};
1212
use std::time::Duration;
1313
use tokio::fs;
1414
use tracing::{debug, info, trace};
@@ -20,12 +20,17 @@ use crate::world::chunk_format::Chunk;
2020
pub mod chunks;
2121

2222
// LMDB constants
23-
const LMDB_PAGE_SIZE: usize = 50 * 1024usize.pow(3); // 50GiB
23+
const LMDB_MIN_PAGE_SIZE: usize = 2 * 1024usize.pow(2); // 2MiB
24+
const LMDB_PAGE_SIZE_INCREMENT: usize = 50*1024usize.pow(2); // 50MiB
2425
const LMDB_MAX_DBS: u32 = 10;
2526

2627
// Database threadpool
2728
static LMDB_THREADPOOL: OnceLock<ThreadPool> = OnceLock::new();
2829

30+
// Global size
31+
static LMDB_PAGE_SIZE: LazyLock<Arc<Mutex<usize>>> = LazyLock::new(|| Arc::new(Mutex::new(LMDB_MIN_PAGE_SIZE)));
32+
static LMDB_READER_SYNC: LazyLock<Arc<RwLock<()>>> = LazyLock::new(|| Arc::new(RwLock::new(())));
33+
2934
/// Global database structure
3035
///
3136
/// Internally contain a handle to the persistent database and a
@@ -75,7 +80,7 @@ pub async fn start_database() -> Result<Database, Error> {
7580
// Database Options
7681
let mut opts = EnvOpenOptions::new();
7782
opts.max_readers(num_cpus::get() as u32)
78-
.map_size(LMDB_PAGE_SIZE)
83+
.map_size(LMDB_MIN_PAGE_SIZE)
7984
.max_dbs(LMDB_MAX_DBS);
8085

8186
// Open database (This operation is safe as we assume no other process touched the database)

src/utils/error.rs

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -115,5 +115,3 @@ impl From<Error> for std::io::ErrorKind {
115115
std::io::ErrorKind::Other
116116
}
117117
}
118-
119-

0 commit comments

Comments
 (0)