Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix interface bind #4510

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions gossip/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ fn parse_gossip_host(matches: &ArgMatches, entrypoint_addr: Option<SocketAddr>)
})
.unwrap_or_else(|| {
if let Some(entrypoint_addr) = entrypoint_addr {
solana_net_utils::get_public_ip_addr(&entrypoint_addr).unwrap_or_else(|err| {
solana_net_utils::get_public_ip_addr(&entrypoint_addr, None).unwrap_or_else(|err| {
eprintln!("Failed to contact cluster entrypoint {entrypoint_addr}: {err}");
exit(1);
})
Expand Down Expand Up @@ -222,7 +222,7 @@ fn get_entrypoint_shred_version(entrypoint: &Option<SocketAddr>) -> Option<u16>
error!("cannot obtain shred-version without an entrypoint");
return None;
};
match solana_net_utils::get_cluster_shred_version(entrypoint) {
match solana_net_utils::get_cluster_shred_version(entrypoint, None) {
Err(err) => {
error!("get_cluster_shred_version failed: {entrypoint}, {err}");
None
Expand Down
4 changes: 4 additions & 0 deletions net-utils/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,11 @@ license = { workspace = true }
edition = { workspace = true }

[dependencies]
anyhow = { workspace = true }

bincode = { workspace = true }
bytes = { workspace = true }

clap = { version = "3.1.5", features = ["cargo"], optional = true }
crossbeam-channel = { workspace = true }
log = { workspace = true }
Expand Down
2 changes: 1 addition & 1 deletion net-utils/src/bin/ip_address.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ fn main() {
let addr = solana_net_utils::parse_host_port(host_port)
.unwrap_or_else(|_| panic!("failed to parse {host_port}"));

match solana_net_utils::get_public_ip_addr(&addr) {
match solana_net_utils::get_public_ip_addr(&addr, None) {
Ok(ip) => println!("{ip}"),
Err(err) => {
eprintln!("{addr}: {err}");
Expand Down
217 changes: 136 additions & 81 deletions net-utils/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,19 +1,26 @@
//! The `net_utils` module assists with networking
#![allow(clippy::arithmetic_side_effects)]

#[cfg(feature = "dev-context-only-utils")]
use tokio::net::UdpSocket as TokioUdpSocket;
use {
anyhow::{anyhow, bail},
bytes::{BufMut, BytesMut},
crossbeam_channel::unbounded,
log::*,
rand::{thread_rng, Rng},
socket2::{Domain, SockAddr, Socket, Type},
std::{
collections::{BTreeMap, HashSet},
io::{self, Read, Write},
io::{self},
net::{IpAddr, Ipv4Addr, SocketAddr, TcpListener, TcpStream, ToSocketAddrs, UdpSocket},
sync::{Arc, RwLock},
time::{Duration, Instant},
},
tokio::{
io::{AsyncReadExt, AsyncWriteExt},
net::TcpSocket,
},
url::Url,
};

Expand All @@ -39,90 +46,115 @@ pub const MINIMUM_VALIDATOR_PORT_RANGE_WIDTH: u16 = 17; // VALIDATOR_PORT_RANGE
pub(crate) const HEADER_LENGTH: usize = 4;
pub(crate) const IP_ECHO_SERVER_RESPONSE_LENGTH: usize = HEADER_LENGTH + 23;

fn ip_echo_server_request(
ip_echo_server_addr: &SocketAddr,
async fn ip_echo_server_request(
ip_echo_server_addr: SocketAddr,
msg: IpEchoServerMessage,
) -> Result<IpEchoServerResponse, String> {
bind_address: Option<IpAddr>,
) -> anyhow::Result<IpEchoServerResponse> {
let timeout = Duration::new(5, 0);
TcpStream::connect_timeout(ip_echo_server_addr, timeout)
.and_then(|mut stream| {
// Start with HEADER_LENGTH null bytes to avoid looking like an HTTP GET/POST request
let mut bytes = vec![0; HEADER_LENGTH];

bytes.append(&mut bincode::serialize(&msg).expect("serialize IpEchoServerMessage"));

// End with '\n' to make this request look HTTP-ish and tickle an error response back
// from an HTTP server
bytes.push(b'\n');

stream.set_read_timeout(Some(Duration::new(10, 0)))?;
stream.write_all(&bytes)?;
stream.shutdown(std::net::Shutdown::Write)?;
let mut data = vec![0u8; IP_ECHO_SERVER_RESPONSE_LENGTH];
let _ = stream.read(&mut data[..])?;
Ok(data)
})
.and_then(|data| {
// It's common for users to accidentally confuse the validator's gossip port and JSON
// RPC port. Attempt to detect when this occurs by looking for the standard HTTP
// response header and provide the user with a helpful error message
if data.len() < HEADER_LENGTH {
return Err(io::Error::new(
io::ErrorKind::Other,
format!("Response too short, received {} bytes", data.len()),
));
}
let socket = tokio::net::TcpSocket::new_v4()?;
if let Some(addr) = bind_address {
socket.bind(SocketAddr::new(addr, 0))?;
}

let response_header: String =
data[0..HEADER_LENGTH].iter().map(|b| *b as char).collect();
if response_header != "\0\0\0\0" {
if response_header == "HTTP" {
let http_response = data.iter().map(|b| *b as char).collect::<String>();
return Err(io::Error::new(
io::ErrorKind::Other,
format!(
"Invalid gossip entrypoint. {ip_echo_server_addr} looks to be an HTTP port: {http_response}"
),
));
}
return Err(io::Error::new(
io::ErrorKind::Other,
format!(
"Invalid gossip entrypoint. {ip_echo_server_addr} provided an invalid response header: '{response_header}'"
),
));
// Performs one request/response exchange with the ip echo server over `socket`.
//
// Connects `socket` to `ip_echo_server_addr`, sends a framed `msg`, then returns whatever
// bytes a single read of the reply produced. Any connect, serialize, write, flush, read or
// shutdown error is propagated to the caller through `?`. No timeout is applied in here.
async fn do_make_request(
socket: TcpSocket,
ip_echo_server_addr: SocketAddr,
msg: IpEchoServerMessage,
) -> anyhow::Result<BytesMut> {
let mut stream = socket.connect(ip_echo_server_addr).await?;
// Start with HEADER_LENGTH null bytes to avoid looking like an HTTP GET/POST request
let mut bytes = BytesMut::with_capacity(IP_ECHO_SERVER_RESPONSE_LENGTH);
bytes.extend_from_slice(&[0u8; HEADER_LENGTH]);
// The bincode-encoded message follows the null header.
bytes.extend_from_slice(&bincode::serialize(&msg)?);

// End with '\n' to make this request look HTTP-ish and tickle an error response back
// from an HTTP server
bytes.put_u8(b'\n');
stream.write_all(&bytes).await?;
stream.flush().await?;

// The request has been sent; empty the buffer and reuse it to hold the response.
bytes.clear();
// NOTE(review): only one `read_buf` call is made, so a response delivered in several
// TCP segments may come back truncated -- confirm whether a read loop is needed.
let _n = stream.read_buf(&mut bytes).await?;
stream.shutdown().await?;

Ok(bytes)
}

let response =
tokio::time::timeout(timeout, do_make_request(socket, ip_echo_server_addr, msg)).await??;
// It's common for users to accidentally confuse the validator's gossip port and JSON
// RPC port. Attempt to detect when this occurs by looking for the standard HTTP
// response header and provide the user with a helpful error message
if response.len() < HEADER_LENGTH {
bail!("Response too short, received {} bytes", response.len());
}

let (response_header, body) =
response
.split_first_chunk::<HEADER_LENGTH>()
.ok_or(anyhow::anyhow!(
"Not enough data in the response from {ip_echo_server_addr}!"
))?;
let payload = match response_header {
[0, 0, 0, 0] => bincode::deserialize(&response[HEADER_LENGTH..])?,
[b'H', b'T', b'T', b'P'] => {
let http_response = std::str::from_utf8(body);
match http_response {
Ok(r) => bail!("Invalid gossip entrypoint. {ip_echo_server_addr} looks to be an HTTP port replying with {r}"),
Err(_) => bail!("Invalid gossip entrypoint. {ip_echo_server_addr} looks to be an HTTP port."),
}
}
_ => {
bail!("Invalid gossip entrypoint. {ip_echo_server_addr} provided unexpected header bytes {response_header:?} ");
}
};

bincode::deserialize(&data[HEADER_LENGTH..]).map_err(|err| {
io::Error::new(
io::ErrorKind::Other,
format!("Failed to deserialize: {err:?}"),
)
})
})
.map_err(|err| err.to_string())
Ok(payload)
}

/// Determine the public IP address of this machine by asking an ip_echo_server at the given
/// address
pub fn get_public_ip_addr(ip_echo_server_addr: &SocketAddr) -> Result<IpAddr, String> {
let resp = ip_echo_server_request(ip_echo_server_addr, IpEchoServerMessage::default())?;
pub fn get_public_ip_addr(
ip_echo_server_addr: &SocketAddr,
bind_address: Option<IpAddr>,
) -> anyhow::Result<IpAddr> {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

would maybe just recommend staying with io::Result instead of changing to anyhow::Result.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The original code did not report specific errors (it reported just a String), so I have stuck with that convention. io::Result cannot encode all the ways this can fail anyway, at least not cleanly. Since we are not handling these errors with match, I see no point in making life difficult.

let fut = ip_echo_server_request(
*ip_echo_server_addr,
IpEchoServerMessage::default(),
bind_address,
);
let rt = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()?;
let resp = rt.block_on(fut)?;
Ok(resp.address)
}

pub fn get_cluster_shred_version(ip_echo_server_addr: &SocketAddr) -> Result<u16, String> {
let resp = ip_echo_server_request(ip_echo_server_addr, IpEchoServerMessage::default())?;
pub fn get_cluster_shred_version(
ip_echo_server_addr: &SocketAddr,
bind_address: Option<IpAddr>,
) -> anyhow::Result<u16> {
let fut = ip_echo_server_request(
*ip_echo_server_addr,
IpEchoServerMessage::default(),
bind_address,
);
let rt = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()?;
let resp = rt.block_on(fut)?;
resp.shred_version
.ok_or_else(|| String::from("IP echo server does not return a shred-version"))
.ok_or_else(|| anyhow!("IP echo server does not return a shred-version"))
}
Comment on lines +118 to 149

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I suggest you deprecate these functions first so it doesn't break downstream users of net-utils. Steps: deprecate them, write the new functions with the new bind_address option, then change the code to use the new functions.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Okay. I figured no one outside of Anza is using these anyway. But if you think we must, then we can do the whole deprecation dance as well.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, to be honest, I do not know if anyone is using these outside Anza. But if they are, it is good not to break the interface.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah, it's sometimes hard to say: https://crates.io/crates/solana-net-utils

we've been slapped on the wrist for breaking public interfaces too often, so we've been getting stricter on this.

If we need to break compatibility or have a really compelling reason to, we can always ask Jacob/Nick to understand the downstream impact

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I figure we can keep old functions as they are, and add new ones that bind to interface. We do not always need to bind, so old ones will still be used.


// Checks if any of the provided TCP/UDP ports are not reachable by the machine at
// `ip_echo_server_addr`
const DEFAULT_TIMEOUT_SECS: u64 = 5;
const DEFAULT_RETRY_COUNT: usize = 5;

fn do_verify_reachable_ports(
ip_echo_server_addr: &SocketAddr,
async fn do_verify_reachable_ports(
ip_echo_server_addr: SocketAddr,
tcp_listeners: Vec<(u16, TcpListener)>,
udp_sockets: &[&UdpSocket],
timeout: u64,
Expand All @@ -137,7 +169,9 @@ fn do_verify_reachable_ports(
let _ = ip_echo_server_request(
ip_echo_server_addr,
IpEchoServerMessage::new(&tcp_ports, &[]),
None,
)
.await
.map_err(|err| warn!("ip_echo_server request failed: {}", err));

let mut ok = true;
Expand Down Expand Up @@ -218,7 +252,9 @@ fn do_verify_reachable_ports(
let _ = ip_echo_server_request(
ip_echo_server_addr,
IpEchoServerMessage::new(&[], &checked_ports),
None,
)
.await
.map_err(|err| warn!("ip_echo_server request failed: {}", err));

// Spawn threads at once!
Expand Down Expand Up @@ -300,13 +336,18 @@ pub fn verify_reachable_ports(
tcp_listeners: Vec<(u16, TcpListener)>,
udp_sockets: &[&UdpSocket],
) -> bool {
do_verify_reachable_ports(
ip_echo_server_addr,
let rt = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.expect("Can not create a runtime");

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

.expect() error messages should be styled as "expect as precondition". See here: https://doc.rust-lang.org/std/error/index.html#common-message-styles. Basically, format it as "tokio builder should create a runtime" or something similar.

let fut = do_verify_reachable_ports(
*ip_echo_server_addr,
tcp_listeners,
udp_sockets,
DEFAULT_TIMEOUT_SECS,
DEFAULT_RETRY_COUNT,
)
);
rt.block_on(fut)
}

pub fn parse_port_or_addr(optstr: Option<&str>, default_addr: SocketAddr) -> SocketAddr {
Expand Down Expand Up @@ -780,8 +821,14 @@ pub fn bind_more_with_config(

#[cfg(test)]
mod tests {
use {super::*, std::net::Ipv4Addr};
use {super::*, std::net::Ipv4Addr, tokio::runtime::Runtime};

/// Builds a current-thread Tokio runtime with all drivers enabled, so the tests can drive
/// async helpers to completion with `block_on`.
///
/// # Panics
/// Panics if the runtime cannot be constructed.
fn runtime() -> Runtime {
    tokio::runtime::Builder::new_current_thread()
        .enable_all()
        .build()
        // Message follows the "expect as precondition" style: it states what should hold.
        .expect("tokio builder should create a current-thread runtime")
}
#[test]
fn test_response_length() {
let resp = IpEchoServerResponse {
Expand Down Expand Up @@ -957,10 +1004,13 @@ mod tests {

let server_ip_echo_addr = server_udp_socket.local_addr().unwrap();
assert_eq!(
get_public_ip_addr(&server_ip_echo_addr),
parse_host("127.0.0.1"),
get_public_ip_addr(&server_ip_echo_addr, None).unwrap(),
parse_host("127.0.0.1").unwrap(),
);
assert_eq!(
get_cluster_shred_version(&server_ip_echo_addr, None).unwrap(),
42
);
assert_eq!(get_cluster_shred_version(&server_ip_echo_addr), Ok(42));
assert!(verify_reachable_ports(&server_ip_echo_addr, vec![], &[],));
}

Expand All @@ -982,10 +1032,13 @@ mod tests {

let ip_echo_server_addr = server_udp_socket.local_addr().unwrap();
assert_eq!(
get_public_ip_addr(&ip_echo_server_addr),
parse_host("127.0.0.1"),
get_public_ip_addr(&ip_echo_server_addr, None).unwrap(),
parse_host("127.0.0.1").unwrap(),
);
assert_eq!(
get_cluster_shred_version(&ip_echo_server_addr, None).unwrap(),
65535
);
assert_eq!(get_cluster_shred_version(&ip_echo_server_addr), Ok(65535));
assert!(verify_reachable_ports(
&ip_echo_server_addr,
vec![(client_port, client_tcp_listener)],
Expand All @@ -1008,13 +1061,14 @@ mod tests {
let (correct_client_port, (_client_udp_socket, client_tcp_listener)) =
bind_common_in_range_with_config(ip_addr, (3200, 3250), config).unwrap();

assert!(!do_verify_reachable_ports(
&server_ip_echo_addr,
let rt = runtime();
assert!(!rt.block_on(do_verify_reachable_ports(
server_ip_echo_addr,
vec![(correct_client_port, client_tcp_listener)],
&[],
2,
3,
));
)));
}

#[test]
Expand All @@ -1032,13 +1086,14 @@ mod tests {
let (_correct_client_port, (client_udp_socket, _client_tcp_listener)) =
bind_common_in_range_with_config(ip_addr, (3200, 3250), config).unwrap();

assert!(!do_verify_reachable_ports(
&server_ip_echo_addr,
let rt = runtime();
assert!(!rt.block_on(do_verify_reachable_ports(
server_ip_echo_addr,
vec![],
&[&client_udp_socket],
2,
3,
));
)));
}

#[test]
Expand Down
2 changes: 2 additions & 0 deletions programs/sbf/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading
Loading