Skip to content

Commit f0f41f3

Browse files
authored
Merge pull request #351 from NanCunChild/fix/lmdb-reopen-dbi
Fix/lmdb reopen dbi
2 parents 0f93b07 + 81b28da commit f0f41f3

9 files changed

Lines changed: 4495 additions & 28 deletions

File tree

Cargo.toml

Lines changed: 5 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -12,6 +12,11 @@ cargo-features = ["profile-rustflags"]
1212
[workspace]
1313
resolver = "2"
1414

15+
# `tools/stress-bot` is a dev-only load-testing tool (azalea-based swarm client). It is its own
16+
# standalone workspace and excluded here so its large dependency tree never compiles as part of the
17+
# main build, CI, or release. See `tools/stress-bot/README.md`.
18+
exclude = ["tools/stress-bot"]
19+
1520
#================= Members =================#
1621
members = [
1722
"src/bin",

src/bin/src/systems/fluids/mod.rs

Lines changed: 37 additions & 6 deletions
Original file line number | Diff line number | Diff line change
@@ -249,14 +249,31 @@ pub fn seed_fluid_tick(
249249
state: state.clone(),
250250
dim,
251251
};
252+
seed_fluid_tick_with_view(scheduler, &view, current_tick, pos);
253+
}
254+
255+
/// Like [`seed_fluid_tick`] but reuses an existing [`WorldBlockView`] instead of constructing one.
256+
///
257+
/// Building a [`WorldBlockView`] clones the [`GlobalState`] handle (an `Arc` bump). Hot paths seed
258+
/// many positions back to back — applying a batch of changes wakes the six neighbours of every
259+
/// changed block — so building the view once and passing it in turns what was an `Arc` clone per
260+
/// seeded position into a single clone for the whole batch. The reads themselves are unaffected: the
261+
/// view always reflects the live world (it reads the chunk cache on each call), so reusing it across
262+
/// the intervening `apply_change` writes is equivalent to rebuilding it every time.
263+
fn seed_fluid_tick_with_view(
264+
scheduler: &mut BlockTickScheduler,
265+
view: &WorldBlockView,
266+
current_tick: u64,
267+
pos: BlockPos,
268+
) {
252269
let block = view.block_at(pos);
253-
let Some(rules) = rules_for_block(block, dim.0) else {
270+
let Some(rules) = rules_for_block(block, view.dim.0) else {
254271
debug_assert!(!is_fluid(block), "rules_for_block disagreed with is_fluid");
255272
return;
256273
};
257274
// A lava block already touching water solidifies almost immediately rather than waiting out its
258275
// spread cadence.
259-
let delay = if would_react(pos, &view) {
276+
let delay = if would_react(pos, view) {
260277
REACTION_DELAY
261278
} else {
262279
rules.tick_delay
@@ -290,12 +307,18 @@ pub fn seed_on_block_break(
290307
) {
291308
let current = tick.get();
292309
let dim = *dim;
310+
// One view for the whole batch of break events: each seed below would otherwise clone the state
311+
// handle, and a break wakes seven positions (the block plus its six neighbours).
312+
let view = WorldBlockView {
313+
state: state.0.clone(),
314+
dim,
315+
};
293316
for event in events.read() {
294317
let pos = event.position;
295318
// The broken position itself (in case it is now exposed to a fluid) and its neighbours.
296-
seed_fluid_tick(&mut scheduler.0, &state.0, dim, current, pos);
319+
seed_fluid_tick_with_view(&mut scheduler.0, &view, current, pos);
297320
for neighbour in neighbours(pos) {
298-
seed_fluid_tick(&mut scheduler.0, &state.0, dim, current, neighbour);
321+
seed_fluid_tick_with_view(&mut scheduler.0, &view, current, neighbour);
299322
}
300323
}
301324
}
@@ -551,6 +574,14 @@ fn apply_changes(
551574
// Fallback cadence for re-ticking a changed block whose new state is not itself fluid.
552575
let fallback_delay = FluidRules::for_kind(FluidKind::Water, dim.0).tick_delay;
553576

577+
// One view for the whole batch. Neighbour waking below reads through it after each
578+
// `apply_change` write; because the view reads the live chunk cache (never a snapshot), reusing
579+
// it is equivalent to rebuilding it per seed but without the per-seed state-handle clone.
580+
let view = WorldBlockView {
581+
state: state.clone(),
582+
dim,
583+
};
584+
554585
for change in changes {
555586
if !apply_change(state, dim, change) {
556587
continue;
@@ -569,10 +600,10 @@ fn apply_changes(
569600
scheduler.schedule(change.pos, TickKind::FluidSpread, current, delay);
570601
}
571602

572-
// Wake fluid neighbours so the update propagates outward. `seed_fluid_tick` skips
603+
// Wake fluid neighbours so the update propagates outward. `seed_fluid_tick_with_view` skips
573604
// non-fluid neighbours and picks the reaction cadence for lava that now touches water.
574605
for neighbour in fluid_neighbours(change.pos) {
575-
seed_fluid_tick(scheduler, state, dim, current, neighbour);
606+
seed_fluid_tick_with_view(scheduler, &view, current, neighbour);
576607
}
577608
}
578609
}

src/lib/derive_macros/src/registries_packets/mod.rs

Lines changed: 107 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -1,6 +1,9 @@
11
use indexmap::IndexMap;
22
use quote::quote;
33
use serde_json::Value;
4+
use std::collections::HashMap;
5+
6+
use craftflow_nbt::DynNBT;
47

58
pub(crate) fn build_mapping(_: proc_macro::TokenStream) -> proc_macro::TokenStream {
69
let json_file = include_bytes!("../../../../../assets/data/registry_packets.json");
@@ -12,7 +15,20 @@ pub(crate) fn build_mapping(_: proc_macro::TokenStream) -> proc_macro::TokenStre
1215
let mut packets = vec![];
1316
for (value_name, value) in &value_set {
1417
let mut nbt_data_buf = Vec::new();
15-
craftflow_nbt::to_writer(&mut nbt_data_buf, &value).unwrap();
18+
// The registry data is sourced from JSON, which cannot express NBT's distinct numeric
19+
// tags: every JSON integer would otherwise serialise as a `Long` and every real as a
20+
// `Double`. The vanilla client coerces numeric tags leniently and tolerates that, but
21+
// strict clients deserialise the registry into typed structs and reject a field whose
22+
// tag is not exactly what the schema expects (e.g. `dimension_type.height` must be an
23+
// `Int`, not a `Long`). `dimension_type` is encoded through a schema-aware converter so
24+
// every field carries its correct tag; all other registries keep the byte-for-byte
25+
// output of the previous generic path until they, too, need a schema.
26+
if reg_entry == "minecraft:dimension_type" {
27+
let nbt = dimension_type_to_nbt(value);
28+
craftflow_nbt::to_writer(&mut nbt_data_buf, &nbt).unwrap();
29+
} else {
30+
craftflow_nbt::to_writer(&mut nbt_data_buf, &value).unwrap();
31+
}
1632
let kv = (value_name.clone(), nbt_data_buf);
1733
packets.push(kv);
1834
}
@@ -35,3 +51,93 @@ pub(crate) fn build_mapping(_: proc_macro::TokenStream) -> proc_macro::TokenStre
3551
}
3652
.into()
3753
}
54+
55+
/// A numeric NBT tag that a schema can force onto a JSON number.
///
/// The generic conversion picks `Int` for JSON integers and `Double` for JSON reals. A field
/// whose schema demands something else names the required tag with one of these variants. Only
/// the tags the current schema overrides actually need are listed.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum NumTag {
    /// Encode as a 64-bit integer tag.
    Long,
    /// Encode as a 32-bit floating-point tag.
    Float,
    /// Encode as a 64-bit floating-point tag (also used when JSON spells the value as an integer).
    Double,
}
63+
64+
/// The `dimension_type` fields whose vanilla NBT tag differs from the generic default. Every other
65+
/// field is an `Int` (e.g. `height`, `min_y`, `logical_height`), a `Byte` boolean, a `String`, or a
66+
/// nested compound, all of which the generic conversion already produces correctly.
67+
fn dimension_type_field_tag(field: &str) -> Option<NumTag> {
68+
match field {
69+
// Stored as `0`/`0.x` in JSON but a float in the dimension codec.
70+
"ambient_light" => Some(NumTag::Float),
71+
// A double in the dimension codec; JSON carries it as the integer `1`.
72+
"coordinate_scale" => Some(NumTag::Double),
73+
// A long in the dimension codec (optional; present for the Nether and the End).
74+
"fixed_time" => Some(NumTag::Long),
75+
_ => None,
76+
}
77+
}
78+
79+
/// Converts one `dimension_type` entry's JSON into correctly-typed NBT, applying the per-field tag
80+
/// overrides above to the entry's top-level fields. Nested values (e.g. the
81+
/// `monster_spawn_light_level` int-provider compound) use the generic conversion, which already
82+
/// yields `Int` for their integers.
83+
fn dimension_type_to_nbt(entry: &Value) -> DynNBT {
84+
let obj = entry
85+
.as_object()
86+
.expect("dimension_type entry must be a JSON object");
87+
let mut map = HashMap::with_capacity(obj.len());
88+
for (key, value) in obj {
89+
map.insert(
90+
key.clone(),
91+
json_to_nbt(value, dimension_type_field_tag(key)),
92+
);
93+
}
94+
DynNBT::Compound(map)
95+
}
96+
97+
/// Generic JSON → NBT conversion with Minecraft-appropriate defaults: integers become `Int` (not
98+
/// `Long`), reals become `Double`, and booleans become `Byte`. `force` overrides the numeric tag
99+
/// for a single scalar where the schema demands a non-default tag; it does not propagate into
100+
/// nested lists or compounds.
101+
fn json_to_nbt(value: &Value, force: Option<NumTag>) -> DynNBT {
102+
match value {
103+
Value::Bool(b) => DynNBT::Byte(i8::from(*b)),
104+
Value::Number(n) => match force {
105+
Some(NumTag::Float) => DynNBT::Float(num_f64(n) as f32),
106+
Some(NumTag::Double) => DynNBT::Double(num_f64(n)),
107+
Some(NumTag::Long) => DynNBT::Long(num_i64(n)),
108+
None => {
109+
if let Some(i) = n.as_i64() {
110+
// Default integers to Int (the common registry tag), widening to Long only when
111+
// the value genuinely does not fit in an i32.
112+
match i32::try_from(i) {
113+
Ok(v) => DynNBT::Int(v),
114+
Err(_) => DynNBT::Long(i),
115+
}
116+
} else {
117+
DynNBT::Double(num_f64(n))
118+
}
119+
}
120+
},
121+
Value::String(s) => DynNBT::String(s.clone()),
122+
Value::Array(items) => DynNBT::List(items.iter().map(|v| json_to_nbt(v, None)).collect()),
123+
Value::Object(obj) => {
124+
let mut map = HashMap::with_capacity(obj.len());
125+
for (key, value) in obj {
126+
map.insert(key.clone(), json_to_nbt(value, None));
127+
}
128+
DynNBT::Compound(map)
129+
}
130+
// Registries contain no JSON nulls; encode defensively as a zero byte rather than panicking.
131+
Value::Null => DynNBT::Byte(0),
132+
}
133+
}
134+
135+
fn num_f64(n: &serde_json::Number) -> f64 {
136+
n.as_f64()
137+
.expect("registry numeric value is representable as f64")
138+
}
139+
140+
fn num_i64(n: &serde_json::Number) -> i64 {
141+
n.as_i64()
142+
.expect("registry numeric value forced to an integer tag must be an integer")
143+
}

src/lib/storage/src/lmdb.rs

Lines changed: 42 additions & 6 deletions
Original file line number | Diff line number | Diff line change
@@ -93,12 +93,18 @@ impl LmdbBackend {
9393
if let Some(db) = cache.get(table) {
9494
return Ok(Some(*db));
9595
}
96-
let opened = {
97-
let rtxn = self.env.read_txn()?;
98-
self.env
99-
.open_database::<U128<BigEndian>, Bytes>(&rtxn, Some(table))?
100-
// `rtxn` is dropped at the end of this block, before the handle is cached or returned.
101-
};
96+
// The handle must be opened inside a *write* transaction that is then committed. LMDB ties a
97+
// newly opened database handle to the transaction that opened it: it stays private to that
98+
// transaction until commit, and is closed automatically if the transaction is aborted. A
99+
// read transaction has no commit (dropping it aborts), so a handle opened in one would be
100+
// closed the moment the transaction is dropped — leaving a dangling `dbi` that fails with
101+
// `EINVAL` when reused in a later transaction. Opening in a committed write transaction
102+
// promotes the handle into the shared environment so it remains valid for the env's lifetime.
103+
let wtxn = self.env.write_txn()?;
104+
let opened = self
105+
.env
106+
.open_database::<U128<BigEndian>, Bytes>(&wtxn, Some(table))?;
107+
wtxn.commit()?;
102108
if let Some(db) = opened {
103109
cache.insert(table.to_string(), db);
104110
}
@@ -304,6 +310,36 @@ mod tests {
304310
remove_dir_all(path).unwrap();
305311
}
306312

313+
/// A freshly opened backend must be able to read a table written by an earlier one.
///
/// This covers [`LmdbBackend::open_table`], the path taken when a table already exists on disk
/// but is not yet in the in-process handle cache. A handle opened inside a read transaction is
/// closed when that transaction is aborted, so caching and reusing it later fails with `EINVAL`;
/// opening it in a committed write transaction keeps it valid.
#[test]
fn test_reopen_reads_existing_table() {
    let path = tempdir().unwrap().keep();
    // Every call yields a new backend over the same directory, with an empty handle cache.
    let open_backend =
        || LmdbBackend::initialize(Some(path.clone()), 10 * 1024 * 1024 * 1024).unwrap();
    let table = "test_table";
    let key = 12345678901234567890u128;
    let value = vec![9, 8, 7, 6];

    // First lifetime: create the table, store one entry and flush it to disk.
    {
        let writer = open_backend();
        writer.create_table(table.to_string()).unwrap();
        writer
            .insert(table.to_string(), key, value.clone())
            .unwrap();
        writer.flush().unwrap();
    }

    // Second lifetime, mirroring a server restart: the entry must still be readable.
    {
        let reader = open_backend();
        assert!(reader.exists(table.to_string(), key).unwrap());
        let retrieved_value = reader.get(table.to_string(), key).unwrap();
        assert_eq!(retrieved_value, Some(value));
    }

    remove_dir_all(path).unwrap();
}
342+
307343
#[test]
308344
fn test_batch_insert() {
309345
let path = tempdir().unwrap().keep();

src/lib/world/src/scheduler/mod.rs

Lines changed: 20 additions & 15 deletions
Original file line number | Diff line number | Diff line change
@@ -64,17 +64,20 @@ impl ChunkTickQueue {
6464
}
6565

6666
fn drain_due(&mut self, current_tick: u64, out: &mut Vec<ScheduledTick>) {
67-
// Partition into due / not-yet-due, retaining the latter.
68-
let mut remaining = Vec::with_capacity(self.pending.len());
69-
for tick in self.pending.drain(..) {
67+
// Compact in place: move due ticks into `out` and keep the rest. `retain` shifts the kept
68+
// elements down without allocating, where the previous implementation allocated a fresh
69+
// full-size buffer on every call — for every chunk, every tick, even when nothing was due.
70+
// `seen` is borrowed separately from `pending` so the closure does not capture all of `self`.
71+
let seen = &mut self.seen;
72+
self.pending.retain(|tick| {
7073
if tick.target_tick <= current_tick {
71-
self.seen.remove(&(tick.pos, tick.kind, tick.target_tick));
72-
out.push(tick);
74+
seen.remove(&(tick.pos, tick.kind, tick.target_tick));
75+
out.push(*tick);
76+
false
7377
} else {
74-
remaining.push(tick);
78+
true
7579
}
76-
}
77-
self.pending = remaining;
80+
});
7881
}
7982

8083
/// Drains at most `budget` due ticks into `out`, leaving any remaining due ticks queued (they
@@ -85,18 +88,20 @@ impl ChunkTickQueue {
8588
out: &mut Vec<ScheduledTick>,
8689
budget: usize,
8790
) -> usize {
91+
// Compact in place (see `drain_due`). Order is preserved, so once the budget is exhausted
92+
// every still-due tick is simply kept and picked up by a later drain.
93+
let seen = &mut self.seen;
8894
let mut taken = 0;
89-
let mut remaining = Vec::with_capacity(self.pending.len());
90-
for tick in self.pending.drain(..) {
95+
self.pending.retain(|tick| {
9196
if taken < budget && tick.target_tick <= current_tick {
92-
self.seen.remove(&(tick.pos, tick.kind, tick.target_tick));
93-
out.push(tick);
97+
seen.remove(&(tick.pos, tick.kind, tick.target_tick));
98+
out.push(*tick);
9499
taken += 1;
100+
false
95101
} else {
96-
remaining.push(tick);
102+
true
97103
}
98-
}
99-
self.pending = remaining;
104+
});
100105
taken
101106
}
102107

0 commit comments

Comments
 (0)