
Commit 4e2ac0b

The code is absolutely garbage, but it works & That's enough for today ladies and gentlemen. Some other day, we'll meet again. I'm going crazy.
1 parent bc6f119 commit 4e2ac0b

4 files changed: +144 additions, -31 deletions


src/database/chunks.rs

Lines changed: 67 additions & 14 deletions

@@ -6,7 +6,10 @@ use bincode::{Decode, Encode, config::standard};
 use byteorder::LE;
 use futures::channel::oneshot::{self, Canceled};
 use heed::{types::U64, BytesDecode, BytesEncode, Env, MdbError};
+use heed::types::Bytes;
 use moka::future::Cache;
+use tokio::runtime::Handle;
+use tokio::task::block_in_place;
 use tracing::{trace, warn};

 use crate::{
@@ -15,7 +18,7 @@ use crate::{
     utils::hash::hash,
     world::chunk_format::Chunk
 };
-
+use crate::world::importing::SerializedChunk;
 use super::{LMDB_PAGE_SIZE, LMDB_PAGE_SIZE_INCREMENT, LMDB_READER_SYNC, LMDB_THREADPOOL};

 pub struct Zstd<T>(PhantomData<T>);
@@ -48,6 +51,30 @@ impl<'a, T: Decode + 'a> BytesDecode<'a> for Zstd<T> {
     }
 }

+pub struct ZstdCodec;
+
+impl ZstdCodec {
+    pub async fn compress_data<T: Encode + Send + 'static>(data: T) -> crate::Result<Vec<u8>> {
+        tokio::task::spawn_blocking(
+            move || {
+                let mut bytes = Vec::new();
+                let mut compressor = zstd::Encoder::new(&mut bytes, 3)?;
+                bincode::encode_into_std_write(&data, &mut compressor, standard())?;
+                compressor.finish()?;
+                Ok(bytes)
+            }
+        ).await?
+    }
+    pub async fn decompress_data<T: Decode + Send + 'static>(data: Vec<u8>) -> crate::Result<T> {
+        tokio::task::spawn_blocking(
+            move || {
+                let decoded = bincode::decode_from_slice(data.as_slice(), standard())?;
+                Ok(decoded.0)
+            }
+        ).await?
+    }
+}
+
 /// LMDB will follow a linear growth as opposed to MDBX which
 /// uses a geometric growth.
 pub(super) fn new_page_size(old_size: usize) -> usize {
@@ -114,11 +141,23 @@ impl Database {
         // Initialize read transaction and open chunks table
         let ro_tx = db.read_txn()?;
         let database = db
-            .open_database::<U64<LE>, Zstd<Chunk>>(&ro_tx, Some("chunks"))?
+            .open_database::<U64<LE>, Bytes>(&ro_tx, Some("chunks"))?
             .expect("No table \"chunks\" found. The database should have been initialized");

         // Attempt to fetch chunk from table
-        database.get(&ro_tx, key)
+        let data = database.get(&ro_tx, key)?;
+        let chunk = match data {
+            Some(data) => {
+                // let chunk = ZstdCodec::decompress_data::<Chunk>(data.to_vec()).expect("Failed to decompress chunk");
+                let chunk = Handle::current().block_on(async {
+                    ZstdCodec::decompress_data::<Chunk>(data.to_vec()).await.expect("Failed to decompress chunk")
+                });
+                Some(chunk)
+            }
+            None => None,
+        };
+
+        Ok(chunk)
         //.map_err(|err| Error::DatabaseError(format!("Failed to get chunk: {err}")))
     }

@@ -127,14 +166,23 @@ impl Database {
         // Initialize write transaction and open chunks table
         let mut rw_tx = db.write_txn()?;
         let database = db
-            .open_database::<U64<LE>, Zstd<Chunk>>(&rw_tx, Some("chunks"))?
+            .open_database::<U64<LE>, Bytes>(&rw_tx, Some("chunks"))?
             .expect("No table \"chunks\" found. The database should have been initialized");

         // Calculate key
         let key = hash((chunk.dimension.as_ref().unwrap(), chunk.x_pos, chunk.z_pos));

+        // let chunk = Handle::current().block_on(ZstdCodec::compress_data(chunk)).expect("Failed to compress chunk");
+        let chunk = chunk.clone();
+        let chunk = Handle::current().block_on(async {
+            ZstdCodec::compress_data(chunk).await.expect("Failed to compress chunk")
+        });
+        /*
+            ZstdCodec::compress_data(chunk).await.expect("Failed to compress chunk")
+        });*/
+
         // Insert chunk
-        let res = database.put(&mut rw_tx, &key, chunk);
+        let res = database.put(&mut rw_tx, &key, chunk.as_slice());
         rw_tx.commit()?;
         // .map_err(|err| {
         //     Error::DatabaseError(format!("Unable to commit changes to database: {err}"))
@@ -152,20 +200,20 @@ impl Database {

     /// Insert multiple chunks into database
     /// TODO: Find better name/disambiguation
-    fn insert_chunks_into_database(db: &Env, chunks: &[Chunk]) -> Result<(), heed::Error> {
+    fn insert_chunks_into_database(db: &Env, chunks: &[SerializedChunk]) -> Result<(), heed::Error> {
         // Initialize write transaction and open chunks table
         let mut rw_tx = db.write_txn()?;
         let database = db
-            .open_database::<U64<LE>, Zstd<Chunk>>(&rw_tx, Some("chunks"))?
+            .open_database::<U64<LE>, Bytes>(&rw_tx, Some("chunks"))?
             .expect("No table \"chunks\" found. The database should have been initialized");

         // Update page
         for chunk in chunks {
             // Calculate key
-            let key = hash((chunk.dimension.as_ref().unwrap(), chunk.x_pos, chunk.z_pos));
+            // let key = hash((chunk.dimension.as_ref().unwrap(), chunk.x_pos, chunk.z_pos));

             // Insert chunk
-            database.put(&mut rw_tx, &key, chunk)?;
+            database.put(&mut rw_tx, &chunk.hash(), chunk.data())?;
         }
         // Commit changes
         rw_tx.commit()?;
@@ -392,33 +440,38 @@ impl Database {
     /// }
     ///
     /// ```
-    pub async fn batch_insert(&self, values: Vec<Chunk>) -> Result<(), Error> {
+    pub async fn batch_insert(&self, values: Vec<SerializedChunk>) -> Result<(), Error> {
         // Clone database pointer
         let db = self.db.clone();
         let tsk_db = self.db.clone();

         // Calculate all keys
-        let keys = values
+        /* let keys = values
             .iter()
             .map(|v| hash((v.dimension.as_ref().unwrap_or_else(|| panic!("Invalid chunk @ ({},{})", v.x_pos, v.z_pos)), v.x_pos, v.z_pos)))
             .collect::<Vec<u64>>();
+        */
+        // let keys = values.iter().map(|v| v.hash()).collect::<Vec<u64>>();

         // WARNING: The previous logic was to first insert in database and then insert in cache using load_into_cache fn.
         // This has been modified to avoid having to query database while we already have the data available.
         // First insert into cache

-        for (key, chunk) in keys.into_iter().zip(&values) {
+        // TODO: Renable cache. Currently disabled because we only get serialized bytes with the hash
+        // to save in the database
+        /*for (chunk) in values.iter() {
             let cache = self.cache.clone();
             let db = self.db.clone();
-            let chunk = chunk.clone();
+            let key = chunk.hash();
+            let chunk = chunk.data().clone();
             tokio::spawn(async move {
                 cache.insert(key, chunk).await;
                 if let Err(e) = Database::load_into_cache_standalone(db, cache, key).await {
                     warn!("Error inserting chunk into database: {:?}", e);
                 }
             });
         }
-
+        */
         // Then insert into persistent database
         spawn_blocking_db(tsk_db, move || Self::insert_chunks_into_database(&db, &values))
             .await
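
For orientation, here is a minimal, hypothetical sketch of driving the new ZstdCodec from async code. The Payload type and the main harness below are illustrative only and are not part of this commit. Note that, as committed, decompress_data hands the stored bytes straight to bincode without a matching zstd decoding step, so the two methods are not yet symmetric.

    use bincode::{Decode, Encode};
    // use crate::database::chunks::ZstdCodec; // path to the codec inside this crate

    // Illustrative stand-in for a bincode-encodable value such as `Chunk`.
    #[derive(Encode, Decode, Debug)]
    struct Payload {
        x_pos: i32,
        z_pos: i32,
        dimension: String,
    }

    // Hypothetical harness; inside the crate this would return crate::Result<()>.
    #[tokio::main]
    async fn main() -> Result<(), Box<dyn std::error::Error>> {
        let payload = Payload { x_pos: 4, z_pos: -7, dimension: "overworld".into() };

        // Encodes with bincode, then compresses with zstd (level 3) on a blocking thread.
        let bytes = ZstdCodec::compress_data(payload).await?;
        println!("compressed chunk is {} bytes", bytes.len());
        Ok(())
    }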

src/database/mod.rs

Lines changed: 6 additions & 6 deletions

@@ -2,7 +2,7 @@ use byteorder::LE;
 use chunks::Zstd;
 use deepsize::DeepSizeOf;
 use futures::FutureExt;
-use heed::types::U64;
+use heed::types::{Bytes, U64};
 use heed::{Env as LMDBDatabase, EnvFlags, EnvOpenOptions};
 use moka::notification::{ListenerFuture, RemovalCause};
 use rayon::{ThreadPool, ThreadPoolBuilder};
@@ -95,18 +95,18 @@ pub async fn start_database() -> Result<Database, Error> {
     });

     // Check if database is built. Otherwise, initialize it
-    let mut rw_tx = lmdb.write_txn().unwrap();
+    let mut rw_tx = lmdb.write_txn()?;
     if lmdb
-        .open_database::<U64<LE>, Zstd<Chunk>>(&rw_tx, Some("chunks"))
-        .unwrap()
+        // .open_database::<U64<LE>, Zstd<Chunk>>(&rw_tx, Some("chunks"))
+        .open_database::<U64<LE>, Bytes>(&rw_tx, Some("chunks"))?
         .is_none()
     {
-        lmdb.create_database::<U64<LE>, Zstd<Chunk>>(&mut rw_tx, Some("chunks"))
+        lmdb.create_database::<U64<LE>, Bytes>(&mut rw_tx, Some("chunks"))
             .expect("Unable to create database");
     }
     // `entities` table to be added, but needs the type to do so

-    rw_tx.commit().unwrap();
+    rw_tx.commit()?;

     info!("Database started");

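
To illustrate what the re-typed table looks like from the heed side, here is a small hypothetical helper (not part of the commit) that reads the raw stored bytes for one key; turning them back into a Chunk is now a separate, explicit ZstdCodec step.

    use byteorder::LE;
    use heed::types::{Bytes, U64};
    use heed::Env;

    // Hypothetical helper: fetch the raw compressed bytes stored for one chunk key.
    fn read_raw_chunk(env: &Env, key: u64) -> Result<Option<Vec<u8>>, heed::Error> {
        let ro_tx = env.read_txn()?;
        let database = env
            .open_database::<U64<LE>, Bytes>(&ro_tx, Some("chunks"))?
            .expect("No table \"chunks\" found. The database should have been initialized");
        // With the value type now `Bytes`, heed returns a plain &[u8]; decoding it
        // into a `Chunk` would be a separate ZstdCodec::decompress_data call.
        Ok(database.get(&ro_tx, &key)?.map(|bytes| bytes.to_vec()))
    }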

src/utils/error.rs

Lines changed: 4 additions & 0 deletions

@@ -93,6 +93,10 @@ pub enum Error {

     #[error("Database error: {0}")]
     LmdbError(#[from] heed::Error),
+    #[error("(bincode) Encode error")]
+    BincodeEncodeError(#[from] bincode::error::EncodeError),
+    #[error("(bincode) Decode error")]
+    BincodeDecodeError(#[from] bincode::error::DecodeError),
 }

 impl From<Infallible> for Error {
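
The two new variants exist so that `?` can propagate bincode failures directly inside ZstdCodec. A rough illustration, assuming the crate's Result alias (the helper below is hypothetical, not code from this commit):

    // EncodeError converts into Error::BincodeEncodeError through the From impl
    // generated by thiserror's #[from], so no manual map_err is needed.
    fn encode_value<T: bincode::Encode>(value: &T) -> crate::Result<Vec<u8>> {
        let bytes = bincode::encode_to_vec(value, bincode::config::standard())?;
        Ok(bytes)
    }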

src/world/importing.rs

Lines changed: 67 additions & 11 deletions

@@ -1,20 +1,41 @@
 use crate::utils::prelude::*;
 use fastanvil::{ChunkData, Region};
 use indicatif::{ProgressBar, ProgressStyle};
-use nbt_lib::NBTDeserializeBytes;
+use nbt_lib::{NBTDeserializeBytes, NBTSerialize};
 use rayon::prelude::*;
 use std::env;
 use std::fs::File;
 use std::io::Cursor;
 use std::path::PathBuf;
 use std::sync::Arc;
+use bincode::config::standard;
+use bincode::Encode;
+use tokio::runtime::Handle;
 use tracing::{debug, info, warn};
-
+use crate::database::chunks::ZstdCodec;
 use crate::state::GlobalState;
+use crate::utils::hash::hash;
 use crate::world::chunk_format::Chunk;

 const DEFAULT_BATCH_SIZE: u8 = 150;

+/// A serialized chunk is a tuple of the chunk's hash and the compressed chunk data
+/// (hash, compressed_chunk_data)
+pub struct SerializedChunk(u64, Vec<u8>);
+
+impl SerializedChunk {
+    pub fn new(hash: u64, data: Vec<u8>) -> Self {
+        Self(hash, data)
+    }
+    pub fn hash(&self) -> u64 {
+        self.0
+    }
+
+    pub fn data(&self) -> &Vec<u8> {
+        self.1.as_ref()
+    }
+}
+
 fn get_batch_size() -> i32 {
     let batch_size = env::args()
         .find(|x| x.starts_with("--batch_size="))
@@ -73,22 +94,31 @@ async fn get_total_chunks(dir: &PathBuf) -> Result<usize> {
     Ok(regions.into_par_iter().map(|mut region| region.iter().count()).sum())
 }

-fn process_chunk(chunk_data: Vec<u8>, file_name: &str, bar: Arc<ProgressBar>) -> Result<Chunk> {
-    let mut final_chunk = Chunk::read_from_bytes(&mut Cursor::new(chunk_data))
+async fn process_chunk(chunk_data: Vec<u8>, file_name: &str, bar: Arc<ProgressBar>) -> Result<SerializedChunk> {
+    let mut chunk = Chunk::read_from_bytes(&mut Cursor::new(chunk_data))
         .map_err(|e| {
             bar.abandon_with_message(format!("Chunk {} failed to import", file_name));
             Error::Generic(format!("Could not read chunk {} {}", e, file_name))
         })?;

-    final_chunk.convert_to_net_mode()
+    chunk.convert_to_net_mode()
         .map_err(|e| {
-            bar.abandon_with_message(format!("Chunk {} {} failed to import", final_chunk.x_pos, final_chunk.z_pos));
-            Error::Generic(format!("Could not convert chunk {} {} to network mode: {}", final_chunk.x_pos, final_chunk.z_pos, e))
+            bar.abandon_with_message(format!("Chunk {} {} failed to import", chunk.x_pos, chunk.z_pos));
+            Error::Generic(format!("Could not convert chunk {} {} to network mode: {}", chunk.x_pos, chunk.z_pos, e))
         })?;

-    final_chunk.dimension = Some("overworld".to_string());
+    chunk.dimension = Some("overworld".to_string());

-    Ok(final_chunk)
+    // let chunk_data = bincode::encode_to_vec(final_chunk, standard())?;
+    // let chunk_data = ZstdCodec::compress_data(final_chunk);
+    let hash = hash((chunk.dimension.as_ref().expect(format!("Invalid chunk @ ({},{})", chunk.x_pos, chunk.z_pos).as_str()), chunk.x_pos, chunk.z_pos));
+    /*let chunk_data = tokio::task::(async move {
+        ZstdCodec::compress_data(chunk).await.expect("Failed to compress chunk")
+    });*/
+    // let chunk_data = Handle::current().block_on(ZstdCodec::compress_data(chunk)).expect("Failed to compress chunk");
+    let chunk_data = ZstdCodec::compress_data(chunk).await.expect("Failed to compress chunk");
+
+    Ok(SerializedChunk::new(hash, chunk_data))
 }

 //noinspection RsBorrowChecker
@@ -122,7 +152,7 @@ pub async fn import_regions(state: GlobalState) -> Result<()> {
     while !chunks.is_empty() {
         let chunk_batch: Vec<ChunkData> = chunks.drain(..std::cmp::min(batch_size, chunks.len())).collect();

-        let processed_chunks: Vec<Chunk> = chunk_batch.into_par_iter()
+        /*let processed_chunks: Vec<SerializedChunk> = chunk_batch.into_par_iter()
             .filter_map(|chunk| {
                 let data = chunk.data.clone();
                 match process_chunk(data, file_name, Arc::clone(&bar)) {
@@ -137,6 +167,32 @@ pub async fn import_regions(state: GlobalState) -> Result<()> {
                 }
             }
         })
+        .collect();*/
+
+        let processed_chunks_futures: Vec<_> = chunk_batch.into_iter()
+            .map(|chunk| {
+                let data = chunk.data.clone();
+                let bar_clone = Arc::clone(&bar);
+                let file_name = file_name.to_string();
+                tokio::spawn(async move {
+                    match process_chunk(data, &file_name, Arc::clone(&bar_clone)).await {
+                        Ok(processed) => {
+                            bar_clone.inc(1);
+                            Some(processed)
+                        }
+                        Err(e) => {
+                            warn!("Failed to process chunk: {}. Skipping.", e);
+                            None
+                        }
+                    }
+                })
+            })
+            .collect();
+
+        let processed_chunks: Vec<SerializedChunk> = futures::future::join_all(processed_chunks_futures)
+            .await
+            .into_iter()
+            .filter_map(|result| result.ok().flatten())
             .collect();

         // Insert the batch of processed chunks
@@ -193,7 +249,7 @@ fn create_progress_bar(total_chunks: usize) -> ProgressBar {
     bar
 }

-async fn insert_chunks(state: &GlobalState, queued_chunks: Vec<Chunk>, bar: &ProgressBar) -> Result<()> {
+async fn insert_chunks(state: &GlobalState, queued_chunks: Vec<SerializedChunk>, bar: &ProgressBar) -> Result<()> {
     state.database.batch_insert(queued_chunks).await
         .map_err(|e| {
             bar.abandon_with_message("Chunk insertion failed".to_string());
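
The import loop trades rayon's par_iter for Tokio tasks because process_chunk is now async. Stripped of the chunk-specific details, the pattern is roughly the following; the item type and expensive_step are placeholders, not code from this commit:

    use futures::future::join_all;

    // Hypothetical per-item work standing in for process_chunk.
    async fn expensive_step(item: u64) -> Result<u64, &'static str> {
        if item % 2 == 0 { Ok(item * item) } else { Err("odd item") }
    }

    // Spawn one task per item, then await them all; items whose task panicked or
    // whose processing returned an error simply drop out of the result.
    async fn process_all(items: Vec<u64>) -> Vec<u64> {
        let handles: Vec<_> = items
            .into_iter()
            .map(|item| tokio::spawn(async move { expensive_step(item).await.ok() }))
            .collect();

        join_all(handles)
            .await
            .into_iter()
            // Outer Result is the JoinError from tokio::spawn; inner Option is the
            // per-item success/failure. Both are filtered out here.
            .filter_map(|joined| joined.ok().flatten())
            .collect()
    }

One consequence of the switch is that the CPU-heavy NBT decoding in process_chunk now runs on the async worker threads; only the compression step inside ZstdCodec is pushed to spawn_blocking.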
