Skip to content

Commit b6fb310

Browse files
committed
Refactored compression and implemented LMDB over rayon
- Implemented Zstd compression via a Zstd<T> structure that implements the heed BytesEncode and BytesDecode traits - Implemented a dedicated rayon threadpool (LMDB_THREADPOOL) for LMDB operations - Cleaned up imports a little - Replaced tokio::task::spawn_blocking with a spawn_blocking_db function that uses the rayon threadpool - Implemented a close function for the database - Replaced all chunks table opens with Zstd<Chunk> instead of Bytes
1 parent 5df6a4f commit b6fb310

File tree

3 files changed

+99
-52
lines changed

3 files changed

+99
-52
lines changed

src/database/chunks.rs

Lines changed: 82 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -1,60 +1,101 @@
1+
use std::borrow::Cow;
2+
use std::future::Future;
3+
use std::marker::PhantomData;
4+
5+
use bincode::{Decode, Encode, config::standard};
16
use byteorder::LE;
2-
use zstd::bulk::compress as zstd_compress;
3-
use zstd::bulk::decompress as zstd_decompress;
4-
use heed::types::{Bytes, U64};
5-
use heed::Env;
6-
use tokio::task::spawn_blocking;
7+
use futures::channel::oneshot::{self, Canceled};
8+
use heed::{BytesDecode, BytesEncode, types::U64, Env};
79
use tracing::{trace, warn};
810

9-
use crate::database::Database;
10-
use crate::utils::error::Error;
11-
use crate::utils::hash::hash;
12-
use crate::world::chunk_format::Chunk;
11+
use crate::{
12+
database::Database,
13+
utils::error::Error,
14+
utils::hash::hash,
15+
world::chunk_format::Chunk
16+
};
17+
18+
use super::LMDB_THREADPOOL;
19+
20+
/// Zero-sized codec marker: implements heed's `BytesEncode`/`BytesDecode`
/// so values of type `T` are stored in LMDB as zstd-compressed bincode.
/// `PhantomData` ties the codec to `T` without storing one.
pub struct Zstd<T>(PhantomData<T>);
1321

14-
// use crate::utils::binary_utils::{bzip_compress, bzip_decompress};
15-
use bincode::config::standard;
16-
use bincode::{decode_from_slice, encode_to_vec};
22+
impl<'a, T: Encode + 'a> BytesEncode<'a> for Zstd<T> {
23+
type EItem = T;
24+
25+
fn bytes_encode(item: &'a Self::EItem) -> Result<Cow<'a, [u8]>, heed::BoxedError> {
26+
27+
// Compress
28+
let mut bytes = Vec::new();
29+
let mut compressor = zstd::Encoder::new(&mut bytes, 6)?;
30+
bincode::encode_into_std_write(item, &mut compressor, standard())?;
31+
32+
Ok(Cow::Owned(bytes))
33+
}
34+
}
35+
36+
impl<'a, T: Decode + 'a> BytesDecode<'a> for Zstd<T> {
37+
type DItem = T;
38+
39+
fn bytes_decode(bytes: &'a [u8]) -> Result<Self::DItem, heed::BoxedError> {
40+
41+
let mut decompressor = zstd::Decoder::new(bytes)?;
42+
let decoded = bincode::decode_from_std_read(&mut decompressor, standard())?;
43+
Ok(decoded)
44+
}
45+
}
46+
47+
// Will delegate a database operation to the database threadpool
48+
pub(super) fn spawn_blocking_db<F, R>(f: F) -> impl Future<Output = Result<R,Canceled>>
49+
where
50+
F: FnOnce() -> R + Send + 'static,
51+
R: Send + 'static,
52+
{
53+
let (tx,res) = oneshot::channel::<R>();
54+
55+
let pool = LMDB_THREADPOOL.get().unwrap();
56+
pool.spawn(move || {
57+
if tx.send(f()).is_err() {
58+
tracing::warn!("A database task has been unable to send its result because the receiver at other end have closed.")
59+
}
60+
});
61+
62+
res
63+
}
1764

1865
impl Database {
66+
67+
// Close the database
68+
pub fn close(self) {
69+
let token = self.db.prepare_for_closing();
70+
token.wait();
71+
}
72+
1973
/// Fetch chunk from database
2074
fn get_chunk_from_database(db: &Env, key: &u64) -> Result<Option<Chunk>, Error> {
2175
// Initialize read transaction and open chunks table
2276
let ro_tx = db.read_txn()?;
2377
let database = db
24-
.open_database::<U64<LE>, Bytes>(&ro_tx, Some("chunks"))?
78+
.open_database::<U64<LE>, Zstd<Chunk>>(&ro_tx, Some("chunks"))?
2579
.expect("No table \"chunks\" found. The database should have been initialized");
2680

2781
// Attempt to fetch chunk from table
28-
if let Ok(data) = database.get(&ro_tx, key) {
29-
Ok(data.map(|encoded_chunk| {
30-
// let decompressed =
31-
// bzip_decompress(&encoded_chunk).expect("Failed to decompress chunk");
32-
let decompressed = zstd_decompress(&encoded_chunk, 1024*1024*64).expect("Failed to decompress chunk");
33-
let chunk: (Chunk, usize) = decode_from_slice(&*decompressed, standard())
34-
.expect("Failed to decode chunk from database");
35-
chunk.0
36-
}))
37-
} else {
38-
Err(Error::DatabaseError("Failed to get chunk".into()))
39-
}
82+
database.get(&ro_tx, key)
83+
.map_err(|err| Error::DatabaseError(format!("Failed to get chunk: {err}")))
4084
}
4185

4286
/// Insert a single chunk into database
4387
fn insert_chunk_into_database(db: &Env, chunk: &Chunk) -> Result<(), Error> {
4488
// Initialize write transaction and open chunks table
4589
let mut rw_tx = db.write_txn()?;
4690
let database = db
47-
.open_database::<U64<LE>, Bytes>(&rw_tx, Some("chunks"))?
91+
.open_database::<U64<LE>, Zstd<Chunk>>(&rw_tx, Some("chunks"))?
4892
.expect("No table \"chunks\" found. The database should have been initialized");
4993

50-
// Encode chunk
51-
let encoded_chunk = encode_to_vec(chunk, standard()).expect("Failed to encode chunk");
52-
// let compressed = bzip_compress(&encoded_chunk).expect("Failed to compress chunk");
53-
let compressed = zstd_compress(&encoded_chunk, 3).expect("Failed to compress chunk");
94+
// Calculate key
5495
let key = hash((chunk.dimension.as_ref().unwrap(), chunk.x_pos, chunk.z_pos));
5596

5697
// Insert chunk
57-
let res = database.put(&mut rw_tx, &key, &compressed);
98+
let res = database.put(&mut rw_tx, &key, chunk);
5899
rw_tx.commit().map_err(|err| {
59100
Error::DatabaseError(format!("Unable to commit changes to database: {err}"))
60101
})?;
@@ -74,20 +115,16 @@ impl Database {
74115
// Initialize write transaction and open chunks table
75116
let mut rw_tx = db.write_txn()?;
76117
let database = db
77-
.open_database::<U64<LE>, Bytes>(&rw_tx, Some("chunks"))?
118+
.open_database::<U64<LE>, Zstd<Chunk>>(&rw_tx, Some("chunks"))?
78119
.expect("No table \"chunks\" found. The database should have been initialized");
79120

80121
// Update page
81122
for chunk in chunks {
82-
// Encode chunk
83-
let encoded_chunk = encode_to_vec(chunk, standard()).expect("Failed to encode chunk");
84-
85-
// let compressed = bzip_compress(&encoded_chunk).expect("Failed to compress chunk");
86-
let compressed = zstd_compress(&encoded_chunk, 3).expect("Failed to compress chunk");
123+
// Calculate key
87124
let key = hash((chunk.dimension.as_ref().unwrap(), chunk.x_pos, chunk.z_pos));
88125

89126
// Insert chunk
90-
database.put(&mut rw_tx, &key, &compressed).map_err(|err| {
127+
database.put(&mut rw_tx, &key, chunk).map_err(|err| {
91128
Error::DatabaseError(format!("Failed to insert or update chunk: {err}"))
92129
})?;
93130
}
@@ -104,13 +141,14 @@ impl Database {
104141
let cache = self.cache.clone();
105142

106143
tokio::task::spawn(async move {
144+
107145
// Check cache
108146
if cache.contains_key(&key) {
109147
trace!("Chunk already exists in cache: {:X}", key);
110148
}
111149
// If not in cache then search in database
112150
else if let Ok(chunk) =
113-
spawn_blocking(move || Self::get_chunk_from_database(&db, &key))
151+
spawn_blocking_db(move || Self::get_chunk_from_database(&db, &key))
114152
.await
115153
.unwrap()
116154
{
@@ -158,7 +196,7 @@ impl Database {
158196
// Insert chunk into persistent database
159197
let chunk = value.clone();
160198
let db = self.db.clone();
161-
spawn_blocking(move || Self::insert_chunk_into_database(&db, &chunk))
199+
spawn_blocking_db(move || Self::insert_chunk_into_database(&db, &chunk))
162200
.await
163201
.unwrap()?;
164202

@@ -202,7 +240,7 @@ impl Database {
202240
Ok(self.cache.get(&key).await)
203241
}
204242
// Attempt to get chunk from persistent database
205-
else if let Some(chunk) = spawn_blocking(move || Self::get_chunk_from_database(&db, &key))
243+
else if let Some(chunk) = spawn_blocking_db(move || Self::get_chunk_from_database(&db, &key))
206244
.await
207245
.unwrap()?
208246
{
@@ -243,7 +281,7 @@ impl Database {
243281
Ok(true)
244282
// Else check persistent database and load it into cache
245283
} else {
246-
let res = spawn_blocking(move || Self::get_chunk_from_database(&db, &key)).await?;
284+
let res = spawn_blocking_db(move || Self::get_chunk_from_database(&db, &key)).await.unwrap();
247285

248286
// WARNING: The previous logic was to order the chunk to be loaded into cache whether it existed or not.
249287
// This has been replaced by directly loading the queried chunk into cache
@@ -286,7 +324,7 @@ impl Database {
286324
// Insert new chunk state into persistent database
287325
let chunk = value.clone();
288326
let db = self.db.clone();
289-
spawn_blocking(move || Self::insert_chunk_into_database(&db, &chunk)).await??;
327+
spawn_blocking_db(move || Self::insert_chunk_into_database(&db, &chunk)).await.unwrap()?;
290328

291329
// Insert new chunk state into cache
292330
self.cache.insert(key, value).await;
@@ -389,7 +427,7 @@ impl Database {
389427
}
390428

391429
// Then insert into persistent database
392-
spawn_blocking(move || Self::insert_chunks_into_database(&db, &values))
430+
spawn_blocking_db(move || Self::insert_chunks_into_database(&db, &values))
393431
.await
394432
.unwrap()?;
395433
Ok(())

src/database/mod.rs

Lines changed: 16 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,14 @@
11
use byteorder::LE;
2+
use chunks::Zstd;
23
use deepsize::DeepSizeOf;
34
use futures::FutureExt;
4-
use heed::types::{Bytes, U64};
5+
use heed::types::U64;
56
use heed::{Env as LMDBDatabase, EnvFlags, EnvOpenOptions};
67
use moka::notification::{ListenerFuture, RemovalCause};
8+
use rayon::{ThreadPool, ThreadPoolBuilder};
79
use std::env;
810
use std::path::PathBuf;
9-
use std::sync::Arc;
11+
use std::sync::{Arc, OnceLock};
1012
use std::time::Duration;
1113
use tokio::fs;
1214
use tracing::{debug, info, trace};
@@ -19,6 +21,10 @@ pub mod chunks;
1921

2022
// MDBX constants
2123
const LMDB_PAGE_SIZE: usize = 50 * 1024usize.pow(3); // 50GiB
24+
const LMDB_MAX_DBS: u32 = 10;
25+
26+
// Database threadpool
27+
static LMDB_THREADPOOL: OnceLock<ThreadPool> = OnceLock::new();
2228

2329
/// Global database structure
2430
///
@@ -70,7 +76,7 @@ pub async fn start_database() -> Result<Database, Error> {
7076
let mut opts = EnvOpenOptions::new();
7177
opts.max_readers(num_cpus::get() as u32)
7278
.map_size(LMDB_PAGE_SIZE)
73-
.max_dbs(num_cpus::get() as u32 *2);
79+
.max_dbs(LMDB_MAX_DBS);
7480

7581
// Open database (This operation is safe as we assume no other process touched the database)
7682
let lmdb = unsafe {
@@ -79,14 +85,19 @@ pub async fn start_database() -> Result<Database, Error> {
7985
.expect("Unable to open LMDB environment located at {world_path:?}")
8086
};
8187

88+
// Start database threadpool
89+
LMDB_THREADPOOL.get_or_init(|| {
90+
ThreadPoolBuilder::new().num_threads(num_cpus::get() / 2).build().unwrap()
91+
});
92+
8293
// Check if database is built. Otherwise, initialize it
8394
let mut rw_tx = lmdb.write_txn().unwrap();
8495
if lmdb
85-
.open_database::<U64<LE>, Bytes>(&rw_tx, Some("chunks"))
96+
.open_database::<U64<LE>, Zstd<Chunk>>(&rw_tx, Some("chunks"))
8697
.unwrap()
8798
.is_none()
8899
{
89-
lmdb.create_database::<U64<LE>, Bytes>(&mut rw_tx, Some("chunks"))
100+
lmdb.create_database::<U64<LE>, Zstd<Chunk>>(&mut rw_tx, Some("chunks"))
90101
.expect("Unable to create database");
91102
}
92103
// `entities` table to be added, but needs the type to do so

src/main.rs

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -88,7 +88,7 @@ async fn start_server() -> Result<()> {
8888

8989
if env::args().any(|arg| arg == "--import") {
9090
// world::importing::import_regions(state.clone()).await?;
91-
rayon::ThreadPoolBuilder::new().num_threads(num_cpus::get() * 2).build_global().expect("Failed to build rayon thread pool");
91+
rayon::ThreadPoolBuilder::new().num_threads(num_cpus::get()).build_global().expect("Failed to build rayon thread pool");
9292
world::importing::import_regions(state.clone()).await?;
9393
exit(0);
9494
}
@@ -117,5 +117,3 @@ async fn create_state(tcp_listener: TcpListener) -> Result<GlobalState> {
117117
server_stream: tcp_listener,
118118
}))
119119
}
120-
121-

0 commit comments

Comments
 (0)