Skip to content

Commit f823b33

Browse files
authored
Merge pull request #138 from ferrumc-rs/rework/better-chunk-sending
Less terrible chunk sending
2 parents cc11874 + a9f2f17 commit f823b33

File tree

24 files changed

+472
-354
lines changed

24 files changed

+472
-354
lines changed

Cargo.toml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
# - Workspace lints
77
# - Workspace dependencies.
88

9+
910
[workspace]
1011
resolver = "2"
1112

@@ -104,6 +105,7 @@ ferrumc-utils = { path = "src/lib/utils" }
104105
ferrumc-world = { path = "src/lib/world" }
105106

106107

108+
107109
# Asynchronous
108110
tokio = { version = "1.40.0", features = ["full"] }
109111
socket2 = "0.5.7"
@@ -114,6 +116,7 @@ async-trait = "0.1.82"
114116
tracing = "0.1.40"
115117
tracing-subscriber = { version = "0.3.18", features = ["env-filter"] }
116118
log = "0.4.22"
119+
console-subscriber = "0.4.1"
117120

118121
# Concurrency/Parallelism
119122
parking_lot = "0.12.3"
@@ -147,6 +150,7 @@ hashbrown = "0.15.0"
147150
tinyvec = "1.8.0"
148151
dashmap = "6.1.0"
149152
uuid = { version = "1.1", features = ["v4", "v3", "serde"] }
153+
whirlwind = "0.1.1"
150154

151155
# Macros
152156
lazy_static = "1.5.0"

src/bin/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ ferrumc-general-purpose = { workspace = true }
2828
ferrumc-state = { workspace = true }
2929

3030
ctor = { workspace = true }
31-
parking_lot = { workspace = true }
31+
parking_lot = { workspace = true, features = ["deadlock_detection"] }
3232
tracing = { workspace = true }
3333
tokio = { workspace = true }
3434
rayon = { workspace = true }

src/bin/src/cli.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ pub struct CLIArgs {
66
#[command(subcommand)]
77
pub command: Option<Command>,
88
#[clap(long)]
9-
#[arg(value_enum, default_value_t = LogLevel(Level::TRACE))]
9+
#[arg(value_enum, default_value_t = LogLevel(Level::DEBUG))]
1010
pub log: LogLevel,
1111
}
1212

src/bin/src/main.rs

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,11 +6,14 @@ use crate::errors::BinaryError;
66
use clap::Parser;
77
use ferrumc_config::statics::get_global_config;
88
use ferrumc_config::whitelist::create_whitelist;
9+
use ferrumc_core::chunks::chunk_receiver::ChunkReceiver;
910
use ferrumc_ecs::Universe;
1011
use ferrumc_general_purpose::paths::get_root_path;
12+
use ferrumc_net::connection::StreamWriter;
1113
use ferrumc_net::server::create_server_listener;
1214
use ferrumc_state::ServerState;
1315
use ferrumc_world::World;
16+
use std::hash::{Hash, Hasher};
1417
use std::sync::Arc;
1518
use systems::definition;
1619
use tracing::{error, info};
@@ -28,6 +31,19 @@ async fn main() {
2831
let cli_args = CLIArgs::parse();
2932
ferrumc_logging::init_logging(cli_args.log.into());
3033

34+
check_deadlocks();
35+
36+
{
37+
let mut hasher = std::collections::hash_map::DefaultHasher::new();
38+
std::any::TypeId::of::<ChunkReceiver>().hash(&mut hasher);
39+
let digest = hasher.finish();
40+
println!("ChunkReceiver: {:X}", digest);
41+
let mut hasher = std::collections::hash_map::DefaultHasher::new();
42+
std::any::TypeId::of::<StreamWriter>().hash(&mut hasher);
43+
let digest = hasher.finish();
44+
println!("StreamWriter: {:X}", digest);
45+
}
46+
3147
match cli_args.command {
3248
Some(Command::Setup) => {
3349
info!("Starting setup...");
@@ -109,3 +125,28 @@ async fn create_state() -> Result<ServerState> {
109125
world: World::new().await,
110126
})
111127
}
128+
fn check_deadlocks() {
129+
{
130+
use parking_lot::deadlock;
131+
use std::thread;
132+
use std::time::Duration;
133+
134+
// Create a background thread which checks for deadlocks every 10s
135+
thread::spawn(move || loop {
136+
thread::sleep(Duration::from_secs(10));
137+
let deadlocks = deadlock::check_deadlock();
138+
if deadlocks.is_empty() {
139+
continue;
140+
}
141+
142+
println!("{} deadlocks detected", deadlocks.len());
143+
for (i, threads) in deadlocks.iter().enumerate() {
144+
println!("Deadlock #{}", i);
145+
for t in threads {
146+
println!("Thread Id {:#?}", t.thread_id());
147+
println!("{:#?}", t.backtrace());
148+
}
149+
}
150+
});
151+
}
152+
}

src/bin/src/packet_handlers/login_process.rs

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
use ferrumc_config::statics::{get_global_config, get_whitelist};
2+
use ferrumc_core::chunks::chunk_receiver::ChunkReceiver;
23
use ferrumc_core::identity::player_identity::PlayerIdentity;
34
use ferrumc_core::transform::grounded::OnGround;
45
use ferrumc_core::transform::position::Position;
@@ -38,6 +39,16 @@ async fn handle_login_start(
3839
let player_identity = PlayerIdentity::new(username.to_string(), uuid);
3940
debug!("Handling login start event for user: {username}, uuid: {uuid}");
4041

42+
// Add the player identity component to the ECS for the entity.
43+
state
44+
.universe
45+
.add_component::<PlayerIdentity>(
46+
login_start_event.conn_id,
47+
PlayerIdentity::new(username.to_string(), uuid),
48+
)?
49+
.add_component::<ChunkReceiver>(login_start_event.conn_id, ChunkReceiver::default())?;
50+
51+
//Send a Login Success Response to further the login sequence
4152
let mut writer = state
4253
.universe
4354
.get_mut::<StreamWriter>(login_start_event.conn_id)?;
@@ -184,6 +195,12 @@ async fn handle_ack_finish_configuration(
184195
&NetEncodeOpts::WithLength,
185196
)
186197
.await?;
198+
199+
let pos = state.universe.get_mut::<Position>(conn_id)?;
200+
let mut chunk_recv = state.universe.get_mut::<ChunkReceiver>(conn_id)?;
201+
chunk_recv.last_chunk = Some((pos.x as i32, pos.z as i32, String::from("overworld")));
202+
chunk_recv.calculate_chunks().await;
203+
187204
send_keep_alive(conn_id, state, &mut writer).await?;
188205

189206
Ok(ack_finish_configuration_event)

src/bin/src/packet_handlers/transform/update_player_position.rs

Lines changed: 32 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
use ferrumc_core::chunks::chunk_receiver::ChunkReceiver;
12
use ferrumc_core::transform::grounded::OnGround;
23
use ferrumc_core::transform::position::Position;
34
use ferrumc_core::transform::rotation::Rotation;
@@ -6,6 +7,7 @@ use ferrumc_net::errors::NetError;
67
use ferrumc_net::packets::packet_events::TransformEvent;
78
use ferrumc_net::utils::ecs_helpers::EntityExt;
89
use ferrumc_state::GlobalState;
10+
use tracing::trace;
911

1012
#[event_handler]
1113
async fn handle_player_move(
@@ -14,19 +16,48 @@ async fn handle_player_move(
1416
) -> Result<TransformEvent, NetError> {
1517
let conn_id = event.conn_id;
1618
if let Some(ref new_position) = event.position {
17-
let mut position = conn_id.get_mut::<Position>(&state)?;
19+
trace!("Getting chunk_recv 1 for player move");
20+
{
21+
let mut chunk_recv = state.universe.get_mut::<ChunkReceiver>(conn_id)?;
22+
trace!("Got chunk_recv 1 for player move");
23+
if let Some(last_chunk) = &chunk_recv.last_chunk {
24+
let new_chunk = (
25+
new_position.x as i32 / 16,
26+
new_position.z as i32 / 16,
27+
String::from("overworld"),
28+
);
29+
if *last_chunk != new_chunk {
30+
chunk_recv.last_chunk = Some(new_chunk);
31+
chunk_recv.calculate_chunks().await;
32+
}
33+
} else {
34+
chunk_recv.last_chunk = Some((
35+
new_position.x as i32 / 16,
36+
new_position.z as i32 / 16,
37+
String::from("overworld"),
38+
));
39+
chunk_recv.calculate_chunks().await;
40+
}
41+
}
1842

43+
trace!("Getting position 1 for player move");
44+
let mut position = conn_id.get_mut::<Position>(&state)?;
45+
trace!("Got position 1 for player move");
1946
*position = Position::new(new_position.x, new_position.y, new_position.z);
2047
}
2148

2249
if let Some(ref new_rotation) = event.rotation {
50+
trace!("Getting rotation 1 for player move");
2351
let mut rotation = conn_id.get_mut::<Rotation>(&state)?;
52+
trace!("Got rotation 1 for player move");
2453

2554
*rotation = Rotation::new(new_rotation.yaw, new_rotation.pitch);
2655
}
2756

2857
if let Some(new_grounded) = event.on_ground {
58+
trace!("Getting on_ground 1 for player move");
2959
let mut on_ground = conn_id.get_mut::<OnGround>(&state)?;
60+
trace!("Got on_ground 1 for player move");
3061

3162
*on_ground = OnGround(new_grounded);
3263
}

src/bin/src/systems/chunk_fetcher.rs

Lines changed: 92 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,92 @@
1+
use crate::errors::BinaryError;
2+
use crate::systems::definition::System;
3+
use async_trait::async_trait;
4+
use ferrumc_core::chunks::chunk_receiver::ChunkReceiver;
5+
use ferrumc_state::GlobalState;
6+
use std::collections::HashMap;
7+
use std::sync::atomic::AtomicBool;
8+
use std::sync::Arc;
9+
use tokio::task::JoinSet;
10+
use tracing::{error, info, trace};
11+
12+
pub struct ChunkFetcher {
13+
stop: AtomicBool,
14+
}
15+
16+
impl ChunkFetcher {
17+
pub(crate) fn new() -> Self {
18+
Self {
19+
stop: AtomicBool::new(false),
20+
}
21+
}
22+
}
23+
24+
#[async_trait]
25+
impl System for ChunkFetcher {
26+
async fn start(self: Arc<Self>, state: GlobalState) {
27+
info!("Chunk fetcher system started");
28+
29+
while !self.stop.load(std::sync::atomic::Ordering::Relaxed) {
30+
let mut task_set: JoinSet<Result<(), BinaryError>> = JoinSet::new();
31+
let players = state.universe.query::<&mut ChunkReceiver>().into_entities();
32+
for eid in players {
33+
let state = state.clone();
34+
task_set.spawn(async move {
35+
// Copy the chunks into a new map so we don't lock the component while fetching
36+
let mut copied_chunks = {
37+
let Ok(chunk_recv) = state.universe.get::<ChunkReceiver>(eid) else {
38+
trace!("A player disconnected before we could get the ChunkReceiver");
39+
return Ok(());
40+
};
41+
let mut copied_chunks = HashMap::new();
42+
for chunk in chunk_recv.needed_chunks.iter() {
43+
let (key, chunk) = chunk.pair();
44+
if chunk.is_none() {
45+
copied_chunks.insert(key.clone(), None);
46+
}
47+
}
48+
copied_chunks
49+
};
50+
// Fetch the chunks
51+
for (key, chunk) in copied_chunks.iter_mut() {
52+
let fetched_chunk =
53+
state.world.load_chunk(key.0, key.1, &key.2.clone()).await?;
54+
*chunk = Some(fetched_chunk);
55+
}
56+
// Insert the fetched chunks back into the component
57+
{
58+
let Ok(chunk_recv) = state.universe.get::<ChunkReceiver>(eid) else {
59+
trace!("A player disconnected before we could get the ChunkReceiver");
60+
return Ok(());
61+
};
62+
for (key, chunk) in copied_chunks.iter() {
63+
chunk_recv.needed_chunks.insert(key.clone(), chunk.clone());
64+
}
65+
}
66+
Ok(())
67+
});
68+
}
69+
while let Some(result) = task_set.join_next().await {
70+
match result {
71+
Ok(task_res) => {
72+
if let Err(e) = task_res {
73+
error!("Error fetching chunk: {:?}", e);
74+
}
75+
}
76+
Err(e) => {
77+
error!("Error fetching chunk: {:?}", e);
78+
}
79+
}
80+
}
81+
tokio::time::sleep(std::time::Duration::from_millis(5)).await;
82+
}
83+
}
84+
85+
async fn stop(self: Arc<Self>, _: GlobalState) {
86+
self.stop.store(true, std::sync::atomic::Ordering::Relaxed);
87+
}
88+
89+
fn name(&self) -> &'static str {
90+
"Chunk Fetcher"
91+
}
92+
}

0 commit comments

Comments
 (0)