Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Switch some functions in net-utils to tokio #4524

Merged
merged 8 commits into from
Jan 21, 2025

Conversation

alexpyattaev
Copy link

@alexpyattaev alexpyattaev commented Jan 17, 2025

Problem

There is a bunch of code in net-utils that could be better implemented with tokio rather than std sockets. This is primarily to address a bigger problem as outlined in #4440, but also async implementations are nicer and cleaner.

Summary of Changes

  • Switched implementation of fn ip_echo_server_request in net-utils towards tokio (std TCP sockets do not support BINDTODEVICE)
  • Updated do_verify_reachable_ports to be async as well. It will now also verify all TCP ports concurrently (used to be serial).

@alexpyattaev alexpyattaev force-pushed the net_utils_rework branch 4 times, most recently from 6fb6eb4 to c0baf58 Compare January 17, 2025 21:10
Copy link

@gregcusack gregcusack left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

looking good! just a few comments!

Comment on lines 124 to 62
pub fn get_public_ip_addr_with_binding(
ip_echo_server_addr: &SocketAddr,
bind_address: Option<IpAddr>,
) -> anyhow::Result<IpAddr> {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

wonder if we should change the name of this function. get_public_ip_addr_with_binding(). name has prefix _with_binding so the function should take in a bind address not a an Option bind_address: Option<IpAddr>

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes!

Comment on lines 145 to 91
pub fn get_cluster_shred_version_with_binding(
ip_echo_server_addr: &SocketAddr,
bind_address: Option<IpAddr>,
) -> anyhow::Result<u16> {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same thing here with get_cluster_shred_version_with_binding. if prefix is with_binding then it should take in a bind_address, not an Option

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes!

Comment on lines 116 to 123
/// Determine the public IP address of this machine by asking an ip_echo_server at the given
/// address
pub fn get_public_ip_addr(ip_echo_server_addr: &SocketAddr) -> Result<IpAddr, String> {
let resp = ip_echo_server_request(ip_echo_server_addr, IpEchoServerMessage::default())?;
get_public_ip_addr_with_binding(ip_echo_server_addr, None).map_err(|e| e.to_string())
}

/// Determine the public IP address of this machine by asking an ip_echo_server at the given
/// address

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

would differentiate these comments since the functions are technically not the same

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes! Also fixed comments for the other public functions.

// No retries for TCP, abort on the first failure
return ok;
// No retries for TCP, abort on any failure
return false;

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we can just return ok here since ok is false

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, we can, but this requires less brainpower to read=)

Comment on lines 49 to 53
async fn ip_echo_server_request(
ip_echo_server_addr: SocketAddr,
msg: IpEchoServerMessage,
) -> Result<IpEchoServerResponse, String> {
bind_address: Option<IpAddr>,
) -> anyhow::Result<IpEchoServerResponse> {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

wonder if it makes sense to have a function ip_echo_server_request and then a ip_echo_server_request_with_bind or something. That way it is clear that the function either is or is not binding to a socket. That way we avoid the Option all together and function use case is much clearer. same for the other methods that pass in bind_address: Option<IpAddr>. Open to your thoughts as always!

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Those functions would be 95% identical. I think Option<bind_address> conveys the idea fairly well - you may specify it, and if you do it will be used. And this is private anyway, so I think the "less code duplication" argument wins here.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

well you could avoid the duplication. for ip_echo_server_request_with_bind you'd do the bind. and then you could call ip_echo_server_request() which would then actually do the rest of the code that is currently in ip_echo_server_request(). call do_make_request() and then the rest of ip_echo_server_request().
Example:

async fn ip_echo_server_request _with_bind(
    ip_echo_server_addr: SocketAddr,
    msg: IpEchoServerMessage,
    bind_address: IpAddr,
) -> anyhow::Result<IpEchoServerResponse> {
    if let Some(addr) = bind_address {
        socket.bind(SocketAddr::new(addr, 0))?;
    }
    ip_echo_server_request(ip_echo_server_addr, msg)
}

async fn ip_echo_server_request(
    ip_echo_server_addr: SocketAddr,
    msg: IpEchoServerMessage,
) -> anyhow::Result<IpEchoServerResponse> {
    let timeout = Duration::new(5, 0);
    let socket = tokio::net::TcpSocket::new_v4()?;

    async fn do_make_request(
        socket: TcpSocket,
        ip_echo_server_addr: SocketAddr,
        msg: IpEchoServerMessage,
    ) -> anyhow::Result<BytesMut> {
        let mut stream = socket.connect(ip_echo_server_addr).await?;
        ....
    }
    ....
}

^ i would suggest writing it like this unless i am missing something (very possible lol). I do understand that it is a private function so it's not the end of the world but i think it's cleaner. quicker to see the only functionality difference is the bind_address/bind

@gregcusack gregcusack self-requested a review January 20, 2025 18:43
msg: IpEchoServerMessage,
) -> Result<IpEchoServerResponse, String> {
bind_address: Option<IpAddr>,
) -> anyhow::Result<IpEchoServerResponse> {
let timeout = Duration::new(5, 0);

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we now change this to: DEFAULT_TIMEOUT? or just use DEFAULT_TIMEOUT when we call tokio::time::timeout(DEFAULT_TIMEOUT, do_make_request(...)).await???

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

lol vscode for some reason always highlights too many code lines. so i am referring to line 54 here

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Moved timeout to be a const (as it should have always been)

Comment on lines 49 to 53
async fn ip_echo_server_request(
ip_echo_server_addr: SocketAddr,
msg: IpEchoServerMessage,
) -> Result<IpEchoServerResponse, String> {
bind_address: Option<IpAddr>,
) -> anyhow::Result<IpEchoServerResponse> {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

well you could avoid the duplication. for ip_echo_server_request_with_bind you'd do the bind. and then you could call ip_echo_server_request() which would then actually do the rest of the code that is currently in ip_echo_server_request(). call do_make_request() and then the rest of ip_echo_server_request().
Example:

async fn ip_echo_server_request _with_bind(
    ip_echo_server_addr: SocketAddr,
    msg: IpEchoServerMessage,
    bind_address: IpAddr,
) -> anyhow::Result<IpEchoServerResponse> {
    if let Some(addr) = bind_address {
        socket.bind(SocketAddr::new(addr, 0))?;
    }
    ip_echo_server_request(ip_echo_server_addr, msg)
}

async fn ip_echo_server_request(
    ip_echo_server_addr: SocketAddr,
    msg: IpEchoServerMessage,
) -> anyhow::Result<IpEchoServerResponse> {
    let timeout = Duration::new(5, 0);
    let socket = tokio::net::TcpSocket::new_v4()?;

    async fn do_make_request(
        socket: TcpSocket,
        ip_echo_server_addr: SocketAddr,
        msg: IpEchoServerMessage,
    ) -> anyhow::Result<BytesMut> {
        let mut stream = socket.connect(ip_echo_server_addr).await?;
        ....
    }
    ....
}

^ i would suggest writing it like this unless i am missing something (very possible lol). I do understand that it is a private function so it's not the end of the world but i think it's cleaner. quicker to see the only functionality difference is the bind_address/bind

Copy link

@gregcusack gregcusack left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

looking very good! just last couple comments! thank youuuu!

@alexpyattaev
Copy link
Author

alexpyattaev commented Jan 20, 2025

I ended up moving all the implementation details of "how to talk to echo server" to a separate file.
Now the stuff in lib is exclusively public API stuff + tests, and the inner workings of client and server are in their own files. Also removed some unsafe that was not necessary (as modern rust can unwrap in constexpr).
And found a few small bugs in TCP port checking code.
Also all checks run concurrently now so startup is maybe a few ms faster.

@gregcusack
Copy link

Also removed some unsafe that was not necessary (as modern rust can unwrap in constexpr).
And found a few small bugs in TCP port checking code.

let's put these two changes in a follow up PR instead of adding into this one. Can get hard to follow when there a lot of different/somewhat unrelated changes happening in the same PR.

@alexpyattaev
Copy link
Author

Ok the TCP bugfix is indeed quite big, keeping it back for a follow-up

Copy link

@gregcusack gregcusack left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok sorry just a few changes to maintain consistency between two similar functions. then should be good to go

Comment on lines 47 to +73
pub fn get_public_ip_addr(ip_echo_server_addr: &SocketAddr) -> Result<IpAddr, String> {
let resp = ip_echo_server_request(ip_echo_server_addr, IpEchoServerMessage::default())?;
let fut = ip_echo_server_request(*ip_echo_server_addr, IpEchoServerMessage::default());
let rt = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.map_err(|e| e.to_string())?;
let resp = rt.block_on(fut).map_err(|e| e.to_string())?;
Ok(resp.address)
}

/// Determine the public IP address of this machine by asking an ip_echo_server at the given
/// address. This function will bind to the provided bind_addreess.
pub fn get_public_ip_addr_with_binding(
ip_echo_server_addr: &SocketAddr,
bind_address: IpAddr,
) -> anyhow::Result<IpAddr> {
let fut = ip_echo_server_request_with_binding(
*ip_echo_server_addr,
IpEchoServerMessage::default(),
bind_address,
);
let rt = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()?;
let resp = rt.block_on(fut)?;
Ok(resp.address)
}

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

would set these to methods to have the same return value? Either Result<IpAddr, String> or anyhow::Result<IpAddr>

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Well String is kind of the worst case. It is not useful for programmatic handling of errors, and it makes return value unnecessarily large. So for new API I see no benefit in using it if we are using anyhow.

Comment on lines 76 to +91
pub fn get_cluster_shred_version(ip_echo_server_addr: &SocketAddr) -> Result<u16, String> {
let resp = ip_echo_server_request(ip_echo_server_addr, IpEchoServerMessage::default())?;
let fut = ip_echo_server_request(*ip_echo_server_addr, IpEchoServerMessage::default());
let rt = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.map_err(|e| e.to_string())?;
let resp = rt.block_on(fut).map_err(|e| e.to_string())?;
resp.shred_version
.ok_or_else(|| String::from("IP echo server does not return a shred-version"))
.ok_or_else(|| "IP echo server does not return a shred-version".to_owned())
}

// Checks if any of the provided TCP/UDP ports are not reachable by the machine at
// `ip_echo_server_addr`
const DEFAULT_TIMEOUT_SECS: u64 = 5;
const DEFAULT_RETRY_COUNT: usize = 5;

fn do_verify_reachable_ports(
///Retrieves cluster shred version from Entrypoint address provided, binds client-side socket to IP provided
pub fn get_cluster_shred_version_with_binding(
ip_echo_server_addr: &SocketAddr,
tcp_listeners: Vec<(u16, TcpListener)>,
udp_sockets: &[&UdpSocket],
timeout: u64,
udp_retry_count: usize,
) -> bool {
info!(
"Checking that tcp ports {:?} are reachable from {:?}",
tcp_listeners, ip_echo_server_addr
);

let tcp_ports: Vec<_> = tcp_listeners.iter().map(|(port, _)| *port).collect();
let _ = ip_echo_server_request(
ip_echo_server_addr,
IpEchoServerMessage::new(&tcp_ports, &[]),
)
.map_err(|err| warn!("ip_echo_server request failed: {}", err));

let mut ok = true;
let timeout = Duration::from_secs(timeout);

// Wait for a connection to open on each TCP port
for (port, tcp_listener) in tcp_listeners {
let (sender, receiver) = unbounded();
let listening_addr = tcp_listener.local_addr().unwrap();
let thread_handle = std::thread::Builder::new()
.name(format!("solVrfyTcp{port:05}"))
.spawn(move || {
debug!("Waiting for incoming connection on tcp/{}", port);
match tcp_listener.incoming().next() {
Some(_) => sender
.send(())
.unwrap_or_else(|err| warn!("send failure: {}", err)),
None => warn!("tcp incoming failed"),
}
})
.unwrap();
match receiver.recv_timeout(timeout) {
Ok(_) => {
info!("tcp/{} is reachable", port);
}
Err(err) => {
error!(
"Received no response at tcp/{}, check your port configuration: {}",
port, err
);
// Ugh, std rustc doesn't provide accepting with timeout or restoring original
// nonblocking-status of sockets because of lack of getter, only the setter...
// So, to close the thread cleanly, just connect from here.
// ref: https://github.com/rust-lang/rust/issues/31615
TcpStream::connect_timeout(&listening_addr, timeout).unwrap();
ok = false;
}
}
// ensure to reap the thread
thread_handle.join().unwrap();
}

if !ok {
// No retries for TCP, abort on the first failure
return ok;
}

let mut udp_ports: BTreeMap<_, _> = BTreeMap::new();
udp_sockets.iter().for_each(|udp_socket| {
let port = udp_socket.local_addr().unwrap().port();
udp_ports
.entry(port)
.or_insert_with(Vec::new)
.push(udp_socket);
});
let udp_ports: Vec<_> = udp_ports.into_iter().collect();

info!(
"Checking that udp ports {:?} are reachable from {:?}",
udp_ports.iter().map(|(port, _)| port).collect::<Vec<_>>(),
ip_echo_server_addr
bind_address: IpAddr,
) -> anyhow::Result<u16> {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same goes for these two methods. Will be easier to handle the errors on the caller side if they are the same. Or is there a specific reason get_cluster_shred_version_with_binding returns something different (although similar) to get_cluster_shred_version

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the with_binding version is what will be used in the validator. the versions without binding are likely going to get deprecated (as we need to be able to support binding to interfaces for security reasons). So, within one binary, it would improbable that both versions would be used.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ya my bad, you're right.

@gregcusack gregcusack self-requested a review January 21, 2025 21:35
Copy link

@gregcusack gregcusack left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

lgtm! thank you!

@alexpyattaev alexpyattaev merged commit c043329 into anza-xyz:master Jan 21, 2025
58 checks passed
@alexpyattaev alexpyattaev deleted the net_utils_rework branch January 21, 2025 21:52
@@ -22,8 +22,7 @@ pub type IpEchoServer = Runtime;
// Enforce a minimum of two threads:
// - One thread to monitor the TcpListener and spawn async tasks
// - One thread to service the spawned tasks
// The unsafe is safe because we're using a fixed, known non-zero value
pub const MINIMUM_IP_ECHO_SERVER_THREADS: NonZeroUsize = unsafe { NonZeroUsize::new_unchecked(2) };
pub const MINIMUM_IP_ECHO_SERVER_THREADS: NonZeroUsize = NonZeroUsize::new(2).unwrap();

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This does not compile.
Also cc @yihau because surprisingly the CI did not complain.

error: `std::option::Option::<T>::unwrap` is not yet stable as a const fn
  --> net-utils/src/ip_echo_server.rs:25:58
   |
25 | pub const MINIMUM_IP_ECHO_SERVER_THREADS: NonZeroUsize = NonZeroUsize::new(2).unwrap();
   |                                                          ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
   |
   = help: add `#![feature(const_option)]` to the crate attributes to enable

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wait... on which compiler version does it not compile?

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We just recently bumped up rust version.
So this is actually good.
Please ignore.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants