Skip to content

Commit fe728a9

Browse files
authored
Merge pull request #162 from ferrumc-rs/fix/make-packet-sending-sync
Looks fine
2 parents c5315cb + e40366f commit fe728a9

File tree

18 files changed

+150
-181
lines changed

18 files changed

+150
-181
lines changed

Cargo.toml

+1-8
Original file line numberDiff line numberDiff line change
@@ -62,17 +62,10 @@ infinite_loop = "deny"
6262

6363
#================= Profile =================#
6464
[profile.release]
65-
lto = true
66-
strip = "symbols"
67-
codegen-units = 1
68-
opt-level = 3
69-
debug = false
70-
debug-assertions = false
71-
overflow-checks = false
72-
panic = "abort"
7365

7466
[profile.hyper]
7567
inherits = "release"
68+
strip = "symbols"
7669
lto = true
7770
opt-level = 3
7871
debug = false

src/bin/src/main.rs

+7-42
Original file line numberDiff line numberDiff line change
@@ -6,17 +6,15 @@ use crate::errors::BinaryError;
66
use clap::Parser;
77
use ferrumc_config::statics::get_global_config;
88
use ferrumc_config::whitelist::create_whitelist;
9-
use ferrumc_core::chunks::chunk_receiver::ChunkReceiver;
109
use ferrumc_ecs::Universe;
1110
use ferrumc_general_purpose::paths::get_root_path;
12-
use ferrumc_net::connection::StreamWriter;
1311
use ferrumc_net::server::create_server_listener;
1412
use ferrumc_state::ServerState;
1513
use ferrumc_world::World;
16-
use std::hash::{Hash, Hasher};
1714
use std::sync::Arc;
1815
use systems::definition;
19-
use tracing::{error, info, trace};
16+
use tokio::runtime::Handle;
17+
use tracing::{error, info};
2018

2119
pub(crate) mod errors;
2220
use crate::cli::{CLIArgs, Command, ImportArgs};
@@ -26,23 +24,15 @@ mod systems;
2624

2725
pub type Result<T> = std::result::Result<T, BinaryError>;
2826

29-
#[tokio::main]
27+
// #[tokio::main(flavor = "current_thread")]
28+
#[tokio::main(flavor = "multi_thread")]
3029
async fn main() {
3130
let cli_args = CLIArgs::parse();
3231
ferrumc_logging::init_logging(cli_args.log.into());
3332

34-
check_deadlocks();
35-
36-
{
37-
let mut hasher = std::collections::hash_map::DefaultHasher::new();
38-
std::any::TypeId::of::<ChunkReceiver>().hash(&mut hasher);
39-
let digest = hasher.finish();
40-
trace!("ChunkReceiver: {:X}", digest);
41-
let mut hasher = std::collections::hash_map::DefaultHasher::new();
42-
std::any::TypeId::of::<StreamWriter>().hash(&mut hasher);
43-
let digest = hasher.finish();
44-
trace!("StreamWriter: {:X}", digest);
45-
}
33+
let current_active_threads = Handle::current().metrics().num_workers();
34+
35+
info!("FERRUMC IS USING {} THREAD(s)", current_active_threads);
4636

4737
match cli_args.command {
4838
Some(Command::Setup) => {
@@ -125,28 +115,3 @@ async fn create_state() -> Result<ServerState> {
125115
world: World::new().await,
126116
})
127117
}
128-
fn check_deadlocks() {
129-
{
130-
use parking_lot::deadlock;
131-
use std::thread;
132-
use std::time::Duration;
133-
134-
// Create a background thread which checks for deadlocks every 10s
135-
thread::spawn(move || loop {
136-
thread::sleep(Duration::from_secs(10));
137-
let deadlocks = deadlock::check_deadlock();
138-
if deadlocks.is_empty() {
139-
continue;
140-
}
141-
142-
println!("{} deadlocks detected", deadlocks.len());
143-
for (i, threads) in deadlocks.iter().enumerate() {
144-
println!("Deadlock #{}", i);
145-
for t in threads {
146-
println!("Thread Id {:#?}", t.thread_id());
147-
println!("{:#?}", t.backtrace());
148-
}
149-
}
150-
});
151-
}
152-
}

src/bin/src/packet_handlers/login_process.rs

+27-52
Original file line numberDiff line numberDiff line change
@@ -64,14 +64,12 @@ async fn handle_login_start(
6464
let whitelist = get_whitelist();
6565

6666
if whitelist.get(&uuid).is_none() {
67-
writer
68-
.send_packet(
69-
&LoginDisconnectPacket::new(
70-
"{\"translate\":\"multiplayer.disconnect.not_whitelisted\"}",
71-
),
72-
&NetEncodeOpts::WithLength,
73-
)
74-
.await?;
67+
writer.send_packet(
68+
LoginDisconnectPacket::new(
69+
"{\"translate\":\"multiplayer.disconnect.not_whitelisted\"}",
70+
),
71+
&NetEncodeOpts::WithLength,
72+
)?;
7573
return Ok(login_start_event);
7674
}
7775
}
@@ -82,12 +80,10 @@ async fn handle_login_start(
8280
.add_component::<PlayerIdentity>(login_start_event.conn_id, player_identity)?;
8381

8482
//Send a Login Success Response to further the login sequence
85-
writer
86-
.send_packet(
87-
&LoginSuccessPacket::new(uuid, username),
88-
&NetEncodeOpts::WithLength,
89-
)
90-
.await?;
83+
writer.send_packet(
84+
LoginSuccessPacket::new(uuid, username),
85+
&NetEncodeOpts::WithLength,
86+
)?;
9187

9288
Ok(login_start_event)
9389
}
@@ -113,9 +109,7 @@ async fn handle_login_acknowledged(
113109
.universe
114110
.get_mut::<StreamWriter>(login_acknowledged_event.conn_id)?;
115111

116-
writer
117-
.send_packet(&client_bound_known_packs, &NetEncodeOpts::WithLength)
118-
.await?;
112+
writer.send_packet(client_bound_known_packs, &NetEncodeOpts::WithLength)?;
119113

120114
Ok(login_acknowledged_event)
121115
}
@@ -132,16 +126,9 @@ async fn handle_server_bound_known_packs(
132126
.get_mut::<StreamWriter>(server_bound_known_packs_event.conn_id)?;
133127

134128
let registry_packets = get_registry_packets();
135-
writer
136-
.send_packet(&registry_packets, &NetEncodeOpts::None)
137-
.await?;
129+
writer.send_packet(registry_packets, &NetEncodeOpts::None)?;
138130

139-
writer
140-
.send_packet(
141-
&FinishConfigurationPacket::new(),
142-
&NetEncodeOpts::WithLength,
143-
)
144-
.await?;
131+
writer.send_packet(FinishConfigurationPacket::new(), &NetEncodeOpts::WithLength)?;
145132

146133
Ok(server_bound_known_packs_event)
147134
}
@@ -170,38 +157,32 @@ async fn handle_ack_finish_configuration(
170157
let mut writer = state.universe.get_mut::<StreamWriter>(entity_id)?;
171158

172159
writer // 21
173-
.send_packet(&LoginPlayPacket::new(entity_id), &NetEncodeOpts::WithLength)
174-
.await?;
160+
.send_packet(LoginPlayPacket::new(entity_id), &NetEncodeOpts::WithLength)?;
175161
writer // 29
176162
.send_packet(
177-
&SynchronizePlayerPositionPacket::default(), // The coordinates here should be used for the center chunk.
163+
SynchronizePlayerPositionPacket::default(), // The coordinates here should be used for the center chunk.
178164
&NetEncodeOpts::WithLength,
179-
)
180-
.await?;
165+
)?;
181166
writer // 37
182167
.send_packet(
183-
&SetDefaultSpawnPositionPacket::default(), // Player specific, aka. home, bed, where it would respawn.
168+
SetDefaultSpawnPositionPacket::default(), // Player specific, aka. home, bed, where it would respawn.
184169
&NetEncodeOpts::WithLength,
185-
)
186-
.await?;
170+
)?;
187171
writer // 38
188172
.send_packet(
189-
&GameEventPacket::start_waiting_for_level_chunks(),
173+
GameEventPacket::start_waiting_for_level_chunks(),
190174
&NetEncodeOpts::WithLength,
191-
)
192-
.await?;
175+
)?;
193176
writer // 41
194177
.send_packet(
195-
&SetCenterChunk::new(0, 0), // TODO - Dependent on the player spawn position.
178+
SetCenterChunk::new(0, 0), // TODO - Dependent on the player spawn position.
196179
&NetEncodeOpts::WithLength,
197-
)
198-
.await?;
180+
)?;
199181
writer // other
200182
.send_packet(
201-
&SetRenderDistance::new(5), // TODO
183+
SetRenderDistance::new(5), // TODO
202184
&NetEncodeOpts::WithLength,
203-
)
204-
.await?;
185+
)?;
205186

206187
send_keep_alive(entity_id, &state, &mut writer).await?;
207188

@@ -222,9 +203,7 @@ async fn send_keep_alive(
222203
writer: &mut ComponentRefMut<'_, StreamWriter>,
223204
) -> Result<(), NetError> {
224205
let keep_alive_packet = OutgoingKeepAlivePacket::default();
225-
writer
226-
.send_packet(&keep_alive_packet, &NetEncodeOpts::WithLength)
227-
.await?;
206+
writer.send_packet(keep_alive_packet.clone(), &NetEncodeOpts::WithLength)?;
228207

229208
let timestamp = keep_alive_packet.timestamp;
230209

@@ -262,9 +241,7 @@ async fn player_info_update_packets(entity_id: Entity, state: &GlobalState) -> N
262241

263242
let start = Instant::now();
264243
let mut writer = state.universe.get_mut::<StreamWriter>(entity_id)?;
265-
writer
266-
.send_packet(&packet, &NetEncodeOpts::WithLength)
267-
.await?;
244+
writer.send_packet(packet, &NetEncodeOpts::WithLength)?;
268245
debug!("Sending player info update took: {:?}", start.elapsed());
269246
}
270247

@@ -287,9 +264,7 @@ async fn broadcast_spawn_entity_packet(entity_id: Entity, state: &GlobalState) -
287264
futures::stream::iter(get_all_play_players(state))
288265
.fold(writer, |mut writer, entity| async move {
289266
if let Ok(packet) = SpawnEntityPacket::player(entity, state) {
290-
let _ = writer
291-
.send_packet(&packet, &NetEncodeOpts::WithLength)
292-
.await;
267+
let _ = writer.send_packet(packet, &NetEncodeOpts::WithLength);
293268
}
294269
writer
295270
})

src/bin/src/packet_handlers/player/update_player_position.rs

+6-1
Original file line numberDiff line numberDiff line change
@@ -110,7 +110,12 @@ async fn update_pos_for_all(
110110
const MAX_DELTA: i16 = (7.5 * 4096f32) as i16;
111111
let delta_exceeds_threshold = match delta_pos {
112112
Some((delta_x, delta_y, delta_z)) => {
113-
delta_x.abs() > MAX_DELTA || delta_y.abs() > MAX_DELTA || delta_z.abs() > MAX_DELTA
113+
// Prevent int overflow, since abs of i16::MIN would overflow?
114+
if delta_x == i16::MIN || delta_y == i16::MIN || delta_z == i16::MIN {
115+
true
116+
} else {
117+
delta_x.abs() > MAX_DELTA || delta_y.abs() > MAX_DELTA || delta_z.abs() > MAX_DELTA
118+
}
114119
}
115120
None => false,
116121
};

src/bin/src/systems/chunk_fetcher.rs

+3-2
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ impl System for ChunkFetcher {
4040
};
4141
let mut copied_chunks = HashMap::new();
4242
for chunk in chunk_recv.needed_chunks.iter() {
43-
let (key, chunk) = chunk.pair();
43+
let (key, chunk) = chunk;
4444
if chunk.is_none() {
4545
copied_chunks.insert(key.clone(), None);
4646
}
@@ -55,7 +55,8 @@ impl System for ChunkFetcher {
5555
}
5656
// Insert the fetched chunks back into the component
5757
{
58-
let Ok(chunk_recv) = state.universe.get::<ChunkReceiver>(eid) else {
58+
let Ok(mut chunk_recv) = state.universe.get_mut::<ChunkReceiver>(eid)
59+
else {
5960
trace!("A player disconnected before we could get the ChunkReceiver");
6061
return Ok(());
6162
};

src/bin/src/systems/chunk_sender.rs

+22-29
Original file line numberDiff line numberDiff line change
@@ -69,13 +69,14 @@ impl System for ChunkSenderSystem {
6969
}
7070
let mut sent_chunks = 0;
7171
{
72-
let Ok(chunk_recv) = state.universe.get::<ChunkReceiver>(eid) else {
72+
let Ok(mut chunk_recv) = state.universe.get_mut::<ChunkReceiver>(eid)
73+
else {
7374
trace!("A player disconnected before we could get the ChunkReceiver");
7475
return Ok(());
7576
};
7677
for possible_chunk in chunk_recv.needed_chunks.iter_mut() {
77-
if let Some(chunk) = possible_chunk.pair().1 {
78-
let key = possible_chunk.pair().0;
78+
if let Some(chunk) = possible_chunk.1 {
79+
let key = possible_chunk.0;
7980
to_drop.push(key.clone());
8081
match ChunkAndLightData::from_chunk(&chunk.clone()) {
8182
Ok(packet) => {
@@ -90,7 +91,8 @@ impl System for ChunkSenderSystem {
9091
}
9192
}
9293
{
93-
let Ok(chunk_recv) = state.universe.get::<ChunkReceiver>(eid) else {
94+
let Ok(mut chunk_recv) = state.universe.get_mut::<ChunkReceiver>(eid)
95+
else {
9496
trace!("A player disconnected before we could get the ChunkReceiver");
9597
return Ok(());
9698
};
@@ -107,40 +109,31 @@ impl System for ChunkSenderSystem {
107109
error!("Could not get StreamWriter");
108110
return Ok(());
109111
};
110-
if let Err(e) = conn
111-
.send_packet(
112-
&SetCenterChunk {
113-
x: VarInt::new(centre_coords.0),
114-
z: VarInt::new(centre_coords.1),
115-
},
116-
&NetEncodeOpts::WithLength,
117-
)
118-
.await
119-
{
112+
if let Err(e) = conn.send_packet(
113+
SetCenterChunk {
114+
x: VarInt::new(centre_coords.0),
115+
z: VarInt::new(centre_coords.1),
116+
},
117+
&NetEncodeOpts::WithLength,
118+
) {
120119
error!("Error sending chunk: {:?}", e);
121120
}
122-
if let Err(e) = conn
123-
.send_packet(&ChunkBatchStart {}, &NetEncodeOpts::WithLength)
124-
.await
121+
if let Err(e) =
122+
conn.send_packet(ChunkBatchStart {}, &NetEncodeOpts::WithLength)
125123
{
126124
error!("Error sending chunk: {:?}", e);
127125
}
128126
for packet in packets {
129-
if let Err(e) =
130-
conn.send_packet(&packet, &NetEncodeOpts::WithLength).await
131-
{
127+
if let Err(e) = conn.send_packet(packet, &NetEncodeOpts::WithLength) {
132128
error!("Error sending chunk: {:?}", e);
133129
}
134130
}
135-
if let Err(e) = conn
136-
.send_packet(
137-
&ChunkBatchFinish {
138-
batch_size: VarInt::new(sent_chunks),
139-
},
140-
&NetEncodeOpts::WithLength,
141-
)
142-
.await
143-
{
131+
if let Err(e) = conn.send_packet(
132+
ChunkBatchFinish {
133+
batch_size: VarInt::new(sent_chunks),
134+
},
135+
&NetEncodeOpts::WithLength,
136+
) {
144137
error!("Error sending chunk: {:?}", e);
145138
}
146139
}

0 commit comments

Comments
 (0)