1
1
use std:: borrow:: Cow ;
2
2
use std:: future:: Future ;
3
3
use std:: marker:: PhantomData ;
4
-
4
+ use std :: sync :: Arc ;
5
5
use bincode:: { Decode , Encode , config:: standard} ;
6
6
use byteorder:: LE ;
7
7
use futures:: channel:: oneshot:: { self , Canceled } ;
8
8
use heed:: { types:: U64 , BytesDecode , BytesEncode , Env , MdbError } ;
9
- use tracing:: { info, trace, warn} ;
9
+ use moka:: future:: Cache ;
10
+ use tracing:: { trace, warn} ;
10
11
11
12
use crate :: {
12
13
database:: Database ,
@@ -162,27 +163,22 @@ impl Database {
162
163
let key = hash ( ( chunk. dimension . as_ref ( ) . unwrap ( ) , chunk. x_pos , chunk. z_pos ) ) ;
163
164
164
165
// Insert chunk
165
- database. put ( & mut rw_tx, & key, chunk) ?
166
- // .map_err(|err| {
167
- // Error::DatabaseError(format!("Failed to insert or update chunk: {err}"))
168
- // })?;
166
+ database. put ( & mut rw_tx, & key, chunk) ?;
169
167
}
170
-
171
168
// Commit changes
172
169
rw_tx. commit ( ) ?;
173
- // .map_err(|err| {
174
- // Error::DatabaseError(format!("Unable to commit changes to database: {err}"))
175
- // })?;
176
170
Ok ( ( ) )
177
171
}
178
172
179
173
async fn load_into_cache ( & self , key : u64 ) -> Result < ( ) , Error > {
180
- let db = self . db . clone ( ) ;
181
- let tsk_db = self . db . clone ( ) ;
182
- let cache = self . cache . clone ( ) ;
174
+ Database :: load_into_cache_standalone ( self . db . clone ( ) , self . cache . clone ( ) , key) . await
175
+ }
176
+
177
+ async fn load_into_cache_standalone ( db : Env , cache : Arc < Cache < u64 , Chunk > > , key : u64 ) -> Result < ( ) , Error > {
178
+ let tsk_db = db. clone ( ) ;
183
179
184
180
tokio:: task:: spawn ( async move {
185
-
181
+
186
182
// Check cache
187
183
if cache. contains_key ( & key) {
188
184
trace ! ( "Chunk already exists in cache: {:X}" , key) ;
@@ -210,7 +206,6 @@ impl Database {
210
206
. await ?;
211
207
Ok ( ( ) )
212
208
}
213
-
214
209
/// Insert a chunk into the database <br>
215
210
/// This will also insert the chunk into the cache <br>
216
211
/// If the chunk already exists, it will return an error
@@ -395,65 +390,6 @@ impl Database {
395
390
///
396
391
/// ```
397
392
pub async fn batch_insert ( & self , values : Vec < Chunk > ) -> Result < ( ) , Error > {
398
- /*
399
-
400
- trace!("processing chunks (compressing and encoding)");
401
- // Process chunks in parallel
402
- let processed_chunks: Vec<(u64, Vec<u8>)> = values
403
- .par_iter()
404
- .map(|chunk| {
405
- let key = hash((
406
- chunk.dimension.as_ref().expect(&format!("Invalid chunk @ ({},{})", chunk.x_pos, chunk.z_pos)),
407
- chunk.x_pos,
408
- chunk.z_pos,
409
- ));
410
-
411
- let encoded_chunk = encode_to_vec(chunk, standard())
412
- .expect("Failed to encode chunk");
413
- let compressed = zstd_compress(&encoded_chunk, 3)
414
- .expect("Failed to compress chunk.")
415
- ;
416
-
417
- (key, compressed)
418
- })
419
- .collect();
420
- trace!("processed chunks");*/
421
-
422
- // Insert into cache in parallel
423
- // TODO: re-enable this?
424
- /*values.par_iter().for_each(|chunk| {
425
- let key = hash((
426
- chunk.dimension.as_ref().expect(&format!("Invalid chunk @ ({},{})", chunk.x_pos, chunk.z_pos)),
427
- chunk.x_pos,
428
- chunk.z_pos,
429
- ));
430
-
431
- // tokio::spawn(self.load_into_cache(key));
432
- // if let Err(e) = self.cache.insert(key, chunk.clone()) {
433
- // warn!("Failed to insert chunk into cache: {:?}", e);
434
- // }
435
- });
436
- */
437
-
438
- /*trace!("Inserting chunks into database");
439
- // Perform batch insert into LMDB
440
- spawn_blocking(move || {
441
- let mut rw_tx = db.write_txn()?;
442
- let database = db
443
- .open_database::<U64<LE>, Bytes>(&rw_tx, Some("chunks"))?
444
- .expect("No table \"chunks\" found. The database should have been initialized");
445
-
446
- for (key, compressed) in processed_chunks {
447
- database.put(&mut rw_tx, &key, &compressed)?;
448
- }
449
-
450
- rw_tx.commit()?;
451
- Ok::<_, Error>(())
452
- })
453
- .await??;
454
-
455
- Ok(())*/
456
-
457
393
// Clone database pointer
458
394
let db = self . db . clone ( ) ;
459
395
let tsk_db = self . db . clone ( ) ;
@@ -467,9 +403,17 @@ impl Database {
467
403
// WARNING: The previous logic was to first insert in database and then insert in cache using load_into_cache fn.
468
404
// This has been modified to avoid having to query database while we already have the data available.
469
405
// First insert into cache
406
+
470
407
for ( key, chunk) in keys. into_iter ( ) . zip ( & values) {
471
- self . cache . insert ( key, chunk. clone ( ) ) . await ;
472
- self . load_into_cache ( key) . await ?;
408
+ let cache = self . cache . clone ( ) ;
409
+ let db = self . db . clone ( ) ;
410
+ let chunk = chunk. clone ( ) ;
411
+ tokio:: spawn ( async move {
412
+ cache. insert ( key, chunk) . await ;
413
+ if let Err ( e) = Database :: load_into_cache_standalone ( db, cache, key) . await {
414
+ warn ! ( "Error inserting chunk into database: {:?}" , e) ;
415
+ }
416
+ } ) ;
473
417
}
474
418
475
419
// Then insert into persistent database
0 commit comments