use byteorder::LE;
+use zstd::bulk::compress as zstd_compress;
+use zstd::bulk::decompress as zstd_decompress;
use heed::types::{Bytes, U64};
use heed::Env;
use tokio::task::spawn_blocking;
@@ -7,27 +9,27 @@ use tracing::{trace, warn};
use crate::database::Database;
use crate::utils::error::Error;
use crate::utils::hash::hash;
-use crate::world::chunkformat::Chunk;
+use crate::world::chunk_format::Chunk;

-use crate::utils::binary_utils::{bzip_compress, bzip_decompress};
+// use crate::utils::binary_utils::{bzip_compress, bzip_decompress};

use bincode::config::standard;
use bincode::{decode_from_slice, encode_to_vec};

impl Database {
    /// Fetch chunk from database
    fn get_chunk_from_database(db: &Env, key: &u64) -> Result<Option<Chunk>, Error> {
        // Initialize read transaction and open chunks table
-        let ro_tx = db.read_txn().unwrap();
+        let ro_tx = db.read_txn()?;
        let database = db
-            .open_database::<U64<LE>, Bytes>(&ro_tx, Some("chunks"))
-            .unwrap()
+            .open_database::<U64<LE>, Bytes>(&ro_tx, Some("chunks"))?
            .expect("No table \"chunks\" found. The database should have been initialized");

        // Attempt to fetch chunk from table
        if let Ok(data) = database.get(&ro_tx, key) {
            Ok(data.map(|encoded_chunk| {
-                let decompressed =
-                    bzip_decompress(&encoded_chunk).expect("Failed to decompress chunk");
+                // let decompressed =
+                //     bzip_decompress(&encoded_chunk).expect("Failed to decompress chunk");
+                let decompressed = zstd_decompress(&encoded_chunk, 1024 * 1024 * 64).expect("Failed to decompress chunk");
                let chunk: (Chunk, usize) = decode_from_slice(&*decompressed, standard())
                    .expect("Failed to decode chunk from database");
                chunk.0
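
For reference, the zstd bulk API swapped in here operates on whole buffers: compress takes a compression level (3 below), and decompress takes the maximum output size it is allowed to allocate (64 MiB above). A minimal round-trip sketch of just that API, independent of the chunk types in this file:

use zstd::bulk::{compress, decompress};

fn roundtrip(payload: &[u8]) -> std::io::Result<()> {
    // Level 3 is the same level the chunk writers in this file use.
    let compressed = compress(payload, 3)?;
    // The second argument caps the decompressed allocation; decompression
    // fails if the real size exceeds it (the code above allows 64 MiB).
    let decompressed = decompress(&compressed, 1024 * 1024 * 64)?;
    assert_eq!(payload, &decompressed[..]);
    Ok(())
}
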
@@ -40,15 +42,15 @@ impl Database {
    /// Insert a single chunk into database
    fn insert_chunk_into_database(db: &Env, chunk: &Chunk) -> Result<(), Error> {
        // Initialize write transaction and open chunks table
-        let mut rw_tx = db.write_txn().unwrap();
+        let mut rw_tx = db.write_txn()?;
        let database = db
-            .open_database::<U64<LE>, Bytes>(&rw_tx, Some("chunks"))
-            .unwrap()
+            .open_database::<U64<LE>, Bytes>(&rw_tx, Some("chunks"))?
            .expect("No table \"chunks\" found. The database should have been initialized");

        // Encode chunk
        let encoded_chunk = encode_to_vec(chunk, standard()).expect("Failed to encode chunk");
-        let compressed = bzip_compress(&encoded_chunk).expect("Failed to compress chunk");
+        // let compressed = bzip_compress(&encoded_chunk).expect("Failed to compress chunk");
+        let compressed = zstd_compress(&encoded_chunk, 3).expect("Failed to compress chunk");
        let key = hash((chunk.dimension.as_ref().unwrap(), chunk.x_pos, chunk.z_pos));

        // Insert chunk
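
As a side note, every write in this file follows the same heed pattern: begin a write transaction, open the typed "chunks" database, put, commit. A condensed sketch of that pattern, assuming an Env whose "chunks" database was created at startup (as this module expects):

use byteorder::LE;
use heed::types::{Bytes, U64};
use heed::Env;

fn put_blob(env: &Env, key: u64, blob: &[u8]) -> Result<(), heed::Error> {
    let mut wtxn = env.write_txn()?;
    // `open_database` yields Ok(None) when the table is absent, which is
    // why the code above still follows the `?` with an `.expect(...)`.
    let db = env
        .open_database::<U64<LE>, Bytes>(&wtxn, Some("chunks"))?
        .expect("\"chunks\" database should exist");
    db.put(&mut wtxn, &key, blob)?;
    // Nothing is visible to readers until the transaction commits.
    wtxn.commit()?;
    Ok(())
}
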
@@ -70,18 +72,18 @@ impl Database {
    /// TODO: Find better name/disambiguation
    fn insert_chunks_into_database(db: &Env, chunks: &[Chunk]) -> Result<(), Error> {
        // Initialize write transaction and open chunks table
-        let mut rw_tx = db.write_txn().unwrap();
+        let mut rw_tx = db.write_txn()?;
        let database = db
-            .open_database::<U64<LE>, Bytes>(&rw_tx, Some("chunks"))
-            .unwrap()
+            .open_database::<U64<LE>, Bytes>(&rw_tx, Some("chunks"))?
            .expect("No table \"chunks\" found. The database should have been initialized");

        // Update page
        for chunk in chunks {
            // Encode chunk
            let encoded_chunk = encode_to_vec(chunk, standard()).expect("Failed to encode chunk");

-            let compressed = bzip_compress(&encoded_chunk).expect("Failed to compress chunk");
+            // let compressed = bzip_compress(&encoded_chunk).expect("Failed to compress chunk");
+            let compressed = zstd_compress(&encoded_chunk, 3).expect("Failed to compress chunk");
            let key = hash((chunk.dimension.as_ref().unwrap(), chunk.x_pos, chunk.z_pos));

            // Insert chunk
@@ -126,7 +128,7 @@ impl Database {
                    warn!("Error getting chunk: {:X}", key,);
                }
            })
-            .await?;
+        .await?;
        Ok(())
    }
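
Since LMDB transactions are blocking, these async wrappers push the work onto tokio's blocking pool. The single `?` above only unwraps the JoinError; when the closure itself returns a Result (as in the commented-out batch path further down), the pattern becomes `.await??`. A self-contained sketch, with `do_lmdb_work` as a hypothetical stand-in for the closure body and `anyhow` assumed purely for the example:

use heed::Env;
use tokio::task::spawn_blocking;

// Hypothetical stand-in for the transaction body run on the blocking pool.
fn do_lmdb_work(_env: &Env) -> Result<(), heed::Error> {
    Ok(())
}

async fn run(env: Env) -> anyhow::Result<()> {
    // First `?`: the blocking task panicked or was aborted (JoinError).
    // Second `?`: the database operation inside the closure failed.
    spawn_blocking(move || do_lmdb_work(&env)).await??;
    Ok(())
}
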
@@ -310,13 +312,72 @@ impl Database {
    ///
    /// ```
    pub async fn batch_insert(&self, values: Vec<Chunk>) -> Result<(), Error> {
+        /*
+
+        trace!("processing chunks (compressing and encoding)");
+        // Process chunks in parallel
+        let processed_chunks: Vec<(u64, Vec<u8>)> = values
+            .par_iter()
+            .map(|chunk| {
+                let key = hash((
+                    chunk.dimension.as_ref().expect(&format!("Invalid chunk @ ({},{})", chunk.x_pos, chunk.z_pos)),
+                    chunk.x_pos,
+                    chunk.z_pos,
+                ));
+
+                let encoded_chunk = encode_to_vec(chunk, standard())
+                    .expect("Failed to encode chunk");
+                let compressed = zstd_compress(&encoded_chunk, 3)
+                    .expect("Failed to compress chunk.");
+
+                (key, compressed)
+            })
+            .collect();
+        trace!("processed chunks");*/
+
+        // Insert into cache in parallel
+        // TODO: re-enable this?
+        /*values.par_iter().for_each(|chunk| {
+            let key = hash((
+                chunk.dimension.as_ref().expect(&format!("Invalid chunk @ ({},{})", chunk.x_pos, chunk.z_pos)),
+                chunk.x_pos,
+                chunk.z_pos,
+            ));
+
+            // tokio::spawn(self.load_into_cache(key));
+            // if let Err(e) = self.cache.insert(key, chunk.clone()) {
+            //     warn!("Failed to insert chunk into cache: {:?}", e);
+            // }
+        });
+        */
+
+        /*trace!("Inserting chunks into database");
+        // Perform batch insert into LMDB
+        spawn_blocking(move || {
+            let mut rw_tx = db.write_txn()?;
+            let database = db
+                .open_database::<U64<LE>, Bytes>(&rw_tx, Some("chunks"))?
+                .expect("No table \"chunks\" found. The database should have been initialized");
+
+            for (key, compressed) in processed_chunks {
+                database.put(&mut rw_tx, &key, &compressed)?;
+            }
+
+            rw_tx.commit()?;
+            Ok::<_, Error>(())
+        })
+        .await??;
+
+        Ok(())*/
+
        // Clone database pointer
        let db = self.db.clone();

        // Calculate all keys
        let keys = values
            .iter()
-            .map(|v| hash((v.dimension.as_ref().unwrap(), v.x_pos, v.z_pos)))
+            .map(|v| hash((v.dimension.as_ref().expect(format!("Invalid chunk @ ({},{})", v.x_pos, v.z_pos).as_str()), v.x_pos, v.z_pos)))
            .collect::<Vec<u64>>();

        // WARNING: The previous logic was to first insert in database and then insert in cache using load_into_cache fn.
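
Throughout the file, the u64 LMDB key is derived by hashing the (dimension, x_pos, z_pos) triple. The project's crate::utils::hash::hash is not part of this diff; a hypothetical stand-in over std's hasher shows the shape (a real key function should use a hash that is stable across Rust releases, since keys persist on disk):

use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

// Hypothetical stand-in for crate::utils::hash::hash (not shown in this
// diff): any Hash -> u64 mapping fits the call sites above.
fn hash<T: Hash>(value: T) -> u64 {
    let mut hasher = DefaultHasher::new();
    value.hash(&mut hasher);
    hasher.finish()
}

// Mirrors the key computation in batch_insert:
// let key = hash((chunk.dimension.as_ref().unwrap(), chunk.x_pos, chunk.z_pos));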