Skip to content

Commit 1d15d25

Browse files
davidv1992rnijveld
authored andcommitted
Implemented network interface change detector.
1 parent aa758c9 commit 1d15d25

File tree

6 files changed

+245
-1
lines changed

6 files changed

+245
-1
lines changed

Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ tracing = { version = "0.1.37", default-features = false, features = ["std", "lo
1818
serde = { version = "1.0.145", features = ["derive"], optional = true }
1919

2020
[dev-dependencies]
21-
tokio = { version = "1.32.0", features = ["rt", "macros"] }
21+
tokio = { version = "1.32.0", features = ["full"] }
2222

2323
[features]
2424
default = ["serde"]

examples/logchanges.rs

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
use timestamped_socket::interface::ChangeDetector;
2+
3+
#[tokio::main]
4+
async fn main() {
5+
let mut detector = ChangeDetector::new().unwrap();
6+
7+
loop {
8+
detector.wait_for_change().await;
9+
println!("Change detected");
10+
}
11+
}

src/interface.rs

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,22 @@ use std::{
66

77
use super::cerr;
88

9+
#[cfg(target_os = "linux")]
10+
mod linux;
11+
#[cfg(target_os = "linux")]
12+
pub use linux::ChangeDetector;
13+
14+
// NOTE: this detection logic is not sharable with macos!
15+
#[cfg(target_os = "freebsd")]
16+
mod freebsd;
17+
#[cfg(target_os = "freebsd")]
18+
pub use freebsd::ChangeDetector;
19+
20+
#[cfg(not(any(target_os = "linux", target_os = "freebsd")))]
21+
mod fallback;
22+
#[cfg(not(any(target_os = "linux", target_os = "freebsd")))]
23+
pub use fallback::ChangeDetector;
24+
925
pub fn interfaces() -> std::io::Result<HashMap<InterfaceName, InterfaceData>> {
1026
let mut elements = HashMap::default();
1127

src/interface/fallback.rs

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
struct Private;
2+
3+
pub struct ChangeDetector {
4+
_private: Private,
5+
}
6+
7+
impl ChangeDetector {
8+
pub fn new() -> std::io::Result<Self> {
9+
Ok(Self { _private: Private })
10+
}
11+
12+
pub async fn wait_for_change(&mut self) {
13+
// No platform independent way, but checking every so often is fine for a fallback
14+
tokio::time::sleep(std::time::Duration::from_secs(60)).await;
15+
}
16+
}

src/interface/freebsd.rs

Lines changed: 101 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,101 @@
1+
use std::{io::ErrorKind, os::fd::RawFd};
2+
3+
use libc::recv;
4+
use tokio::io::{unix::AsyncFd, Interest};
5+
6+
use crate::{cerr, control_message::zeroed_sockaddr_storage};
7+
8+
pub struct ChangeDetector {
9+
fd: AsyncFd<RawFd>,
10+
}
11+
12+
impl ChangeDetector {
13+
const SOCKET_PATH: &'static [u8] = b"/var/run/devd.seqpacket.pipe";
14+
pub fn new() -> std::io::Result<Self> {
15+
const _: () = assert!(
16+
std::mem::size_of::<libc::sockaddr_storage>()
17+
>= std::mem::size_of::<libc::sockaddr_un>()
18+
);
19+
const _: () = assert!(
20+
std::mem::align_of::<libc::sockaddr_storage>()
21+
>= std::mem::align_of::<libc::sockaddr_un>()
22+
);
23+
24+
let mut address_buf = zeroed_sockaddr_storage();
25+
// Safety: the above assertions guarantee that alignment and size are correct.
26+
// the resulting reference won't outlast the function, and result lives the entire
27+
// duration of the function
28+
let address = unsafe {
29+
&mut *(&mut address_buf as *mut libc::sockaddr_storage as *mut libc::sockaddr_un)
30+
};
31+
32+
address.sun_family = libc::AF_UNIX as _;
33+
for i in 0..Self::SOCKET_PATH.len() {
34+
address.sun_path[i] = Self::SOCKET_PATH[i] as _;
35+
}
36+
37+
// Safety: calling socket is safe
38+
let fd = cerr(unsafe { libc::socket(libc::AF_UNIX, libc::SOCK_SEQPACKET, 0) })?;
39+
// Safety: address is valid for the duration of the call
40+
cerr(unsafe {
41+
libc::bind(
42+
fd,
43+
address as *mut _ as *mut _,
44+
std::mem::size_of_val(address) as _,
45+
)
46+
})?;
47+
48+
let nonblocking = 1 as libc::c_int;
49+
// Safety: nonblocking lives for the duration of the call, and is 4 bytes long as expected for FIONBIO
50+
cerr(unsafe { libc::ioctl(fd, libc::FIONBIO, &nonblocking) })?;
51+
52+
Ok(ChangeDetector {
53+
fd: AsyncFd::new(fd)?,
54+
})
55+
}
56+
57+
fn empty(fd: i32) {
58+
loop {
59+
// Safety: buf is valid for the duration of the call, and it's length is passed as the len argument
60+
let mut buf = [0u8; 16];
61+
match cerr(unsafe {
62+
recv(
63+
fd,
64+
&mut buf as *mut _ as *mut _,
65+
std::mem::size_of_val(&buf) as _,
66+
0,
67+
) as _
68+
}) {
69+
Ok(_) => continue,
70+
Err(e) if e.kind() == ErrorKind::WouldBlock => break,
71+
Err(e) => {
72+
tracing::error!("Could not receive on change socket: {}", e);
73+
break;
74+
}
75+
}
76+
}
77+
}
78+
79+
pub async fn wait_for_change(&mut self) {
80+
if let Err(e) = self
81+
.fd
82+
.async_io(Interest::READABLE, |fd| {
83+
// Safety: buf is valid for the duration of the call, and it's length is passed as the len argument
84+
let mut buf = [0u8; 16];
85+
cerr(unsafe {
86+
recv(
87+
*fd,
88+
&mut buf as *mut _ as *mut _,
89+
std::mem::size_of_val(&buf) as _,
90+
0,
91+
) as _
92+
})?;
93+
Self::empty(*fd);
94+
Ok(())
95+
})
96+
.await
97+
{
98+
tracing::error!("Could not receive on change socket: {}", e);
99+
}
100+
}
101+
}

src/interface/linux.rs

Lines changed: 100 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,100 @@
1+
use std::{io::ErrorKind, os::fd::RawFd};
2+
3+
use libc::recv;
4+
use tokio::io::{unix::AsyncFd, Interest};
5+
6+
use crate::{cerr, control_message::zeroed_sockaddr_storage};
7+
8+
pub struct ChangeDetector {
9+
fd: AsyncFd<RawFd>,
10+
}
11+
12+
impl ChangeDetector {
13+
pub fn new() -> std::io::Result<Self> {
14+
const _: () = assert!(
15+
std::mem::size_of::<libc::sockaddr_storage>()
16+
>= std::mem::size_of::<libc::sockaddr_nl>()
17+
);
18+
const _: () = assert!(
19+
std::mem::align_of::<libc::sockaddr_storage>()
20+
>= std::mem::align_of::<libc::sockaddr_nl>()
21+
);
22+
23+
let mut address_buf = zeroed_sockaddr_storage();
24+
// Safety: the above assertions guarantee that alignment and size are correct.
25+
// the resulting reference won't outlast the function, and result lives the entire
26+
// duration of the function
27+
let address = unsafe {
28+
&mut *(&mut address_buf as *mut libc::sockaddr_storage as *mut libc::sockaddr_nl)
29+
};
30+
31+
address.nl_family = libc::AF_NETLINK as _;
32+
address.nl_groups =
33+
(libc::RTMGRP_IPV4_IFADDR | libc::RTMGRP_IPV6_IFADDR | libc::RTMGRP_LINK) as _;
34+
35+
// Safety: calling socket is safe
36+
let fd =
37+
cerr(unsafe { libc::socket(libc::AF_NETLINK, libc::SOCK_RAW, libc::NETLINK_ROUTE) })?;
38+
// Safety: address is valid for the duration of the call
39+
cerr(unsafe {
40+
libc::bind(
41+
fd,
42+
address as *mut _ as *mut _,
43+
std::mem::size_of_val(address) as _,
44+
)
45+
})?;
46+
47+
let nonblocking = 1 as libc::c_int;
48+
// Safety: nonblocking lives for the duration of the call, and is 4 bytes long as expected for FIONBIO
49+
cerr(unsafe { libc::ioctl(fd, libc::FIONBIO, &nonblocking) })?;
50+
51+
Ok(ChangeDetector {
52+
fd: AsyncFd::new(fd)?,
53+
})
54+
}
55+
56+
fn empty(fd: i32) {
57+
loop {
58+
// Safety: buf is valid for the duration of the call, and it's length is passed as the len argument
59+
let mut buf = [0u8; 16];
60+
match cerr(unsafe {
61+
recv(
62+
fd,
63+
&mut buf as *mut _ as *mut _,
64+
std::mem::size_of_val(&buf) as _,
65+
0,
66+
) as _
67+
}) {
68+
Ok(_) => continue,
69+
Err(e) if e.kind() == ErrorKind::WouldBlock => break,
70+
Err(e) => {
71+
tracing::error!("Could not receive on change socket: {}", e);
72+
break;
73+
}
74+
}
75+
}
76+
}
77+
78+
pub async fn wait_for_change(&mut self) {
79+
if let Err(e) = self
80+
.fd
81+
.async_io(Interest::READABLE, |fd| {
82+
// Safety: buf is valid for the duration of the call, and it's length is passed as the len argument
83+
let mut buf = [0u8; 16];
84+
cerr(unsafe {
85+
recv(
86+
*fd,
87+
&mut buf as *mut _ as *mut _,
88+
std::mem::size_of_val(&buf) as _,
89+
0,
90+
) as _
91+
})?;
92+
Self::empty(*fd);
93+
Ok(())
94+
})
95+
.await
96+
{
97+
tracing::error!("Could not receive on change socket: {}", e);
98+
}
99+
}
100+
}

0 commit comments

Comments
 (0)